/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

/**
 * Translates stream/table/global-table declarations into source, processor and
 * state-store registrations on an {@link InternalTopologyBuilder}, and hands out
 * unique processor and store names.
 *
 * <p>Names are made unique by a single shared counter: every call to
 * {@link #newProcessorName(String)} or {@link #newStoreName(String)} consumes one
 * index value. The counter itself is an {@link AtomicInteger}; only the
 * {@code addStateStore}/{@code addGlobalStore} methods are {@code synchronized}.
 */
public class InternalStreamsBuilder
implements InternalNameProvider {
    /** Name prefix used for source nodes. */
    private static final String STREAM_SOURCE_PREFIX = "KSTREAM-SOURCE-";
    /** Name prefix used for the processor node that feeds a table's store. */
    private static final String TABLE_SOURCE_PREFIX = "KTABLE-SOURCE-";

    final InternalTopologyBuilder internalTopologyBuilder;
    private final AtomicInteger index = new AtomicInteger(0);

    /**
     * @param internalTopologyBuilder the topology builder that receives every
     *                                registration made through this instance
     */
    public InternalStreamsBuilder(InternalTopologyBuilder internalTopologyBuilder) {
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    /**
     * Registers a source node reading from the given topics and wraps it in a stream.
     *
     * @param topics   topics to read from
     * @param consumed supplies the offset reset policy, timestamp extractor and deserializers
     * @return a stream whose only source node is the newly registered source
     * @throws NullPointerException if {@code topics} or {@code consumed} is null
     */
    public <K, V> KStream<K, V> stream(Collection<String> topics, ConsumedInternal<K, V> consumed) {
        // Fail before a name index is consumed; both arguments are dereferenced below anyway.
        Objects.requireNonNull(topics, "topics can't be null");
        Objects.requireNonNull(consumed, "consumed can't be null");
        String name = this.newProcessorName(STREAM_SOURCE_PREFIX);
        this.internalTopologyBuilder.addSource(consumed.offsetResetPolicy(), name, consumed.timestampExtractor(), consumed.keyDeserializer(), consumed.valueDeserializer(), topics.toArray(new String[topics.size()]));
        return new KStreamImpl(this, name, Collections.singleton(name), false);
    }

    /**
     * Registers a source node subscribed by topic pattern and wraps it in a stream.
     *
     * @param topicPattern pattern handed to the topology builder as the subscription
     * @param consumed     supplies the offset reset policy, timestamp extractor and deserializers
     * @return a stream whose only source node is the newly registered source
     * @throws NullPointerException if {@code consumed} is null
     */
    public <K, V> KStream<K, V> stream(Pattern topicPattern, ConsumedInternal<K, V> consumed) {
        Objects.requireNonNull(consumed, "consumed can't be null");
        String name = this.newProcessorName(STREAM_SOURCE_PREFIX);
        this.internalTopologyBuilder.addSource(consumed.offsetResetPolicy(), name, consumed.timestampExtractor(), consumed.keyDeserializer(), consumed.valueDeserializer(), topicPattern);
        return new KStreamImpl(this, name, Collections.singleton(name), false);
    }

    /**
     * Registers a source node, a table-source processor and a state store for
     * {@code topic}, and returns the resulting table.
     *
     * <p>The store is attached to the table-source processor and then marked on the
     * topology builder together with {@code topic}.
     *
     * @param topic        topic backing the table
     * @param consumed     supplies the offset reset policy, timestamp extractor, deserializers and serdes
     * @param materialized describes the store to materialize and whether it is queryable
     * @return the table reading from {@code topic}
     * @throws NullPointerException if {@code consumed} or {@code materialized} is null
     */
    public <K, V> KTable<K, V> table(String topic, ConsumedInternal<K, V> consumed, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        // Same up-front validation as globalTable(); both are dereferenced below anyway.
        Objects.requireNonNull(consumed, "consumed can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<K, V>(materialized).materialize();
        // Order matters for naming: the source takes the lower index, the processor the next one.
        String source = this.newProcessorName(STREAM_SOURCE_PREFIX);
        String name = this.newProcessorName(TABLE_SOURCE_PREFIX);
        KTable<K, V> kTable = this.createKTable(consumed, topic, storeBuilder.name(), materialized.isQueryable(), source, name);
        this.internalTopologyBuilder.addStateStore(storeBuilder, name);
        this.internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);
        return kTable;
    }

    /**
     * Adds the source node and the table-source processor to the topology and
     * builds the table object on top of them. The state store itself is registered
     * by the caller.
     */
    private <K, V> KTable<K, V> createKTable(ConsumedInternal<K, V> consumed, String topic, String storeName, boolean isQueryable, String source, String name) {
        KTableSource processorSupplier = new KTableSource(storeName);
        this.internalTopologyBuilder.addSource(consumed.offsetResetPolicy(), source, consumed.timestampExtractor(), consumed.keyDeserializer(), consumed.valueDeserializer(), topic);
        this.internalTopologyBuilder.addProcessor(name, processorSupplier, source);
        return new KTableImpl(this, name, processorSupplier, consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
    }

    /**
     * Registers a global store fed from {@code topic} and returns a global table
     * view over it. {@code withLoggingDisabled()} is invoked on {@code materialized}
     * before the store builder is created.
     *
     * @param topic        topic backing the global table
     * @param consumed     supplies the timestamp extractor and deserializers
     * @param materialized describes the store to materialize and whether it is queryable
     * @return the global table backed by the newly registered store
     * @throws NullPointerException if {@code consumed} or {@code materialized} is null
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <K, V> GlobalKTable<K, V> globalTable(String topic, ConsumedInternal<K, V> consumed, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(consumed, "consumed can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        materialized.withLoggingDisabled();
        // Raw on purpose: table() assigns the same materialize() call to
        // StoreBuilder<KeyValueStore<K, V>>, which is not assignable to the
        // StoreBuilder<KeyValueStore> form used by the addGlobalStore methods of this class.
        StoreBuilder storeBuilder = new KeyValueStoreMaterializer<K, V>(materialized).materialize();
        String sourceName = this.newProcessorName(STREAM_SOURCE_PREFIX);
        String processorName = this.newProcessorName(TABLE_SOURCE_PREFIX);
        KTableSource tableSource = new KTableSource(storeBuilder.name());
        this.internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, consumed.timestampExtractor(), consumed.keyDeserializer(), consumed.valueDeserializer(), topic, processorName, tableSource);
        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier(storeBuilder.name()), materialized.isQueryable());
    }

    /**
     * Returns {@code prefix} followed by the next counter value, zero-padded to ten digits.
     *
     * <p>Formatting uses {@link Locale#ROOT} so the digits are always ASCII,
     * whatever the JVM default locale is.
     */
    @Override
    public String newProcessorName(String prefix) {
        return prefix + String.format(Locale.ROOT, "%010d", this.index.getAndIncrement());
    }

    /**
     * Returns {@code prefix} followed by {@code STATE-STORE-} and the next counter
     * value, zero-padded to ten digits. Shares its counter with
     * {@link #newProcessorName(String)}.
     *
     * <p>Formatting uses {@link Locale#ROOT} so the digits are always ASCII,
     * whatever the JVM default locale is.
     */
    @Override
    public String newStoreName(String prefix) {
        return prefix + String.format(Locale.ROOT, "STATE-STORE-%010d", this.index.getAndIncrement());
    }

    /**
     * Registers {@code builder} on the topology without connecting it to any processor.
     *
     * @param builder the store builder to register
     */
    public synchronized void addStateStore(StoreBuilder builder) {
        // Explicit empty array: no processor names are associated with the store here.
        this.internalTopologyBuilder.addStateStore(builder, new String[0]);
    }

    /**
     * Registers a global store using caller-supplied source and processor names.
     * {@code withLoggingDisabled()} is invoked on {@code storeBuilder} first.
     *
     * @param storeBuilder        builder of the global store
     * @param sourceName          name of the source node
     * @param topic               topic feeding the store
     * @param consumed            supplies the timestamp extractor and deserializers
     * @param processorName       name of the processor node that updates the store
     * @param stateUpdateSupplier supplier of the processor that updates the store
     * @throws NullPointerException if {@code storeBuilder} or {@code consumed} is null
     */
    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String sourceName, String topic, ConsumedInternal consumed, String processorName, ProcessorSupplier stateUpdateSupplier) {
        // Validate before mutating storeBuilder; both are dereferenced below anyway.
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        Objects.requireNonNull(consumed, "consumed can't be null");
        storeBuilder.withLoggingDisabled();
        this.internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, consumed.timestampExtractor(), consumed.keyDeserializer(), consumed.valueDeserializer(), topic, processorName, stateUpdateSupplier);
    }

    /**
     * Registers a global store, generating the source and processor names, and
     * delegates to the six-argument overload.
     *
     * @param storeBuilder        builder of the global store
     * @param topic               topic feeding the store
     * @param consumed            supplies the timestamp extractor and deserializers
     * @param stateUpdateSupplier supplier of the processor that updates the store
     * @throws NullPointerException if {@code storeBuilder} or {@code consumed} is null
     */
    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String topic, ConsumedInternal consumed, ProcessorSupplier stateUpdateSupplier) {
        // Validate before mutating storeBuilder or consuming name indices.
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        Objects.requireNonNull(consumed, "consumed can't be null");
        // NOTE(review): the overload called below invokes withLoggingDisabled() again;
        // kept here to preserve the original call sequence.
        storeBuilder.withLoggingDisabled();
        String sourceName = this.newProcessorName(STREAM_SOURCE_PREFIX);
        String processorName = this.newProcessorName(TABLE_SOURCE_PREFIX);
        this.addGlobalStore(storeBuilder, sourceName, topic, consumed, processorName, stateUpdateSupplier);
    }
}

