/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.tools;

import io.confluent.kafka.admin.AdminClient;
import io.confluent.kafka.utils.CommandLineUtils;
import io.confluent.org.apache.kafka.clients.admin.DeleteTopicsResult;
import io.confluent.org.apache.kafka.clients.admin.KafkaAdminClient;
import io.confluent.org.apache.kafka.clients.consumer.Consumer;
import io.confluent.org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import io.confluent.org.apache.kafka.common.KafkaFuture;
import io.confluent.org.apache.kafka.common.TopicPartition;
import io.confluent.org.apache.kafka.common.annotation.InterfaceStability;
import io.confluent.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import io.confluent.org.apache.kafka.common.serialization.Deserializer;
import io.confluent.org.apache.kafka.common.utils.Exit;
import io.confluent.org.apache.kafka.common.utils.Utils;
import java.io.IOException;
import java.io.OutputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import scala.collection.immutable.HashSet;
import scala.collection.immutable.List;

@InterfaceStability.Unstable
public class StreamsResetter {
    // Named exit codes: 0 for success, 1 for error.
    private static final int EXIT_CODE_SUCCESS = 0;
    private static final int EXIT_CODE_ERROR = 1;
    // Command-line option handles. All of them are (re)assigned by parseArguments() every time it
    // runs; because they are static they are shared by every StreamsResetter instance.
    private static OptionSpec<String> bootstrapServerOption;
    private static OptionSpec<String> applicationIdOption;
    private static OptionSpec<String> inputTopicsOption;
    private static OptionSpec<String> intermediateTopicsOption;
    private static OptionSpec<Long> toOffsetOption;
    private static OptionSpec<String> toDatetimeOption;
    private static OptionSpec<String> byDurationOption;
    private static OptionSpecBuilder toEarliestOption;
    private static OptionSpecBuilder toLatestOption;
    private static OptionSpec<String> fromFileOption;
    private static OptionSpec<Long> shiftByOption;
    private static OptionSpecBuilder dryRunOption;
    private static OptionSpecBuilder executeOption;
    private static OptionSpec<String> commandConfigOption;
    // Parsed command line; stays null until parseArguments() has parsed the arguments.
    private OptionSet options = null;
    // Topic names listed from the cluster; run(String[], Properties) clears and refills this list.
    private final java.util.List<String> allTopics = new LinkedList<String>();

    /**
     * Runs the reset tool with the given command-line arguments and an empty extra configuration.
     *
     * @param args command-line arguments
     * @return the exit code produced by {@link #run(String[], Properties)}
     */
    public int run(String[] args) {
        Properties noExtraConfig = new Properties();
        return this.run(args, noExtraConfig);
    }

    /**
     * Executes the whole reset workflow: parses the command line, checks that the consumer group
     * has no active members, lists the cluster's topics, resets/seeks offsets for the requested
     * topics and finally deletes the application's internal topics.
     *
     * <p>Any {@link Throwable} raised along the way is printed to {@code System.err} and turned
     * into exit code 1 instead of being propagated. The admin client, if it was created, is always
     * closed before returning.
     *
     * @param args   command-line arguments
     * @param config extra entries for the embedded consumer's configuration; entries loaded from
     *               {@code --config-file} and the bootstrap servers overwrite entries with the
     *               same key
     * @return 0 on success; 1 if a requested topic was not found or any step failed
     */
    public int run(String[] args, Properties config) {
        KafkaAdminClient adminClient = null;
        int status = 0;
        try {
            this.parseArguments(args);
            final boolean dryRun = this.options.has(dryRunOption);
            final String groupId = this.options.valueOf(applicationIdOption);

            // Client settings: optional properties file first, bootstrap servers written on top.
            final Properties clientProps = new Properties();
            if (this.options.has(commandConfigOption)) {
                final String configFile = this.options.valueOf(commandConfigOption);
                clientProps.putAll(Utils.loadProps(configFile));
            }
            clientProps.put("bootstrap.servers", this.options.valueOf(bootstrapServerOption));

            this.validateNoActiveConsumers(groupId, clientProps);

            adminClient = (KafkaAdminClient) io.confluent.org.apache.kafka.clients.admin.AdminClient.create(clientProps);
            // The list is cleared before the (possibly failing) fetch, so a failed run leaves it empty.
            this.allTopics.clear();
            this.allTopics.addAll(adminClient.listTopics().names().get(60L, TimeUnit.SECONDS));

            if (dryRun) {
                System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
            }

            // Caller-supplied entries form the base; the client settings override them.
            final Map<Object, Object> consumerConfig = new HashMap<Object, Object>(config);
            consumerConfig.putAll(clientProps);
            status = this.maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
            this.maybeDeleteInternalTopics(adminClient, dryRun);
        } catch (Throwable failure) {
            status = 1;
            System.err.println("ERROR: " + failure);
            failure.printStackTrace(System.err);
        } finally {
            if (adminClient != null) {
                adminClient.close(60L, TimeUnit.SECONDS);
            }
        }
        return status;
    }

    /**
     * Verifies that the consumer group of the application has no active members.
     *
     * <p>A short-lived admin client is created from {@code properties}, used to describe the group
     * and closed again before the method returns, whether or not the check passes.
     *
     * @param groupId    consumer group id (the Streams application id)
     * @param properties configuration used to create the admin client
     * @throws IllegalStateException if the group still reports at least one consumer
     */
    private void validateNoActiveConsumers(String groupId, Properties properties) {
        // A try-with-resources variable is implicitly final and cannot be assigned inside the
        // body, so the client's lifetime is managed with an explicit try/finally.
        AdminClient olderAdminClient = AdminClient.create(properties);
        try {
            if (!((List)olderAdminClient.describeConsumerGroup(groupId, 0L).consumers().get()).isEmpty()) {
                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. Make sure to stop all running application instances before running the reset tool.");
            }
        } finally {
            olderAdminClient.close();
        }
    }

    /**
     * Declares all supported command-line options, parses {@code args} into {@link #options} and
     * validates option combinations.
     *
     * <p>Validation performed after parsing:
     * <ul>
     *   <li>{@code --dry-run} and {@code --execute} must not be given together;</li>
     *   <li>the reset scenarios ({@code --to-offset}, {@code --to-datetime}, {@code --by-duration},
     *       {@code --to-earliest}, {@code --to-latest}, {@code --from-file}, {@code --shift-by})
     *       are mutually exclusive.</li>
     * </ul>
     * Violations are reported through {@code CommandLineUtils}.
     *
     * @param args command-line arguments
     * @throws IOException     if the help text cannot be written
     * @throws OptionException if the arguments cannot be parsed (the help text is printed first)
     */
    private void parseArguments(String[] args) throws IOException {
        OptionParser optionParser = new OptionParser(false);
        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id).")
            .withRequiredArg()
            .ofType(String.class)
            .describedAs("id")
            .required();
        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
            .withRequiredArg()
            .ofType(String.class)
            .defaultsTo("localhost:9092")
            .describedAs("urls");
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method). For these topics, the tool will skip to the end.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.")
            .withRequiredArg()
            .ofType(Long.class);
        toDatetimeOption = optionParser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
            .withRequiredArg()
            .ofType(String.class);
        byDurationOption = optionParser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
            .withRequiredArg()
            .ofType(String.class);
        toEarliestOption = optionParser.accepts("to-earliest", "Reset offsets to earliest offset.");
        toLatestOption = optionParser.accepts("to-latest", "Reset offsets to latest offset.");
        fromFileOption = optionParser.accepts("from-file", "Reset offsets to values defined in CSV file.")
            .withRequiredArg()
            .ofType(String.class);
        shiftByOption = optionParser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative")
            .withRequiredArg()
            .describedAs("number-of-offsets")
            .ofType(Long.class);
        commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
            .withRequiredArg()
            .ofType(String.class)
            .describedAs("file name");
        executeOption = optionParser.accepts("execute", "Execute the command.");
        dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
        // Still accepted so that old invocations keep parsing; the value is never read.
        optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");

        try {
            this.options = optionParser.parse(args);
        } catch (OptionException e) {
            this.printHelp(optionParser);
            throw e;
        }

        if (this.options.has(executeOption) && this.options.has(dryRunOption)) {
            CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
        }

        // scala.collection.immutable.HashSet is immutable: $plus returns a NEW set and leaves the
        // receiver untouched. The set therefore has to be built by chaining the returned values;
        // discarding them would leave it empty and silently disable the exclusivity checks below.
        HashSet<OptionSpec<?>> allScenarioOptions = new HashSet<OptionSpec<?>>()
            .$plus(toOffsetOption)
            .$plus(toDatetimeOption)
            .$plus(byDurationOption)
            .$plus(toEarliestOption)
            .$plus(toLatestOption)
            .$plus(fromFileOption)
            .$plus(shiftByOption);

        // Each scenario option is checked against every OTHER scenario option.
        CommandLineUtils.checkInvalidArgs(optionParser, this.options, toOffsetOption, allScenarioOptions.$minus(toOffsetOption));
        CommandLineUtils.checkInvalidArgs(optionParser, this.options, toDatetimeOption, allScenarioOptions.$minus(toDatetimeOption));
        CommandLineUtils.checkInvalidArgs(optionParser, this.options, byDurationOption, allScenarioOptions.$minus(byDurationOption));
        CommandLineUtils.checkInvalidArgs(optionParser, this.options, toEarliestOption, allScenarioOptions.$minus(toEarliestOption));
        CommandLineUtils.checkInvalidArgs(optionParser, this.options, toLatestOption, allScenarioOptions.$minus(toLatestOption));
        CommandLineUtils.checkInvalidArgs(optionParser, this.options, fromFileOption, allScenarioOptions.$minus(fromFileOption));
        CommandLineUtils.checkInvalidArgs(optionParser, this.options, shiftByOption, allScenarioOptions.$minus(shiftByOption));
    }

    /**
     * Resets the offsets of the requested input topics and seeks to the end of the requested
     * intermediate topics, committing the new positions unless this is a dry run.
     *
     * <p>Topics that are not present in {@link #allTopics} are reported and skipped. The seeks are
     * performed (and printed) in dry-run mode as well; only the final commit is skipped.
     *
     * @param consumerConfig configuration entries for the embedded consumer; {@code group.id} and
     *                       {@code enable.auto.commit} are overwritten here
     * @param dryRun         when {@code true}, nothing is committed
     * @return 0 if every requested topic exists (or none was requested), 1 if at least one
     *         requested topic was not found
     * @throws Exception if creating the consumer, seeking or committing fails
     */
    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(Map consumerConfig, boolean dryRun) throws Exception {
        // Typed lists: iterating a raw List yields Object, which cannot be bound to a String loop variable.
        java.util.List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
        java.util.List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
        int topicNotFound = 0;
        ArrayList<String> notFoundInputTopics = new ArrayList<String>();
        ArrayList<String> notFoundIntermediateTopics = new ArrayList<String>();
        String groupId = this.options.valueOf(applicationIdOption);
        if (inputTopics.isEmpty() && intermediateTopics.isEmpty()) {
            System.out.println("No input or intermediate topics specified. Skipping seek.");
            return 0;
        }
        if (!inputTopics.isEmpty()) {
            System.out.println("Reset-offsets for input topics " + inputTopics);
        }
        if (!intermediateTopics.isEmpty()) {
            System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
        }
        // Only topics that actually exist in the cluster are subscribed to.
        java.util.HashSet<String> topicsToSubscribe = new java.util.HashSet<String>(inputTopics.size() + intermediateTopics.size());
        for (String topic : inputTopics) {
            if (!this.allTopics.contains(topic)) {
                notFoundInputTopics.add(topic);
                continue;
            }
            topicsToSubscribe.add(topic);
        }
        for (String topic : intermediateTopics) {
            if (!this.allTopics.contains(topic)) {
                notFoundIntermediateTopics.add(topic);
                continue;
            }
            topicsToSubscribe.add(topic);
        }
        if (!notFoundInputTopics.isEmpty()) {
            System.out.println("Following input topics are not found, skipping them");
            for (String topic : notFoundInputTopics) {
                System.out.println("Topic: " + topic);
            }
            topicNotFound = 1;
        }
        if (!notFoundIntermediateTopics.isEmpty()) {
            System.out.println("Following intermediate topics are not found, skipping them");
            for (String topic : notFoundIntermediateTopics) {
                System.out.println("Topic:" + topic);
            }
            topicNotFound = 1;
        }
        if (topicsToSubscribe.isEmpty()) {
            return topicNotFound;
        }
        Properties config = new Properties();
        config.putAll((Map<?, ?>)consumerConfig);
        config.setProperty("group.id", groupId);
        config.setProperty("enable.auto.commit", "false");
        try (KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<byte[], byte[]>(config, (Deserializer<byte[]>)new ByteArrayDeserializer(), (Deserializer<byte[]>)new ByteArrayDeserializer());){
            client.subscribe(topicsToSubscribe);
            // The poll is issued before reading the assignment; its returned records are not used.
            client.poll(1L);
            Set<TopicPartition> partitions = client.assignment();
            java.util.HashSet<TopicPartition> inputTopicPartitions = new java.util.HashSet<TopicPartition>();
            java.util.HashSet<TopicPartition> intermediateTopicPartitions = new java.util.HashSet<TopicPartition>();
            for (TopicPartition p : partitions) {
                String topic = p.topic();
                if (this.isInputTopic(topic)) {
                    inputTopicPartitions.add(p);
                    continue;
                }
                if (this.isIntermediateTopic(topic)) {
                    intermediateTopicPartitions.add(p);
                    continue;
                }
                System.err.println("Skipping invalid partition: " + p);
            }
            this.maybeReset(groupId, client, inputTopicPartitions);
            this.maybeSeekToEnd(groupId, client, intermediateTopicPartitions);
            if (!dryRun) {
                // position() is queried for every partition before the commit; the value is discarded.
                for (TopicPartition p : partitions) {
                    client.position(p);
                }
                client.commitSync();
            }
        }
        catch (Exception e) {
            System.err.println("ERROR: Resetting offsets failed.");
            throw e;
        }
        System.out.println("Done.");
        return topicNotFound;
    }

    /**
     * Seeks the consumer to the end of every given intermediate-topic partition and prints the
     * affected topics. Does nothing when the set is empty.
     *
     * @param groupId                     consumer group id, used only in the printed message
     * @param client                      consumer whose positions are moved
     * @param intermediateTopicPartitions partitions to seek to the end of
     */
    public void maybeSeekToEnd(String groupId, Consumer<byte[], byte[]> client, Set<TopicPartition> intermediateTopicPartitions) {
        if (intermediateTopicPartitions.isEmpty()) {
            return;
        }
        System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
        // One line is printed per partition, so a multi-partition topic is listed several times.
        for (TopicPartition partition : intermediateTopicPartitions) {
            String topic = partition.topic();
            if (this.allTopics.contains(topic)) {
                System.out.println("Topic: " + topic);
            }
        }
        client.seekToEnd(intermediateTopicPartitions);
    }

    /**
     * Moves the consumer's position on the given input-topic partitions according to the reset
     * scenario chosen on the command line, then prints the resulting position of each partition.
     * Does nothing when the set is empty.
     *
     * <p>The scenarios are tested in a fixed order (to-offset, to-earliest, to-latest, shift-by,
     * to-datetime, by-duration, from-file); the first one present wins. When none is present the
     * partitions are reset to their beginning.
     *
     * @param groupId              consumer group id, used only in the printed message
     * @param client               consumer whose positions are moved
     * @param inputTopicPartitions partitions to reset
     * @throws Exception if parsing a scenario argument or applying it fails
     */
    private void maybeReset(String groupId, Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions) throws Exception {
        if (inputTopicPartitions.isEmpty()) {
            return;
        }
        System.out.println("Following input topics offsets will be reset to (for consumer group " + groupId + ")");

        if (this.options.has(toOffsetOption)) {
            Long targetOffset = this.options.valueOf(toOffsetOption);
            this.resetOffsetsTo(client, inputTopicPartitions, targetOffset);
        } else if (this.options.has(toEarliestOption)) {
            client.seekToBeginning(inputTopicPartitions);
        } else if (this.options.has(toLatestOption)) {
            client.seekToEnd(inputTopicPartitions);
        } else if (this.options.has(shiftByOption)) {
            Long delta = this.options.valueOf(shiftByOption);
            this.shiftOffsetsBy(client, inputTopicPartitions, delta);
        } else if (this.options.has(toDatetimeOption)) {
            Long epochMillis = this.getDateTime(this.options.valueOf(toDatetimeOption));
            this.resetToDatetime(client, inputTopicPartitions, epochMillis);
        } else if (this.options.has(byDurationOption)) {
            String durationText = this.options.valueOf(byDurationOption);
            Duration parsedDuration = DatatypeFactory.newInstance().newDuration(durationText);
            this.resetByDuration(client, inputTopicPartitions, parsedDuration);
        } else if (this.options.has(fromFileOption)) {
            String planPath = this.options.valueOf(fromFileOption);
            Map<TopicPartition, Long> plan = this.getTopicPartitionOffsetFromResetPlan(planPath);
            this.resetOffsetsFromResetPlan(client, inputTopicPartitions, plan);
        } else {
            // No scenario given: default to the earliest offset.
            client.seekToBeginning(inputTopicPartitions);
        }

        for (TopicPartition partition : inputTopicPartitions) {
            Long newPosition = client.position(partition);
            System.out.println("Topic: " + partition.topic() + " Partition: " + partition.partition() + " Offset: " + newPosition);
        }
    }

    /**
     * Seeks each input partition to the offset listed for it in a reset plan, after clamping the
     * planned offsets to the partitions' beginning/end offsets.
     *
     * <p>NOTE(review): the plan is range-checked as a whole and then looked up per input partition.
     * A partition that is in {@code inputTopicPartitions} but not in the plan yields a
     * {@code null} offset here, which would fail when unboxed for the seek — confirm that callers
     * always supply a plan covering every input partition.
     *
     * @param client                   consumer whose positions are moved
     * @param inputTopicPartitions     partitions to reset
     * @param topicPartitionsAndOffset planned offset per partition
     */
    public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
        Map<TopicPartition, Long> latest = client.endOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> earliest = client.beginningOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> clampedPlan = this.checkOffsetRange(topicPartitionsAndOffset, earliest, latest);
        for (TopicPartition partition : inputTopicPartitions) {
            client.seek(partition, clampedPlan.get(partition));
        }
    }

    /**
     * Reads the reset plan file at the given path and parses it into planned offsets.
     *
     * @param resetPlanPath path of the CSV reset plan
     * @return planned offset per topic partition
     * @throws IOException    if the file cannot be read
     * @throws ParseException if the file content is not a valid reset plan
     */
    private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException {
        return this.parseResetPlan(Utils.readFileAsString(resetPlanPath));
    }

    /**
     * Resets each input partition to the first offset whose record timestamp is at or after
     * "now minus {@code duration}".
     *
     * <p>When the timestamp lookup has no entry for a partition (no record at or after the target
     * time), the partition is moved to its end instead of failing on the missing entry.
     *
     * @param client               consumer whose positions are moved
     * @param inputTopicPartitions partitions to reset
     * @param duration             how far to go back from the current time
     * @throws DatatypeConfigurationException declared for callers; not raised by this body
     */
    private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
        // addTo() mutates the Date in place, so this becomes "now - duration".
        Date target = new Date();
        duration.negate().addTo(target);
        Long timestamp = target.getTime();

        HashMap<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition topicPartition : inputTopicPartitions) {
            topicPartitionsAndTimes.put(topicPartition, timestamp);
        }

        Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
        for (TopicPartition topicPartition : inputTopicPartitions) {
            OffsetAndTimestamp match = topicPartitionsAndOffset.get(topicPartition);
            if (match != null) {
                client.seek(topicPartition, match.offset());
            } else {
                // The lookup maps a partition to null when it holds no record at or after the
                // timestamp; every existing record is older, so the end is the correct position.
                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() + " has no record at or after the requested timestamp. Falling back to latest known offset.");
                // A single-element collection is passed on purpose: seekToEnd with an empty
                // collection would affect all assigned partitions.
                client.seekToEnd(java.util.Collections.singletonList(topicPartition));
            }
        }
    }

    /**
     * Resets each input partition to the first offset whose record timestamp is at or after the
     * given timestamp.
     *
     * <p>When the timestamp lookup has no entry for a partition (no record at or after the target
     * time), the partition is moved to its end instead of failing on the missing entry.
     *
     * @param client               consumer whose positions are moved
     * @param inputTopicPartitions partitions to reset
     * @param timestamp            target time in epoch milliseconds
     */
    private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
        HashMap<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition topicPartition : inputTopicPartitions) {
            topicPartitionsAndTimes.put(topicPartition, timestamp);
        }

        Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
        for (TopicPartition topicPartition : inputTopicPartitions) {
            OffsetAndTimestamp match = topicPartitionsAndOffset.get(topicPartition);
            if (match != null) {
                client.seek(topicPartition, match.offset());
            } else {
                // The lookup maps a partition to null when it holds no record at or after the
                // timestamp; every existing record is older, so the end is the correct position.
                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() + " has no record at or after the requested timestamp. Falling back to latest known offset.");
                // A single-element collection is passed on purpose: seekToEnd with an empty
                // collection would affect all assigned partitions.
                client.seekToEnd(java.util.Collections.singletonList(topicPartition));
            }
        }
    }

    /**
     * Moves each input partition's position by {@code shiftBy} offsets relative to its current
     * position, clamping the result to the partition's beginning/end offsets.
     *
     * @param client               consumer whose positions are moved
     * @param inputTopicPartitions partitions to shift
     * @param shiftBy              number of offsets to move; may be negative
     */
    public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long shiftBy) {
        Map<TopicPartition, Long> latest = client.endOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> earliest = client.beginningOffsets(inputTopicPartitions);

        HashMap<TopicPartition, Long> requested = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition partition : inputTopicPartitions) {
            Long current = client.position(partition);
            requested.put(partition, current + shiftBy);
        }

        Map<TopicPartition, Long> clamped = this.checkOffsetRange(requested, earliest, latest);
        for (TopicPartition partition : inputTopicPartitions) {
            Long target = clamped.get(partition);
            client.seek(partition, target);
        }
    }

    /**
     * Seeks every input partition to the same requested offset, clamping it per partition to that
     * partition's beginning/end offsets.
     *
     * @param client               consumer whose positions are moved
     * @param inputTopicPartitions partitions to reset
     * @param offset               requested offset
     */
    public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long offset) {
        Map<TopicPartition, Long> latest = client.endOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> earliest = client.beginningOffsets(inputTopicPartitions);

        HashMap<TopicPartition, Long> requested = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition partition : inputTopicPartitions) {
            requested.put(partition, offset);
        }

        Map<TopicPartition, Long> clamped = this.checkOffsetRange(requested, earliest, latest);
        for (TopicPartition partition : inputTopicPartitions) {
            Long target = clamped.get(partition);
            client.seek(partition, target);
        }
    }

    /**
     * Parses a timestamp of the form {@code yyyy-MM-dd'T'HH:mm:ss.SSS} with an optional zone
     * designator into epoch milliseconds.
     *
     * <p>Accepted zone designators are {@code Z}, {@code +hh:mm}, {@code +hhmm} and {@code +hh}
     * (or the {@code -} equivalents). When the time part carries none of {@code +}, {@code -} or
     * {@code Z}, the timestamp is interpreted as UTC.
     *
     * @param timestamp the text to parse
     * @return milliseconds since the epoch
     * @throws ParseException if there is no {@code T} separator, the time part is empty, or the
     *                        text matches none of the supported patterns
     */
    public Long getDateTime(String timestamp) throws ParseException {
        String[] timestampParts = timestamp.split("T");
        if (timestampParts.length < 2) {
            throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
        }
        String secondPart = timestampParts[1];
        if (secondPart == null || secondPart.isEmpty()) {
            throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
        }
        if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {
            // No zone designator: treat the timestamp as UTC.
            timestamp = timestamp + "Z";
        }

        // Ordered from most to least specific. SimpleDateFormat.parse does not require the whole
        // input to be consumed, so trying the hours-only pattern ("X") on "+0530" would succeed
        // as "+05" and silently drop the minutes; "XX" has to be attempted before it.
        String[] patterns = {
            "yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
            "yyyy-MM-dd'T'HH:mm:ss.SSSXX",
            "yyyy-MM-dd'T'HH:mm:ss.SSSX"
        };
        ParseException lastFailure = null;
        for (String pattern : patterns) {
            try {
                Date date = new SimpleDateFormat(pattern).parse(timestamp);
                return date.getTime();
            } catch (ParseException e) {
                lastFailure = e;
            }
        }
        // Every pattern failed; report the failure of the last (most permissive) attempt.
        throw lastFailure;
    }

    /**
     * Parses the text of a reset plan, one {@code TOPIC,PARTITION,OFFSET} entry per line, into a
     * map from topic partition to planned offset.
     *
     * <p>Both {@code \n} and {@code \r\n} line endings are accepted, whitespace around a line or a
     * field is ignored, and blank lines are skipped. If a partition is listed more than once the
     * last entry wins.
     *
     * @param resetPlanCsv content of the reset plan file
     * @return planned offset per topic partition
     * @throws ParseException        if the content is null/empty or a non-blank line does not
     *                               have exactly three comma-separated fields
     * @throws NumberFormatException if the partition or offset field is not a number
     */
    private Map<TopicPartition, Long> parseResetPlan(String resetPlanCsv) throws ParseException {
        HashMap<TopicPartition, Long> topicPartitionAndOffset = new HashMap<TopicPartition, Long>();
        if (resetPlanCsv == null || resetPlanCsv.isEmpty()) {
            throw new ParseException("Error parsing reset plan CSV file. It is empty,", 0);
        }
        for (String rawLine : resetPlanCsv.split("\\r?\\n")) {
            // Trimming also removes a stray '\r', which would otherwise end up in the offset field.
            String line = rawLine.trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] lineParts = line.split(",");
            if (lineParts.length != 3) {
                throw new ParseException("Reset plan CSV file is not following the format `TOPIC,PARTITION,OFFSET`.", 0);
            }
            String topic = lineParts[0].trim();
            int partition = Integer.parseInt(lineParts[1].trim());
            long offset = Long.parseLong(lineParts[2].trim());
            topicPartitionAndOffset.put(new TopicPartition(topic, partition), offset);
        }
        return topicPartitionAndOffset;
    }

    /**
     * Clamps each requested offset to the {@code [beginning, end]} range of its partition.
     *
     * <p>An offset above the end offset is replaced by the end offset, an offset below the
     * beginning offset by the beginning offset; a notice is printed in either case. Offsets equal
     * to one of the bounds are already valid and are kept without a notice.
     *
     * @param inputTopicPartitionsAndOffset requested offset per partition
     * @param beginningOffsets              earliest offset per partition; expected to contain
     *                                      every requested partition
     * @param endOffsets                    latest offset per partition; expected to contain every
     *                                      requested partition
     * @return a new map holding the validated offset for every requested partition
     */
    private Map<TopicPartition, Long> checkOffsetRange(Map<TopicPartition, Long> inputTopicPartitionsAndOffset, Map<TopicPartition, Long> beginningOffsets, Map<TopicPartition, Long> endOffsets) {
        HashMap<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet()) {
            TopicPartition topicPartition = topicPartitionAndOffset.getKey();
            Long offset = topicPartitionAndOffset.getValue();

            Long endOffset = endOffsets.get(topicPartition);
            // Strictly greater: an offset equal to the end offset is in range.
            if (offset > endOffset) {
                System.out.println("New offset (" + offset + ") is higher than latest offset. Value will be set to " + endOffset);
                validatedTopicPartitionsOffsets.put(topicPartition, endOffset);
                continue;
            }

            Long beginningOffset = beginningOffsets.get(topicPartition);
            // Strictly smaller: an offset equal to the beginning offset is in range.
            if (offset < beginningOffset) {
                System.out.println("New offset (" + offset + ") is lower than earliest offset. Value will be set to " + beginningOffset);
                validatedTopicPartitionsOffsets.put(topicPartition, beginningOffset);
                continue;
            }

            validatedTopicPartitionsOffsets.put(topicPartition, offset);
        }
        return validatedTopicPartitionsOffsets;
    }

    /**
     * Tells whether the topic was named in the {@code --input-topics} option.
     *
     * @param topic topic name to look up
     * @return {@code true} if the topic is one of the configured input topics
     */
    private boolean isInputTopic(String topic) {
        java.util.List<String> configuredInputTopics = this.options.valuesOf(inputTopicsOption);
        return configuredInputTopics.contains(topic);
    }

    /**
     * Tells whether the topic was named in the {@code --intermediate-topics} option.
     *
     * @param topic topic name to look up
     * @return {@code true} if the topic is one of the configured intermediate topics
     */
    private boolean isIntermediateTopic(String topic) {
        java.util.List<String> configuredIntermediateTopics = this.options.valuesOf(intermediateTopicsOption);
        return configuredIntermediateTopics.contains(topic);
    }

    /**
     * Deletes the application's internal topics, or only lists them when this is a dry run.
     *
     * @param adminClient client used to issue the deletions
     * @param dryRun      when {@code true}, the matching topics are printed and nothing is deleted
     */
    private void maybeDeleteInternalTopics(KafkaAdminClient adminClient, boolean dryRun) {
        System.out.println("Deleting all internal/auto-created topics for application " + (String)this.options.valueOf(applicationIdOption));
        ArrayList<String> internalTopics = new ArrayList<String>();
        for (String topic : this.allTopics) {
            if (this.isInternalTopic(topic)) {
                if (dryRun) {
                    System.out.println("Topic: " + topic);
                } else {
                    internalTopics.add(topic);
                }
            }
        }
        if (!dryRun) {
            this.doDelete(internalTopics, adminClient);
        }
        System.out.println("Done.");
    }

    /**
     * Requests deletion of the given topics and waits up to 30 seconds for each result.
     *
     * <p>Every topic is waited for even if an earlier one failed; failures are printed to
     * {@code System.err} as they occur and reported together at the end.
     *
     * @param topicsToDelete names of the topics to delete
     * @param adminClient    client used to issue the deletions
     * @throws RuntimeException if at least one deletion failed or timed out
     */
    public void doDelete(java.util.List<String> topicsToDelete, io.confluent.org.apache.kafka.clients.admin.AdminClient adminClient) {
        DeleteTopicsResult deletion = adminClient.deleteTopics(topicsToDelete);
        Map<String, KafkaFuture<Void>> outcomeByTopic = deletion.values();

        boolean anyFailed = false;
        for (Map.Entry<String, KafkaFuture<Void>> outcome : outcomeByTopic.entrySet()) {
            KafkaFuture<Void> pending = outcome.getValue();
            try {
                pending.get(30L, TimeUnit.SECONDS);
            } catch (Exception failure) {
                System.err.println("ERROR: deleting topic " + outcome.getKey());
                failure.printStackTrace(System.err);
                anyFailed = true;
            }
        }

        if (anyFailed) {
            throw new RuntimeException("Encountered an error deleting one or more topics");
        }
    }

    /**
     * Tells whether a topic name has the shape of one of this application's internal topics:
     * it starts with {@code <application-id>-} and ends with {@code -changelog} or
     * {@code -repartition}.
     *
     * @param topicName topic name to test
     * @return {@code true} if the name matches that shape
     */
    private boolean isInternalTopic(String topicName) {
        String applicationPrefix = (String)this.options.valueOf(applicationIdOption) + "-";
        if (!topicName.startsWith(applicationPrefix)) {
            return false;
        }
        return topicName.endsWith("-changelog") || topicName.endsWith("-repartition");
    }

    /**
     * Prints the tool description followed by the option help to {@code System.err}.
     *
     * @param parser parser whose option help is printed
     * @throws IOException if the option help cannot be written
     */
    private void printHelp(OptionParser parser) throws IOException {
        // Compile-time concatenation; one source line per line of output.
        String description =
            "The Streams Reset Tool allows you to quickly reset an application in order to reprocess its data from scratch.\n"
            + "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of intermediate topics (topics used in the through() method).\n"
            + "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with \"<application.id>-\").\n"
            + "You do not need to specify internal topics because the tool finds them automatically.\n"
            + "* This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).\n"
            + "* This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results).\n"
            + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n"
            + "\n"
            + "*** Important! You will get wrong output if you don't clean up the local stores after running the reset tool!\n"
            + "\n";
        System.err.println(description);
        parser.printHelpOn((OutputStream)System.err);
    }

    /**
     * Command-line entry point: runs the tool and exits the process with its exit code.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        StreamsResetter resetter = new StreamsResetter();
        int exitCode = resetter.run(args);
        Exit.exit(exitCode);
    }
}

