/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.TopicPartitionCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;

@Experimental
class WatchKafkaTopicPartitionDoFn
extends DoFn<KV<byte[], byte[]>, KafkaSourceDescriptor> {
    private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours((long)1L);
    private static final String TIMER_ID = "watch_timer";
    private static final String STATE_ID = "topic_partition_set";
    private final Duration checkDuration;
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn;
    private final SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
    private final Map<String, Object> kafkaConsumerConfig;
    private final Instant startReadTime;
    private final Instant stopReadTime;
    private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
    private final List<String> topics;
    @DoFn.TimerId(value="watch_timer")
    private final TimerSpec timerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);
    @DoFn.StateId(value="topic_partition_set")
    private final StateSpec<BagState<TopicPartition>> bagStateSpec = StateSpecs.bag((Coder)new TopicPartitionCoder());

    WatchKafkaTopicPartitionDoFn(Duration checkDuration, SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn, SerializableFunction<TopicPartition, Boolean> checkStopReadingFn, Map<String, Object> kafkaConsumerConfig, Instant startReadTime, Instant stopReadTime, List<String> topics) {
        this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;
        this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
        this.checkStopReadingFn = checkStopReadingFn;
        this.kafkaConsumerConfig = kafkaConsumerConfig;
        this.startReadTime = startReadTime;
        this.stopReadTime = stopReadTime;
        this.topics = topics;
    }

    @VisibleForTesting
    Set<TopicPartition> getAllTopicPartitions() {
        HashSet<TopicPartition> current = new HashSet<TopicPartition>();
        try (Consumer kafkaConsumer = (Consumer)this.kafkaConsumerFactoryFn.apply(this.kafkaConsumerConfig);){
            if (this.topics != null && !this.topics.isEmpty()) {
                for (String string : this.topics) {
                    for (PartitionInfo partition : kafkaConsumer.partitionsFor(string)) {
                        current.add(new TopicPartition(string, partition.partition()));
                    }
                }
            } else {
                for (Map.Entry entry : kafkaConsumer.listTopics().entrySet()) {
                    for (PartitionInfo partition : (List)entry.getValue()) {
                        current.add(new TopicPartition((String)entry.getKey(), partition.partition()));
                    }
                }
            }
        }
        return current;
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.TimerId(value="watch_timer") Timer timer, @DoFn.StateId(value="topic_partition_set") BagState<TopicPartition> existingTopicPartitions, DoFn.OutputReceiver<KafkaSourceDescriptor> outputReceiver) {
        Set<TopicPartition> current = this.getAllTopicPartitions();
        current.forEach(topicPartition -> {
            if (this.checkStopReadingFn == null || !((Boolean)this.checkStopReadingFn.apply(topicPartition)).booleanValue()) {
                Counter foundedTopicPartition = Metrics.counter((String)COUNTER_NAMESPACE, (String)topicPartition.toString());
                foundedTopicPartition.inc();
                existingTopicPartitions.add(topicPartition);
                outputReceiver.output((Object)KafkaSourceDescriptor.of(topicPartition, null, this.startReadTime, null, this.stopReadTime, null));
            }
        });
        timer.offset(this.checkDuration).setRelative();
    }

    @DoFn.OnTimer(value="watch_timer")
    public void onTimer(@DoFn.TimerId(value="watch_timer") Timer timer, @DoFn.StateId(value="topic_partition_set") BagState<TopicPartition> existingTopicPartitions, DoFn.OutputReceiver<KafkaSourceDescriptor> outputReceiver) {
        HashSet readingTopicPartitions = new HashSet();
        existingTopicPartitions.read().forEach(topicPartition -> readingTopicPartitions.add(topicPartition));
        existingTopicPartitions.clear();
        Set<TopicPartition> currentAll = this.getAllTopicPartitions();
        Sets.SetView newAdded = Sets.difference(currentAll, readingTopicPartitions);
        newAdded.forEach(topicPartition -> {
            if (this.checkStopReadingFn == null || !((Boolean)this.checkStopReadingFn.apply(topicPartition)).booleanValue()) {
                Counter foundedTopicPartition = Metrics.counter((String)COUNTER_NAMESPACE, (String)topicPartition.toString());
                foundedTopicPartition.inc();
                outputReceiver.output((Object)KafkaSourceDescriptor.of(topicPartition, null, this.startReadTime, null, this.stopReadTime, null));
            }
        });
        currentAll.forEach(topicPartition -> {
            if (this.checkStopReadingFn == null || !((Boolean)this.checkStopReadingFn.apply(topicPartition)).booleanValue()) {
                existingTopicPartitions.add(topicPartition);
            }
        });
        timer.set(Instant.now().plus((ReadableDuration)Duration.millis((long)this.checkDuration.getMillis())));
    }
}

