/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
implements ProcessorSupplier<KO, SubscriptionWrapper<K>> {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStoreReceiveProcessorSupplier.class);
    private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
    private final CombinedKeySchema<KO, K> keySchema;

    public SubscriptionStoreReceiveProcessorSupplier(StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, CombinedKeySchema<KO, K> keySchema) {
        this.storeBuilder = storeBuilder;
        this.keySchema = keySchema;
    }

    @Override
    public Processor<KO, SubscriptionWrapper<K>> get() {
        return new AbstractProcessor<KO, SubscriptionWrapper<K>>(){
            private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
            private Sensor droppedRecordsSensor;

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                InternalProcessorContext internalProcessorContext = (InternalProcessorContext)context;
                this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), internalProcessorContext.taskId().toString(), internalProcessorContext.metrics());
                this.store = (TimestampedKeyValueStore)internalProcessorContext.getStateStore(SubscriptionStoreReceiveProcessorSupplier.this.storeBuilder);
                SubscriptionStoreReceiveProcessorSupplier.this.keySchema.init(context);
            }

            @Override
            public void process(KO key, SubscriptionWrapper<K> value) {
                if (key == null) {
                    LOG.warn("Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", new Object[]{value, this.context().topic(), this.context().partition(), this.context().offset()});
                    this.droppedRecordsSensor.record();
                    return;
                }
                if (value.getVersion() != 0) {
                    throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                }
                Bytes subscriptionKey = SubscriptionStoreReceiveProcessorSupplier.this.keySchema.toBytes(key, value.getPrimaryKey());
                ValueAndTimestamp newValue = ValueAndTimestamp.make(value, this.context().timestamp());
                ValueAndTimestamp oldValue = (ValueAndTimestamp)this.store.get(subscriptionKey);
                if (value.getInstruction().equals((Object)SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) || value.getInstruction().equals((Object)SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
                    this.store.delete(subscriptionKey);
                } else {
                    this.store.put(subscriptionKey, (SubscriptionWrapper<Bytes>)((Object)newValue));
                }
                Change<ValueAndTimestamp> change = new Change<ValueAndTimestamp>(newValue, oldValue);
                this.context().forward(new CombinedKey(key, value.getPrimaryKey()), change, To.all().withTimestamp(newValue.timestamp()));
            }
        };
    }
}

