/*
 * Decompiled with CFR 0.152.
 */
package kafka.testkit;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.raft.KafkaRaftManager;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.FaultHandlerFactory;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.server.SharedServer;
import kafka.testkit.BrokerNode;
import kafka.testkit.ControllerNode;
import kafka.testkit.TestKitNode;
import kafka.testkit.TestKitNodes;
import kafka.tools.StorageTool;
import kafka.utils.Logging;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class KafkaClusterTestKit
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
    private final ExecutorService executorService;
    private final TestKitNodes nodes;
    private final Map<Integer, ControllerServer> controllers;
    private final Map<Integer, BrokerServer> brokers;
    private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
    private final File baseDirectory;
    private final SimpleFaultHandlerFactory faultHandlerFactory;

    /**
     * Assembles a test kit from already-constructed parts. The constructor is private;
     * within this file it is only invoked by {@link Builder#build()}.
     *
     * @param executorService                     pool used to run format/startup/shutdown tasks;
     *                                            shut down by {@link #close()}
     * @param nodes                               description of the nodes in the cluster
     * @param controllers                         controller servers keyed by node id
     * @param brokers                             broker servers keyed by node id
     * @param controllerQuorumVotersFutureManager holder of the future that yields the controller
     *                                            voter addresses
     * @param baseDirectory                       directory deleted by {@link #close()}
     * @param faultHandlerFactory                 factory exposing the mock fault handlers
     */
    private KafkaClusterTestKit(ExecutorService executorService, TestKitNodes nodes, Map<Integer, ControllerServer> controllers, Map<Integer, BrokerServer> brokers, ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager, File baseDirectory, SimpleFaultHandlerFactory faultHandlerFactory) {
        this.executorService = executorService;
        this.nodes = nodes;
        this.controllers = controllers;
        this.brokers = brokers;
        this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
        this.baseDirectory = baseDirectory;
        this.faultHandlerFactory = faultHandlerFactory;
    }

    /**
     * Formats the metadata log directory of every node in the cluster.
     *
     * <p>One formatting task per controller is submitted to the executor, followed by one
     * task per broker whose node id is not also a controller id (a combined node is
     * formatted once, using its controller properties). The method then blocks until
     * every task has finished.
     *
     * @throws Exception if submitting or waiting on any task fails; all submitted tasks
     *                   are cancelled before the exception is rethrown
     */
    public void format() throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (Map.Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
                int nodeId = entry.getKey();
                ControllerServer controller = entry.getValue();
                formatNodeAndLog(nodes.controllerProperties(nodeId), controller.config().metadataLogDir(), controller, futures::add);
            }
            for (Map.Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
                int nodeId = entry.getKey();
                // Combined broker/controller nodes were already formatted in the loop above.
                if (controllers.containsKey(nodeId)) {
                    continue;
                }
                BrokerServer broker = entry.getValue();
                formatNodeAndLog(nodes.brokerProperties(nodeId), broker.config().metadataLogDir(), broker, futures::add);
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (Exception e) {
            // Best effort: stop whatever is still running before propagating the failure.
            for (Future<?> future : futures) {
                future.cancel(true);
            }
            throw e;
        }
    }

    /**
     * Submits a task that formats one metadata log directory and forwards the tool's
     * output to the node's logger.
     *
     * <p>The task runs {@code StorageTool.formatCommand} with its output captured in an
     * in-memory buffer. Whether or not formatting succeeds, the captured output is split
     * on the platform line separator and each line is logged at INFO level through
     * {@code loggingMixin}. A failure raised by the format command is not swallowed: it
     * propagates out of the task and surfaces from {@link Future#get()}.
     *
     * @param properties     meta properties to write for the node
     * @param metadataLogDir the single directory to format
     * @param loggingMixin   logger that receives the captured output
     * @param futureConsumer receives the future of the submitted task
     */
    private void formatNodeAndLog(MetaProperties properties, String metadataLogDir, Logging loggingMixin, Consumer<Future<?>> futureConsumer) {
        futureConsumer.accept(executorService.submit(() -> {
            try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
                try (PrintStream out = new PrintStream(stream)) {
                    StorageTool.formatCommand(out, JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(), properties, MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, false);
                } finally {
                    // Runs after the PrintStream is closed (and therefore flushed), so the
                    // buffer holds everything the tool printed, even on failure.
                    for (String line : stream.toString().split(String.format("%n"))) {
                        loggingMixin.info(() -> line);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }));
    }

    /**
     * Starts every controller and every broker concurrently and waits for all of them.
     *
     * <p>A startup task is submitted to the executor for each controller, then for each
     * broker; the method then blocks on each task in submission order.
     *
     * @throws ExecutionException   if a startup task threw
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    public void startup() throws ExecutionException, InterruptedException {
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (ControllerServer controller : controllers.values()) {
                futures.add(executorService.submit(() -> controller.startup()));
            }
            for (BrokerServer broker : brokers.values()) {
                futures.add(executorService.submit(() -> broker.startup()));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (Exception e) {
            // Cancel everything that was submitted before propagating the failure.
            for (Future<?> future : futures) {
                future.cancel(true);
            }
            throw e;
        }
    }

    /**
     * Blocks until all brokers are ready and each broker's metadata cache lists all of them.
     *
     * <p>First waits on the future returned by {@code waitForReadyBrokers} of the controller
     * that comes first when iterating the controller map, then polls until every broker's
     * metadata cache reports as many alive brokers as this kit has brokers.
     *
     * @throws ExecutionException   if the controller's future completes exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForReadyBrokers() throws ExecutionException, InterruptedException {
        Controller controller = controllers.values().iterator().next().controller();
        controller.waitForReadyBrokers(brokers.size()).get();
        TestUtils.waitForCondition(() -> {
            for (BrokerServer broker : brokers().values()) {
                if (broker.metadataCache().getAliveBrokers().size() != brokers.size()) {
                    return false;
                }
            }
            return true;
        }, "Failed to wait for publisher to publish the metadata update to each broker.");
    }

    /**
     * Builds client properties that point at the controllers.
     *
     * <p>When the kit has at least one controller, this waits for the controller voter
     * addresses to become available and sets:
     * <ul>
     *   <li>{@code controller.quorum.voters} to a comma-separated list of
     *       {@code id@host:port} entries, and</li>
     *   <li>{@code bootstrap.servers} to a comma-separated list of {@code host:port}
     *       entries.</li>
     * </ul>
     * When there are no controllers, an empty {@link Properties} is returned.
     *
     * @return a new {@link Properties} instance
     * @throws ExecutionException   if the voter-address future completed exceptionally
     * @throws InterruptedException if interrupted while waiting for the voter addresses
     */
    public Properties controllerClientProperties() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        if (!controllers.isEmpty()) {
            List<Node> controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFutureManager.future.get());
            String voters = controllerNodes.stream()
                .map(n -> n.id() + "@" + n.host() + ":" + n.port())
                .collect(Collectors.joining(","));
            String bootstrapServers = controllerNodes.stream()
                .map(n -> n.host() + ":" + n.port())
                .collect(Collectors.joining(","));
            properties.setProperty("controller.quorum.voters", voters);
            properties.setProperty("bootstrap.servers", bootstrapServers);
        }
        return properties;
    }

    /**
     * Convenience overload of {@link #clientProperties(Properties)} that starts from a
     * fresh, empty {@link Properties}.
     *
     * @return the properties produced by {@link #clientProperties(Properties)}
     */
    public Properties clientProperties() {
        Properties empty = new Properties();
        return clientProperties(empty);
    }

    /**
     * Adds a {@code bootstrap.servers} entry pointing at the brokers' external listeners.
     *
     * <p>When the kit has brokers, each broker's bound port for the external listener is
     * looked up and a comma-separated {@code localhost:port} list is stored under
     * {@code bootstrap.servers}, unless the caller already supplied that key. The passed-in
     * object is modified in place and returned.
     *
     * @param configOverrides properties to extend; an existing {@code bootstrap.servers}
     *                        value is left untouched
     * @return the same {@code configOverrides} instance
     * @throws RuntimeException if a broker reports a non-positive port for the listener
     */
    public Properties clientProperties(Properties configOverrides) {
        if (brokers.isEmpty()) {
            return configOverrides;
        }
        List<String> endpoints = new ArrayList<>();
        for (Map.Entry<Integer, BrokerServer> brokerEntry : brokers.entrySet()) {
            int brokerId = brokerEntry.getKey();
            BrokerServer broker = brokerEntry.getValue();
            ListenerName listenerName = nodes.externalListenerName();
            int port = broker.boundPort(listenerName);
            if (port <= 0) {
                throw new RuntimeException("Broker " + brokerId + " does not yet have a bound port for " + listenerName + ".  Did you start the cluster yet?");
            }
            endpoints.add("localhost:" + port);
        }
        configOverrides.putIfAbsent("bootstrap.servers", String.join(",", endpoints));
        return configOverrides;
    }

    /**
     * Returns the controller servers keyed by node id. The internal map itself is
     * returned, not a copy.
     */
    public Map<Integer, ControllerServer> controllers() {
        return controllers;
    }

    /**
     * Returns the broker servers keyed by node id. The internal map itself is returned,
     * not a copy.
     */
    public Map<Integer, BrokerServer> brokers() {
        return brokers;
    }

    /**
     * Collects the Raft manager of every node into a new map keyed by node id.
     *
     * <p>Brokers are added first (keyed by their configured broker id). A controller is
     * only added when no entry with its node id is present yet, so for a combined node
     * the broker's entry wins.
     *
     * @return a freshly built map; modifying it does not affect the kit
     */
    public Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers() {
        Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> managersById = new HashMap<>();
        for (BrokerServer broker : brokers().values()) {
            managersById.put(broker.config().brokerId(), broker.sharedServer().raftManager());
        }
        for (ControllerServer controller : controllers().values()) {
            if (!managersById.containsKey(controller.config().nodeId())) {
                managersById.put(controller.config().nodeId(), controller.sharedServer().raftManager());
            }
        }
        return managersById;
    }

    /** Returns the node description this kit was built with. */
    public TestKitNodes nodes() {
        return nodes;
    }

    /** Returns the mock handler that the kit's fault handler factory hands out for fatal faults. */
    public MockFaultHandler fatalFaultHandler() {
        return faultHandlerFactory.fatalFaultHandler();
    }

    /** Returns the mock handler that the kit's fault handler factory hands out for non-fatal faults. */
    public MockFaultHandler nonFatalFaultHandler() {
        return faultHandlerFactory.nonFatalFaultHandler();
    }

    /**
     * Shuts the cluster down and removes its files.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>cancel the controller-voters future;</li>
     *   <li>shut down all brokers in parallel and wait for them;</li>
     *   <li>shut down all controllers in parallel and wait for them;</li>
     *   <li>delete the base directory.</li>
     * </ol>
     * If any step throws, the shutdown tasks still pending in the current phase are
     * cancelled and the exception is rethrown (the base directory is then not deleted).
     * The executor is always stopped. After a clean shutdown, the first exception
     * recorded by the fatal and then the non-fatal fault handler, if any, is rethrown.
     *
     * @throws Exception on shutdown failure or when a fault handler recorded an exception
     */
    @Override
    public void close() throws Exception {
        List<Map.Entry<String, Future<?>>> futureEntries = new ArrayList<>();
        try {
            controllerQuorumVotersFutureManager.close();
            // Brokers go first; controllers are only stopped once every broker is down.
            for (Map.Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
                int brokerId = entry.getKey();
                BrokerServer broker = entry.getValue();
                Future<?> shutdownFuture = executorService.submit(() -> broker.shutdown());
                futureEntries.add(new AbstractMap.SimpleImmutableEntry<>("broker" + brokerId, shutdownFuture));
            }
            waitForAllFutures(futureEntries);
            futureEntries.clear();
            for (Map.Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
                int controllerId = entry.getKey();
                ControllerServer controller = entry.getValue();
                Future<?> shutdownFuture = executorService.submit(() -> controller.shutdown());
                futureEntries.add(new AbstractMap.SimpleImmutableEntry<>("controller" + controllerId, shutdownFuture));
            }
            waitForAllFutures(futureEntries);
            futureEntries.clear();
            Utils.delete(baseDirectory);
        } catch (Exception e) {
            for (Map.Entry<String, Future<?>> entry : futureEntries) {
                entry.getValue().cancel(true);
            }
            throw e;
        } finally {
            executorService.shutdownNow();
            executorService.awaitTermination(5L, TimeUnit.MINUTES);
        }
        faultHandlerFactory.fatalFaultHandler().maybeRethrowFirstException();
        faultHandlerFactory.nonFatalFaultHandler().maybeRethrowFirstException();
    }

    /**
     * Waits, in list order, for each named shutdown future to complete, logging at DEBUG
     * level before and after each wait.
     *
     * @param futureEntries pairs of a display name and the future to wait on
     * @throws Exception whatever {@link Future#get()} throws for the first failing entry;
     *                   later entries are not waited on
     */
    private void waitForAllFutures(List<Map.Entry<String, Future<?>>> futureEntries) throws Exception {
        for (Map.Entry<String, Future<?>> namedFuture : futureEntries) {
            String name = namedFuture.getKey();
            log.debug("waiting for {} to shut down.", name);
            namedFuture.getValue().get();
            log.debug("{} successfully shut down.", name);
        }
    }

    public static class Builder {
        private TestKitNodes nodes;
        private Map<String, String> configProps = new HashMap<String, String>();
        private SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();

        /**
         * Creates a builder for a cluster with the given nodes.
         *
         * @param nodes description of the controller and broker nodes to create
         */
        public Builder(TestKitNodes nodes) {
            this.nodes = nodes;
        }

        /**
         * Records a configuration property that is used as a base value in every node's
         * configuration. Setting the same key again replaces the earlier value.
         *
         * @param key   configuration key
         * @param value configuration value
         * @return this builder, for chaining
         */
        public Builder setConfigProp(String key, String value) {
            configProps.put(key, value);
            return this;
        }

        /**
         * Builds the {@code KafkaConfig} for one node.
         *
         * <p>Starts from the properties given to {@link #setConfigProp(String, String)},
         * then overwrites the role, node id, directory and listener settings. If the node
         * id belongs to a broker node, that broker's property overrides are applied last
         * and therefore take precedence over everything else.
         *
         * @param node the node to configure
         * @return the resulting configuration
         */
        private KafkaConfig createNodeConfig(TestKitNode node) {
            KafkaConfig$ keys = KafkaConfig$.MODULE$;
            BrokerNode brokerNode = nodes.brokerNodes().get(node.id());
            ControllerNode controllerNode = nodes.controllerNodes().get(node.id());
            Map<String, String> props = new HashMap<>(configProps);

            props.put(keys.ProcessRolesProp(), roles(node.id()));
            props.put(keys.NodeIdProp(), Integer.toString(node.id()));

            // When the node id is also a controller, the controller's metadata directory is used.
            String metadataLogDir = controllerNode != null
                ? controllerNode.metadataDirectory()
                : node.metadataDirectory();
            props.put(keys.MetadataLogDirProp(), metadataLogDir);

            if (brokerNode != null) {
                props.put(keys.LogDirsProp(), String.join(",", brokerNode.logDataDirectories()));
            }

            props.put(keys.ListenerSecurityProtocolMapProp(), "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
            props.put(keys.ListenersProp(), listeners(node.id()));
            props.put(keys.InterBrokerListenerNameProp(), nodes.interBrokerListenerName().value());
            props.put(keys.ControllerListenerNamesProp(), "CONTROLLER");

            // Placeholder voters ("<id>@0.0.0.0:0"): listeners bind to port 0, so the real
            // controller ports are not available while the configuration is being built.
            String placeholderVoters = nodes.controllerNodes().keySet().stream()
                .map(controllerId -> String.format("%d@0.0.0.0:0", controllerId))
                .collect(Collectors.joining(","));
            props.put("controller.quorum.voters", placeholderVoters);

            props.put(keys.LogCleanerDedupeBufferSizeProp(), "2097152");

            if (brokerNode != null) {
                props.putAll(brokerNode.propertyOverrides());
            }
            return new KafkaConfig(props, false, Option.empty());
        }

        /**
         * Creates the controller and broker servers (without starting them) and wraps them
         * in a {@link KafkaClusterTestKit}.
         *
         * <p>A temporary base directory is created and the node description is replaced by
         * a copy using absolute paths under it. Controllers are created first; a broker
         * whose id matches a controller reuses that controller's {@code SharedServer}.
         * Each controller's first-bound-port future feeds the voter-address future shared
         * by all nodes.
         *
         * <p>If an {@link Exception} is thrown during construction, the executor, all
         * servers created so far, the voter-address future and the base directory are
         * cleaned up before the exception is rethrown.
         *
         * @return the assembled, not yet started, test kit
         * @throws Exception if any part of the cluster cannot be created
         */
        public KafkaClusterTestKit build() throws Exception {
            Map<Integer, ControllerServer> controllers = new HashMap<>();
            Map<Integer, BrokerServer> brokers = new HashMap<>();
            Map<Integer, SharedServer> jointServers = new HashMap<>();
            // Two executor threads per configured node.
            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
            ExecutorService executorService = null;
            ControllerQuorumVotersFutureManager connectFutureManager =
                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
            File baseDirectory = null;
            try {
                baseDirectory = TestUtils.tempDirectory();
                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
                executorService = Executors.newFixedThreadPool(
                    numOfExecutorThreads,
                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));

                for (ControllerNode controllerNode : nodes.controllerNodes().values()) {
                    setupNodeDirectories(baseDirectory, controllerNode.metadataDirectory(), Collections.emptyList());
                    BootstrapMetadata bootstrapMetadata =
                        BootstrapMetadata.fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
                    String threadNamePrefix;
                    if (nodes.brokerNodes().containsKey(controllerNode.id())) {
                        threadNamePrefix = String.format("colocated%d", controllerNode.id());
                    } else {
                        threadNamePrefix = String.format("controller%d", controllerNode.id());
                    }
                    SharedServer sharedServer = new SharedServer(
                        createNodeConfig(controllerNode),
                        MetaProperties.apply(nodes.clusterId().toString(), controllerNode.id()),
                        Time.SYSTEM,
                        new Metrics(),
                        Option.apply(threadNamePrefix),
                        connectFutureManager.future,
                        faultHandlerFactory);
                    ControllerServer controller = null;
                    try {
                        controller = new ControllerServer(sharedServer, KafkaRaftServer.configSchema(), bootstrapMetadata);
                    } catch (Throwable creationError) {
                        log.error("Error creating controller {}", controllerNode.id(), creationError);
                        Utils.swallow(log, "sharedServer.stopForController", () -> sharedServer.stopForController());
                        if (controller != null) {
                            controller.shutdown();
                        }
                        throw creationError;
                    }
                    controllers.put(controllerNode.id(), controller);
                    // Report the bound port (or the failure) to the shared voters future.
                    controller.socketServerFirstBoundPortFuture().whenComplete((port, bindError) -> {
                        if (bindError == null) {
                            connectFutureManager.registerPort(controllerNode.id(), port);
                        } else {
                            connectFutureManager.fail(bindError);
                        }
                    });
                    jointServers.put(controllerNode.id(), sharedServer);
                }

                for (BrokerNode brokerNode : nodes.brokerNodes().values()) {
                    // Reuse the controller's SharedServer when this id was already seen above.
                    SharedServer sharedServer = jointServers.computeIfAbsent(
                        brokerNode.id(),
                        id -> new SharedServer(
                            createNodeConfig(brokerNode),
                            MetaProperties.apply(nodes.clusterId().toString(), id),
                            Time.SYSTEM,
                            new Metrics(),
                            Option.apply(String.format("broker%d_", id)),
                            connectFutureManager.future,
                            faultHandlerFactory));
                    BrokerServer broker = null;
                    try {
                        broker = new BrokerServer(
                            sharedServer,
                            JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq());
                    } catch (Throwable creationError) {
                        log.error("Error creating broker {}", brokerNode.id(), creationError);
                        Utils.swallow(log, "sharedServer.stopForBroker", () -> sharedServer.stopForBroker());
                        if (broker != null) {
                            broker.shutdown();
                        }
                        throw creationError;
                    }
                    brokers.put(brokerNode.id(), broker);
                }
            } catch (Exception buildFailure) {
                // Undo whatever was created so far, then propagate the failure.
                if (executorService != null) {
                    executorService.shutdownNow();
                    executorService.awaitTermination(5L, TimeUnit.MINUTES);
                }
                for (BrokerServer createdBroker : brokers.values()) {
                    createdBroker.shutdown();
                }
                for (ControllerServer createdController : controllers.values()) {
                    createdController.shutdown();
                }
                connectFutureManager.close();
                if (baseDirectory != null) {
                    Utils.delete(baseDirectory);
                }
                throw buildFailure;
            }
            return new KafkaClusterTestKit(executorService, nodes, controllers, brokers, connectFutureManager, baseDirectory, faultHandlerFactory);
        }

        /**
         * Returns the {@code listeners} value for a node: both EXTERNAL and CONTROLLER for
         * a co-resident node, CONTROLLER only for a controller node, EXTERNAL only
         * otherwise. All listeners use {@code localhost} with port 0.
         *
         * @param node the node id
         */
        private String listeners(int node) {
            String value;
            if (nodes.isCoResidentNode(node)) {
                value = "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
            } else if (nodes.controllerNodes().containsKey(node)) {
                value = "CONTROLLER://localhost:0";
            } else {
                value = "EXTERNAL://localhost:0";
            }
            return value;
        }

        /**
         * Returns the process roles value for a node: {@code "broker,controller"} for a
         * co-resident node, {@code "controller"} for a controller node, {@code "broker"}
         * otherwise.
         *
         * @param node the node id
         */
        private String roles(int node) {
            String value;
            if (nodes.isCoResidentNode(node)) {
                value = "broker,controller";
            } else if (nodes.controllerNodes().containsKey(node)) {
                value = "controller";
            } else {
                value = "broker";
            }
            return value;
        }

        /**
         * Creates the directories a node needs: a {@code local} directory under the base
         * directory, the metadata directory, and every log data directory. Directories
         * that already exist are left as they are.
         *
         * @param baseDirectory      parent of the {@code local} directory
         * @param metadataDirectory  path of the metadata directory to create
         * @param logDataDirectories paths of the log data directories to create
         * @throws Exception if a directory cannot be created
         */
        private static void setupNodeDirectories(File baseDirectory, String metadataDirectory, Collection<String> logDataDirectories) throws Exception {
            File localDirectory = new File(baseDirectory, "local");
            Files.createDirectories(localDirectory.toPath());
            Files.createDirectories(Paths.get(metadataDirectory));
            for (String dataDirectory : logDataDirectories) {
                Files.createDirectories(Paths.get(dataDirectory));
            }
        }
    }

    /**
     * Fault handler factory that always hands out one of two shared {@link MockFaultHandler}
     * instances: one for fatal faults and one for non-fatal faults.
     */
    static class SimpleFaultHandlerFactory implements FaultHandlerFactory {
        private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
        private final MockFaultHandler nonFatalFaultHandler = new MockFaultHandler("nonFatalFaultHandler");

        /** Returns the shared handler used for fatal faults. */
        MockFaultHandler fatalFaultHandler() {
            return fatalFaultHandler;
        }

        /** Returns the shared handler used for non-fatal faults. */
        MockFaultHandler nonFatalFaultHandler() {
            return nonFatalFaultHandler;
        }

        /**
         * Returns the shared fatal or non-fatal handler depending on {@code fatal}.
         * The {@code name} and {@code action} arguments are ignored.
         */
        public FaultHandler build(String name, boolean fatal, Runnable action) {
            return fatal ? fatalFaultHandler : nonFatalFaultHandler;
        }
    }

    /**
     * Collects the ports reported by the controllers and completes {@link #future} with
     * the voter address map once the expected number of distinct controllers has
     * registered.
     */
    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
        private final int expectedControllers;
        private final CompletableFuture<Map<Integer, RaftConfig.AddressSpec>> future = new CompletableFuture<>();
        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();

        /**
         * @param expectedControllers number of distinct node ids that must register a port
         *                            before the future is completed
         */
        ControllerQuorumVotersFutureManager(int expectedControllers) {
            this.expectedControllers = expectedControllers;
        }

        /**
         * Records the port of one controller. Once enough controllers have registered,
         * completes the future with a map from node id to a {@code localhost:port}
         * address. Completing an already completed future has no effect.
         */
        synchronized void registerPort(int nodeId, int port) {
            controllerPorts.put(nodeId, port);
            if (controllerPorts.size() < expectedControllers) {
                return;
            }
            Map<Integer, RaftConfig.AddressSpec> voters = new HashMap<>();
            for (Map.Entry<Integer, Integer> registered : controllerPorts.entrySet()) {
                InetSocketAddress address = new InetSocketAddress("localhost", registered.getValue());
                voters.put(registered.getKey(), new RaftConfig.InetAddressSpec(address));
            }
            future.complete(voters);
        }

        /** Completes the future exceptionally with the given cause. */
        void fail(Throwable e) {
            future.completeExceptionally(e);
        }

        /** Cancels the future; this has no effect if it is already completed. */
        @Override
        public void close() {
            future.cancel(true);
        }
    }
}

