/*
 * Decompiled with CFR 0.152.
 */
package kafka.test.junit;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.junit.ClusterInstanceParameterResolver;
import kafka.test.junit.GenericParameterResolver;
import kafka.utils.EmptyTestInfo;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.compat.java8.OptionConverters;

public class ZkClusterInvocationContext
implements TestTemplateInvocationContext {
    private final String baseDisplayName;
    private final ClusterConfig clusterConfig;
    private final AtomicReference<IntegrationTestHarness> clusterReference;

    public ZkClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig) {
        this.baseDisplayName = baseDisplayName;
        this.clusterConfig = clusterConfig;
        this.clusterReference = new AtomicReference();
    }

    public String getDisplayName(int invocationIndex) {
        String clusterDesc = this.clusterConfig.nameTags().entrySet().stream().map(Object::toString).collect(Collectors.joining(", "));
        return String.format("%s [%d] Type=ZK, %s", this.baseDisplayName, invocationIndex, clusterDesc);
    }

    public List<Extension> getAdditionalExtensions() {
        if (this.clusterConfig.numControllers() != 1) {
            throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
        }
        ZkClusterInstance clusterShim = new ZkClusterInstance(this.clusterConfig, this.clusterReference);
        return Arrays.asList(context -> {
            IntegrationTestHarness cluster = new IntegrationTestHarness(){

                @Override
                public void modifyConfigs(Seq<Properties> props) {
                    super.modifyConfigs(props);
                    for (int i = 0; i < props.length(); ++i) {
                        ((Properties)props.apply(i)).putAll((Map<?, ?>)ZkClusterInvocationContext.this.clusterConfig.brokerServerProperties(i));
                    }
                }

                @Override
                public Properties serverConfig() {
                    Properties props = ZkClusterInvocationContext.this.clusterConfig.serverProperties();
                    props.put(KafkaConfig.InterBrokerProtocolVersionProp(), ZkClusterInvocationContext.this.clusterConfig.metadataVersion().version());
                    return props;
                }

                @Override
                public Properties adminClientConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.adminClientProperties();
                }

                @Override
                public Properties consumerConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.consumerProperties();
                }

                @Override
                public Properties producerConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.producerProperties();
                }

                @Override
                public SecurityProtocol securityProtocol() {
                    return ZkClusterInvocationContext.this.clusterConfig.securityProtocol();
                }

                @Override
                public ListenerName listenerName() {
                    return ZkClusterInvocationContext.this.clusterConfig.listenerName().map(ListenerName::normalised).orElseGet(() -> ListenerName.forSecurityProtocol((SecurityProtocol)this.securityProtocol()));
                }

                @Override
                public Option<Properties> serverSaslProperties() {
                    if (ZkClusterInvocationContext.this.clusterConfig.saslServerProperties().isEmpty()) {
                        return Option.empty();
                    }
                    return Option.apply((Object)ZkClusterInvocationContext.this.clusterConfig.saslServerProperties());
                }

                @Override
                public Option<Properties> clientSaslProperties() {
                    if (ZkClusterInvocationContext.this.clusterConfig.saslClientProperties().isEmpty()) {
                        return Option.empty();
                    }
                    return Option.apply((Object)ZkClusterInvocationContext.this.clusterConfig.saslClientProperties());
                }

                @Override
                public int brokerCount() {
                    return ZkClusterInvocationContext.this.clusterConfig.numBrokers();
                }

                @Override
                public Option<File> trustStoreFile() {
                    return OptionConverters.toScala(ZkClusterInvocationContext.this.clusterConfig.trustStoreFile());
                }
            };
            this.clusterReference.set(cluster);
            if (this.clusterConfig.isAutoStart()) {
                clusterShim.start();
            }
        }, context -> clusterShim.stop(), new ClusterInstanceParameterResolver(clusterShim), new GenericParameterResolver<ClusterConfig>(this.clusterConfig, ClusterConfig.class));
    }

    public static class ZkClusterInstance
    implements ClusterInstance {
        final AtomicReference<IntegrationTestHarness> clusterReference;
        final ClusterConfig config;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);

        ZkClusterInstance(ClusterConfig config, AtomicReference<IntegrationTestHarness> clusterReference) {
            this.config = config;
            this.clusterReference = clusterReference;
        }

        @Override
        public String bootstrapServers() {
            return kafka.utils.TestUtils.bootstrapServers(this.clusterReference.get().servers(), this.clusterReference.get().listenerName());
        }

        @Override
        public String bootstrapControllers() {
            throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
        }

        @Override
        public Collection<SocketServer> brokerSocketServers() {
            return this.servers().map(KafkaServer::socketServer).collect(Collectors.toList());
        }

        @Override
        public ListenerName clientListener() {
            return this.clusterReference.get().listenerName();
        }

        @Override
        public Optional<ListenerName> controlPlaneListenerName() {
            return OptionConverters.toJava((Option)((KafkaServer)this.clusterReference.get().servers().head()).config().controlPlaneListenerName());
        }

        @Override
        public Collection<SocketServer> controllerSocketServers() {
            return this.servers().filter(broker -> broker.kafkaController().isActive()).map(KafkaServer::socketServer).collect(Collectors.toList());
        }

        @Override
        public SocketServer anyBrokerSocketServer() {
            return this.servers().map(KafkaServer::socketServer).findFirst().orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
        }

        @Override
        public SocketServer anyControllerSocketServer() {
            return this.servers().filter(broker -> broker.kafkaController().isActive()).map(KafkaServer::socketServer).findFirst().orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
        }

        @Override
        public Map<Integer, BrokerFeatures> brokerFeatures() {
            return this.servers().collect(Collectors.toMap(brokerServer -> brokerServer.config().nodeId(), KafkaServer::brokerFeatures));
        }

        @Override
        public String clusterId() {
            return this.servers().findFirst().map(KafkaServer::clusterId).orElseThrow(() -> new RuntimeException("No broker instances found"));
        }

        @Override
        public ClusterInstance.ClusterType clusterType() {
            return ClusterInstance.ClusterType.ZK;
        }

        @Override
        public ClusterConfig config() {
            return this.config;
        }

        @Override
        public Set<Integer> controllerIds() {
            return this.brokerIds();
        }

        @Override
        public Set<Integer> brokerIds() {
            return this.servers().map(brokerServer -> brokerServer.config().nodeId()).collect(Collectors.toSet());
        }

        @Override
        public IntegrationTestHarness getUnderlying() {
            return this.clusterReference.get();
        }

        @Override
        public Admin createAdminClient(Properties configOverrides) {
            return this.clusterReference.get().createAdminClient(this.clientListener(), configOverrides);
        }

        @Override
        public void start() {
            if (this.started.compareAndSet(false, true)) {
                this.clusterReference.get().setUp(new EmptyTestInfo());
            }
        }

        @Override
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.clusterReference.get().tearDown();
            }
        }

        @Override
        public void shutdownBroker(int brokerId) {
            this.findBrokerOrThrow(brokerId).shutdown();
        }

        @Override
        public void startBroker(int brokerId) {
            this.findBrokerOrThrow(brokerId).startup();
        }

        @Override
        public void rollingBrokerRestart() {
            if (!this.started.get()) {
                throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
            }
            for (int i = 0; i < this.clusterReference.get().brokerCount(); ++i) {
                this.clusterReference.get().killBroker(i);
            }
            this.clusterReference.get().restartDeadBrokers(true);
            this.clusterReference.get().adminClientConfig().put("bootstrap.servers", this.bootstrapServers());
        }

        @Override
        public void waitForReadyBrokers() throws InterruptedException {
            TestUtils.waitForCondition(() -> {
                int numRegisteredBrokers = this.clusterReference.get().zkClient().getAllBrokersInCluster().size();
                return numRegisteredBrokers == this.config.numBrokers();
            }, (String)"Timed out while waiting for brokers to become ready");
        }

        private KafkaServer findBrokerOrThrow(int brokerId) {
            return this.servers().filter(server -> server.config().brokerId() == brokerId).findFirst().orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
        }

        public Stream<KafkaServer> servers() {
            return JavaConverters.asJavaCollection(this.clusterReference.get().servers()).stream();
        }
    }
}

