/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import joptsimple.AbstractOptionSpec;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.tools.ReplicaBuffer;
import kafka.tools.ReplicaFetcher;
import kafka.tools.TopicPartitionReplica;
import kafka.utils.CommandLineUtils$;
import kafka.utils.CoreUtils$;
import kafka.utils.Exit$;
import kafka.utils.IncludeList;
import kafka.utils.Logging;
import kafka.utils.ToolsUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.slf4j.event.Level;
import scala.;
import scala.$less$colon$less$;
import scala.Console$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.convert.AsJavaExtensions;
import scala.collection.convert.AsScalaExtensions;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

public final class ReplicaVerificationTool$
implements Logging {
    // Singleton instance of this tool. It is declared first because the dateFormat
    // initializer below dereferences MODULE$.
    public static final ReplicaVerificationTool$ MODULE$ = new ReplicaVerificationTool$();
    // Client identifier exposed through clientId().
    private static final String clientId = "replicaVerificationTool";
    // Pattern used to build dateFormat; exposed through dateFormatString().
    private static final String dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS";
    // Shared formatter used by getCurrentTimeString().
    // NOTE(review): SimpleDateFormat is not thread-safe — confirm callers do not format concurrently.
    private static final SimpleDateFormat dateFormat = new SimpleDateFormat(MODULE$.dateFormatString());
    // Lazily created logger; assigned in logger$lzycompute().
    private static Logger logger;
    // Log identifier read and written through logIdent() / logIdent_$eq().
    private static String logIdent;
    // Set to true once `logger` has been assigned (see logger$lzycompute()).
    private static volatile boolean bitmap$0;

    /**
     * Returns the logger name for this object, as computed by {@code Logging.loggerName$}.
     *
     * @return the value produced by the {@code Logging} forwarder
     */
    @Override
    public String loggerName() {
        final String name = Logging.loggerName$(this);
        return name;
    }

    /**
     * Decorates {@code msg} via {@code Logging.msgWithLogIdent$}.
     *
     * @param msg the raw log message
     * @return the message returned by the {@code Logging} forwarder
     */
    @Override
    public String msgWithLogIdent(String msg) {
        final String decorated = Logging.msgWithLogIdent$(this, msg);
        return decorated;
    }

    /**
     * Forwards a lazily evaluated message to {@code Logging.trace$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void trace(Function0<String> msg) {
        final Logging self = this;
        Logging.trace$(self, msg);
    }

    /**
     * Forwards a lazily evaluated message and throwable to {@code Logging.trace$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable to attach
     */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        final Logging self = this;
        Logging.trace$(self, msg, e);
    }

    /**
     * Reports the result of {@code Logging.isDebugEnabled$} for this object.
     *
     * @return the flag returned by the {@code Logging} forwarder
     */
    @Override
    public boolean isDebugEnabled() {
        final boolean enabled = Logging.isDebugEnabled$(this);
        return enabled;
    }

    /**
     * Reports the result of {@code Logging.isTraceEnabled$} for this object.
     *
     * @return the flag returned by the {@code Logging} forwarder
     */
    @Override
    public boolean isTraceEnabled() {
        final boolean enabled = Logging.isTraceEnabled$(this);
        return enabled;
    }

    /**
     * Forwards a lazily evaluated message to {@code Logging.debug$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void debug(Function0<String> msg) {
        final Logging self = this;
        Logging.debug$(self, msg);
    }

    /**
     * Forwards a lazily evaluated message and throwable to {@code Logging.debug$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable to attach
     */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        final Logging self = this;
        Logging.debug$(self, msg, e);
    }

    /**
     * Forwards a lazily evaluated message to {@code Logging.info$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void info(Function0<String> msg) {
        final Logging self = this;
        Logging.info$(self, msg);
    }

    /**
     * Forwards a lazily evaluated message and throwable to {@code Logging.info$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable to attach
     */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        final Logging self = this;
        Logging.info$(self, msg, e);
    }

    /**
     * Forwards a lazily evaluated message to {@code Logging.warn$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void warn(Function0<String> msg) {
        final Logging self = this;
        Logging.warn$(self, msg);
    }

    /**
     * Forwards a lazily evaluated message and throwable to {@code Logging.warn$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable to attach
     */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        final Logging self = this;
        Logging.warn$(self, msg, e);
    }

    /**
     * Forwards a lazily evaluated message to {@code Logging.error$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void error(Function0<String> msg) {
        final Logging self = this;
        Logging.error$(self, msg);
    }

    /**
     * Forwards a lazily evaluated message and throwable to {@code Logging.error$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable to attach
     */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        final Logging self = this;
        Logging.error$(self, msg, e);
    }

    /**
     * Forwards a lazily evaluated message to {@code Logging.fatal$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void fatal(Function0<String> msg) {
        final Logging self = this;
        Logging.fatal$(self, msg);
    }

    /**
     * Forwards a lazily evaluated message and throwable to {@code Logging.fatal$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable to attach
     */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        final Logging self = this;
        Logging.fatal$(self, msg, e);
    }

    /**
     * Slow path of {@link #logger()}: creates the logger under this object's monitor the
     * first time it is needed and records that fact in {@code bitmap$0}.
     *
     * @return the (now initialized) logger
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (bitmap$0) {
                // Another caller finished the initialization before we got the monitor.
                return logger;
            }
            // Assign the logger before raising the flag, as the unsynchronized fast path
            // in logger() reads the flag first.
            logger = Logging.logger$(this);
            bitmap$0 = true;
            return logger;
        }
    }

    /**
     * Returns the logger, creating it on first use.
     *
     * @return the cached logger if {@code bitmap$0} is already set, otherwise the result of
     *         {@code logger$lzycompute()}
     */
    @Override
    public Logger logger() {
        return bitmap$0 ? logger : this.logger$lzycompute();
    }

    /**
     * Returns the current log identifier.
     *
     * @return the value last stored by {@code logIdent_$eq}, or {@code null} if never set
     */
    @Override
    public String logIdent() {
        return ReplicaVerificationTool$.logIdent;
    }

    /**
     * Stores a new log identifier.
     *
     * @param x$1 the identifier to store
     */
    @Override
    public void logIdent_$eq(String x$1) {
        ReplicaVerificationTool$.logIdent = x$1;
    }

    /**
     * Returns the client id constant of this tool.
     *
     * @return the {@code clientId} constant
     */
    public String clientId() {
        return ReplicaVerificationTool$.clientId;
    }

    /**
     * Returns the date pattern used to build {@link #dateFormat()}.
     *
     * @return the {@code dateFormatString} constant
     */
    public String dateFormatString() {
        return ReplicaVerificationTool$.dateFormatString;
    }

    /**
     * Returns the shared formatter built from {@link #dateFormatString()}.
     *
     * @return the single {@code SimpleDateFormat} instance held by this object
     */
    public SimpleDateFormat dateFormat() {
        return ReplicaVerificationTool$.dateFormat;
    }

    /**
     * Formats the current {@code Time.SYSTEM} clock reading with {@link #dateFormat()}.
     *
     * <p>NOTE(review): the formatter is a shared {@code SimpleDateFormat}, which is not
     * thread-safe — confirm this method is never called concurrently.
     *
     * @return the current time rendered with the tool's date pattern
     */
    public String getCurrentTimeString() {
        final long nowMs = Time.SYSTEM.milliseconds();
        final Date now = new Date(nowMs);
        return this.dateFormat().format(now);
    }

    public void main(String[] args) {
        Seq<TopicDescription> seq;
        scala.collection.immutable.Map<Object, Node> map;
        OptionParser parser = new OptionParser(false);
        ArgumentAcceptingOptionSpec brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.").withRequiredArg().describedAs("hostname:port,...,hostname:port").ofType(String.class);
        ArgumentAcceptingOptionSpec fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.").withRequiredArg().describedAs("bytes").ofType(Integer.class).defaultsTo((Object)0x100000, (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)1000, (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class).defaultsTo((Object)".*", (Object[])new String[0]);
        ArgumentAcceptingOptionSpec initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.").withRequiredArg().describedAs("timestamp/-1(latest)/-2(earliest)").ofType(Long.class).defaultsTo((Object)-1L, (Object[])new Long[0]);
        ArgumentAcceptingOptionSpec reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.").withRequiredArg().describedAs("ms").ofType(Long.class).defaultsTo((Object)30000L, (Object[])new Long[0]);
        AbstractOptionSpec helpOpt = parser.accepts("help", "Print usage information.").forHelp();
        AbstractOptionSpec versionOpt = parser.accepts("version", "Print version information and exit.").forHelp();
        OptionSet options = parser.parse(args);
        if (args.length == 0 || options.has((OptionSpec)helpOpt)) {
            throw CommandLineUtils$.MODULE$.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.");
        }
        if (options.has((OptionSpec)versionOpt)) {
            throw CommandLineUtils$.MODULE$.printVersionAndDie();
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, (scala.collection.immutable.Seq<OptionSpec<?>>)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{brokerListOpt}));
        String regex = (String)options.valueOf((OptionSpec)topicWhiteListOpt);
        IncludeList topicWhiteListFiler = new IncludeList(regex);
        try {
            Pattern.compile(regex);
        }
        catch (PatternSyntaxException patternSyntaxException) {
            throw new RuntimeException(new StringBuilder(21).append(regex).append(" is an invalid regex.").toString());
        }
        int fetchSize = (Integer)options.valueOf((OptionSpec)fetchSizeOpt);
        int maxWaitMs = (Integer)options.valueOf((OptionSpec)maxWaitMsOpt);
        long initialOffsetTime = (Long)options.valueOf((OptionSpec)initialOffsetTimeOpt);
        long reportInterval = (Long)options.valueOf((OptionSpec)reportIntervalOpt);
        if (this.logger().underlying().isInfoEnabled()) {
            String string;
            String string2 = string = "Getting topic metadata...";
            string = null;
            String msgWithLogIdent_msg = string2;
            Object var34_19 = null;
            this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
        }
        String brokerList = (String)options.valueOf((OptionSpec)brokerListOpt);
        ToolsUtils$.MODULE$.validatePortOrDie(parser, brokerList);
        Admin adminClient = this.createAdminClient(brokerList);
        try {
            map = this.brokerDetails(adminClient);
            seq = this.listTopicsMetadata(adminClient);
        }
        finally {
            CoreUtils$.MODULE$.swallow((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> adminClient.close(), this, Level.WARN);
        }
        Seq filteredTopicMetadata = (Seq)seq.filter((Function1 & Serializable)topicMetaData -> BoxesRunTime.boxToBoolean((boolean)topicWhiteListFiler.isTopicAllowed(topicMetaData.name(), false)));
        if (filteredTopicMetadata.isEmpty()) {
            if (this.logger().underlying().isErrorEnabled()) {
                String msgWithLogIdent_msg = ReplicaVerificationTool$.$anonfun$main$4(topicWhiteListOpt);
                Object var35_26 = null;
                this.logger().underlying().error(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
            }
            throw Exit$.MODULE$.exit(1, (Option<String>)None$.MODULE$);
        }
        Seq topicPartitionReplicas = (Seq)filteredTopicMetadata.flatMap((Function1 & Serializable)topicMetadata -> (Buffer)AsScalaExtensions.ListHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (List)topicMetadata.partitions()).asScala().flatMap((Function1 & Serializable)partitionMetadata -> (Buffer)AsScalaExtensions.ListHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (List)partitionMetadata.replicas()).asScala().map((Function1 & Serializable)node -> new TopicPartitionReplica(topicMetadata.name(), partitionMetadata.partition(), node.id()))));
        if (this.logger().underlying().isDebugEnabled()) {
            String msgWithLogIdent_msg = ReplicaVerificationTool$.$anonfun$main$8(topicPartitionReplicas);
            Object var36_28 = null;
            this.logger().underlying().debug(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
        }
        scala.collection.immutable.Map brokerToTopicPartitions = (scala.collection.immutable.Map)topicPartitionReplicas.groupBy((Function1 & Serializable)x$2 -> BoxesRunTime.boxToInteger((int)x$2.replicaId())).map((Function1 & Serializable)x0$1 -> {
            void $minus$greater$extension_y;
            Tuple2 tuple2;
            if (x0$1 == null) {
                throw new MatchError(null);
            }
            int brokerId = x0$1._1$mcI$sp();
            Seq partitions = (Seq)x0$1._2();
            Object object = partitions.map((Function1 & Serializable)partition -> new TopicPartition(partition.topic(), partition.partitionId()));
            Integer $minus$greater$extension_$this = brokerId;
            Tuple2 tuple22 = tuple2 = new Tuple2((Object)$minus$greater$extension_$this, (Object)$minus$greater$extension_y);
            Object var4_4 = null;
            object = null;
            tuple2 = null;
            Tuple2 tuple23 = tuple22;
            return tuple23;
        });
        if (this.logger().underlying().isDebugEnabled()) {
            String msgWithLogIdent_msg = ReplicaVerificationTool$.$anonfun$main$12(brokerToTopicPartitions);
            Object var37_30 = null;
            this.logger().underlying().debug(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
        }
        scala.collection.immutable.Map expectedReplicasPerTopicPartition = (scala.collection.immutable.Map)topicPartitionReplicas.groupBy((Function1 & Serializable)replica -> new TopicPartition(replica.topic(), replica.partitionId())).map((Function1 & Serializable)x0$2 -> {
            Tuple2 tuple2;
            Integer n;
            Seq replicaSet;
            TopicPartition topicAndPartition;
            if (x0$2 != null) {
                topicAndPartition = (TopicPartition)x0$2._1();
                replicaSet = (Seq)x0$2._2();
                if (replicaSet == null) {
                    throw null;
                }
            } else {
                throw new MatchError(null);
            }
            Integer n2 = n = Integer.valueOf(replicaSet.length());
            n = null;
            Integer $minus$greater$extension_y = n2;
            Tuple2 tuple22 = tuple2 = new Tuple2((Object)topicAndPartition, (Object)$minus$greater$extension_y);
            Object var4_4 = null;
            tuple2 = null;
            Tuple2 tuple23 = tuple22;
            return tuple23;
        });
        if (this.logger().underlying().isDebugEnabled()) {
            String msgWithLogIdent_msg = ReplicaVerificationTool$.$anonfun$main$15(expectedReplicasPerTopicPartition);
            Object var38_32 = null;
            this.logger().underlying().debug(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
        }
        Seq topicPartitions = (Seq)filteredTopicMetadata.flatMap((Function1 & Serializable)topicMetaData -> (Buffer)AsScalaExtensions.ListHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (List)topicMetaData.partitions()).asScala().map((Function1 & Serializable)partitionMetadata -> new TopicPartition(topicMetaData.name(), partitionMetadata.partition())));
        Properties consumerProps = this.consumerConfig(brokerList);
        ReplicaBuffer replicaBuffer = new ReplicaBuffer((Map<TopicPartition, Object>)expectedReplicasPerTopicPartition, this.initialOffsets((Seq<TopicPartition>)topicPartitions, consumerProps, initialOffsetTime), brokerToTopicPartitions.size(), reportInterval);
        int verificationBrokerId = ((Tuple2)brokerToTopicPartitions.head())._1$mcI$sp();
        AtomicInteger counter = new AtomicInteger(0);
        scala.collection.immutable.Iterable fetcherThreads = (scala.collection.immutable.Iterable)brokerToTopicPartitions.map((Function1 & Serializable)x0$3 -> {
            if (x0$3 == null) {
                throw new MatchError(null);
            }
            int brokerId = x0$3._1$mcI$sp();
            Seq topicPartitions = (Seq)x0$3._2();
            ReplicaFetcher replicaFetcher = new ReplicaFetcher(new StringBuilder(15).append("ReplicaFetcher-").append(brokerId).toString(), (Node)map.apply((Object)brokerId), (Iterable<TopicPartition>)topicPartitions, replicaBuffer, 30000, 256000, fetchSize, maxWaitMs, 1, brokerId == verificationBrokerId, consumerProps, counter.incrementAndGet());
            return replicaFetcher;
        });
        Exit.addShutdownHook((String)"ReplicaVerificationToolShutdownHook", () -> Exit$.$anonfun$addShutdownHook$1((Function0)(JFunction0.mcV.sp & Serializable)() -> {
            ReplicaVerificationTool$ info_this = MODULE$;
            if (info_this.logger().underlying().isInfoEnabled()) {
                String string;
                String string2 = string = "Stopping all fetchers";
                string = null;
                String msgWithLogIdent_msg = string2;
                Object var2_3 = null;
                info_this.logger().underlying().info(Logging.msgWithLogIdent$(info_this, msgWithLogIdent_msg));
            }
            Object var1_1 = null;
            fetcherThreads.foreach((Function1 & Serializable)x$3 -> {
                x$3.shutdown();
                return BoxedUnit.UNIT;
            });
        }));
        fetcherThreads.foreach((Function1 & Serializable)x$4 -> {
            x$4.start();
            return BoxedUnit.UNIT;
        });
        String println_x = new StringBuilder(34).append(this.getCurrentTimeString()).append(": verification process is started.").toString();
        Console$.MODULE$.println((Object)println_x);
    }

    /**
     * Lists every topic name (internal topics included) and returns the descriptions of
     * all of them as a Scala buffer.
     *
     * @param adminClient admin client used for the list and describe calls
     * @return a new buffer holding one {@code TopicDescription} per listed topic
     */
    private Seq<TopicDescription> listTopicsMetadata(Admin adminClient) {
        ListTopicsOptions includeInternal = new ListTopicsOptions().listInternal(true);
        Set topicNames = (Set)adminClient.listTopics(includeInternal).names().get();
        java.util.Map descriptionsByName = (java.util.Map)adminClient.describeTopics((Collection)topicNames).all().get();
        Iterable descriptions = AsScalaExtensions.CollectionHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, descriptionsByName.values()).asScala();
        if (descriptions == null) {
            throw null;
        }
        // Copy the view into a buffer so the result no longer depends on the Java map.
        return (Buffer)Buffer$.MODULE$.from((IterableOnce)descriptions);
    }

    private scala.collection.immutable.Map<Object, Node> brokerDetails(Admin adminClient) {
        return ((IterableOnceOps)AsScalaExtensions.CollectionHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (Collection)((Collection)adminClient.describeCluster().nodes().get())).asScala().map((Function1 & Serializable)n -> new Tuple2((Object)n.id(), n))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
    }

    /**
     * Creates an admin client whose only explicit setting is {@code bootstrap.servers}.
     *
     * @param brokerUrl value stored under {@code bootstrap.servers}
     * @return the client returned by {@code Admin.create}; the caller must close it
     */
    private Admin createAdminClient(String brokerUrl) {
        Properties adminProps = new Properties();
        adminProps.setProperty("bootstrap.servers", brokerUrl);
        return Admin.create(adminProps);
    }

    private Map<TopicPartition, Object> initialOffsets(Seq<TopicPartition> topicPartitions, Properties consumerConfig, long initialOffsetTime) {
        Map map;
        KafkaConsumer kafkaConsumer;
        KafkaConsumer kafkaConsumer2 = kafkaConsumer = new KafkaConsumer(consumerConfig);
        kafkaConsumer = null;
        try (KafkaConsumer consumer = kafkaConsumer2;){
            if (-1L == initialOffsetTime) {
                map = (Map)AsScalaExtensions.MapHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (java.util.Map)consumer.endOffsets((Collection)AsJavaExtensions.SeqHasAsJava$((AsJavaExtensions)CollectionConverters$.MODULE$, topicPartitions).asJava())).asScala().map((Function1 & Serializable)x0$1 -> {
                    Tuple2 tuple2;
                    Long l;
                    if (x0$1 == null) {
                        throw new MatchError(null);
                    }
                    TopicPartition k = (TopicPartition)x0$1._1();
                    Long v = (Long)x0$1._2();
                    Long l2 = l = Long.valueOf(v);
                    l = null;
                    Long $minus$greater$extension_y = l2;
                    Tuple2 tuple22 = tuple2 = new Tuple2((Object)k, (Object)$minus$greater$extension_y);
                    Object var4_4 = null;
                    tuple2 = null;
                    Tuple2 tuple23 = tuple22;
                    return tuple23;
                });
            } else if (-2L == initialOffsetTime) {
                map = (Map)AsScalaExtensions.MapHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (java.util.Map)consumer.beginningOffsets((Collection)AsJavaExtensions.SeqHasAsJava$((AsJavaExtensions)CollectionConverters$.MODULE$, topicPartitions).asJava())).asScala().map((Function1 & Serializable)x0$2 -> {
                    Tuple2 tuple2;
                    Long l;
                    if (x0$2 == null) {
                        throw new MatchError(null);
                    }
                    TopicPartition k = (TopicPartition)x0$2._1();
                    Long v = (Long)x0$2._2();
                    Long l2 = l = Long.valueOf(v);
                    l = null;
                    Long $minus$greater$extension_y = l2;
                    Tuple2 tuple22 = tuple2 = new Tuple2((Object)k, (Object)$minus$greater$extension_y);
                    Object var4_4 = null;
                    tuple2 = null;
                    Tuple2 tuple23 = tuple22;
                    return tuple23;
                });
            } else {
                scala.collection.immutable.Map timestampsToSearch = ((IterableOnceOps)topicPartitions.map((Function1 & Serializable)tp -> {
                    Long l;
                    Long l2 = l = Long.valueOf(initialOffsetTime);
                    l = null;
                    Long $minus$greater$extension_y = l2;
                    return new Tuple2(tp, (Object)$minus$greater$extension_y);
                })).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
                map = (Map)AsScalaExtensions.MapHasAsScala$((AsScalaExtensions)CollectionConverters$.MODULE$, (java.util.Map)consumer.offsetsForTimes(AsJavaExtensions.MapHasAsJava$((AsJavaExtensions)CollectionConverters$.MODULE$, (Map)timestampsToSearch).asJava())).asScala().map((Function1 & Serializable)x0$3 -> {
                    Tuple2 tuple2;
                    Long l;
                    if (x0$3 == null) {
                        throw new MatchError(null);
                    }
                    TopicPartition k = (TopicPartition)x0$3._1();
                    OffsetAndTimestamp v = (OffsetAndTimestamp)x0$3._2();
                    Long l2 = l = Long.valueOf(v.offset());
                    l = null;
                    Long $minus$greater$extension_y = l2;
                    Tuple2 tuple22 = tuple2 = new Tuple2((Object)k, (Object)$minus$greater$extension_y);
                    Object var4_4 = null;
                    tuple2 = null;
                    Tuple2 tuple23 = tuple22;
                    return tuple23;
                });
            }
        }
        return map;
    }

    /*
     * WARNING - void declaration
     */
    private Properties consumerConfig(String brokerUrl) {
        void var2_2;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerUrl);
        properties.put("group.id", "ReplicaVerification");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        return var2_2;
    }

    /**
     * Creates a new consumer from the given configuration.
     *
     * @param consumerConfig consumer properties passed to the {@code KafkaConsumer} constructor
     * @return the new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> createConsumer(Properties consumerConfig) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
        return consumer;
    }

    /**
     * Supplies the fixed log message announcing the metadata lookup.
     *
     * @return the literal {@code "Getting topic metadata..."}
     */
    public static final /* synthetic */ String $anonfun$main$1() {
        final String message = "Getting topic metadata...";
        return message;
    }

    /**
     * Builds the error message logged when no topic survives the include-list filter.
     *
     * @param topicWhiteListOpt$1 option spec whose string form is embedded in the message
     * @return the assembled message
     */
    public static final /* synthetic */ String $anonfun$main$4(ArgumentAcceptingOptionSpec topicWhiteListOpt$1) {
        return "No topics found. " + topicWhiteListOpt$1 + " if specified, is either filtering out all topics or there is no topic.";
    }

    /**
     * Builds the debug message listing the selected topic partition replicas.
     *
     * @param topicPartitionReplicas$1 sequence whose string form is appended to the message
     * @return the assembled message
     */
    public static final /* synthetic */ String $anonfun$main$8(Seq topicPartitionReplicas$1) {
        return "Selected topic partitions: " + topicPartitionReplicas$1;
    }

    /**
     * Builds the debug message listing the topic partitions grouped by broker.
     *
     * @param brokerToTopicPartitions$1 map whose string form is appended to the message
     * @return the assembled message
     */
    public static final /* synthetic */ String $anonfun$main$12(scala.collection.immutable.Map brokerToTopicPartitions$1) {
        return "Topic partitions per broker: " + brokerToTopicPartitions$1;
    }

    /**
     * Builds the debug message listing the expected replica count per topic partition.
     *
     * @param expectedReplicasPerTopicPartition$1 map whose string form is appended to the message
     * @return the assembled message
     */
    public static final /* synthetic */ String $anonfun$main$15(scala.collection.immutable.Map expectedReplicasPerTopicPartition$1) {
        return "Expected replicas per topic partition: " + expectedReplicasPerTopicPartition$1;
    }

    /**
     * Supplies the fixed log message emitted by the shutdown hook.
     *
     * @return the literal {@code "Stopping all fetchers"}
     */
    public static final /* synthetic */ String $anonfun$main$20() {
        final String message = "Stopping all fetchers";
        return message;
    }

    /**
     * Private constructor: the only instance is the one stored in {@code MODULE$}.
     */
    private ReplicaVerificationTool$() {
        super();
    }
}

