/*
 * Decompiled with CFR 0.152.
 */
package io.github.embeddedkafka.ops;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.FaultHandlerFactory;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer$;
import kafka.server.Server$;
import kafka.server.SharedServer;
import kafka.server.StandardFaultHandlerFactory;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.metadata.properties.PropertiesUtils;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.ServerSocketFactory;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.MapOps;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Using;
import scala.util.Using$;

@ScalaSignature(bytes="\u0006\u0005\u0005MaaB\u0006\r!\u0003\r\t!\u0006\u0005\u00069\u0001!\t!\b\u0005\bC\u0001\u0011\r\u0011\"\u0003#\u0011\u001dY\u0003A1A\u0005\u00121Bq\u0001\r\u0001C\u0002\u0013E\u0011\u0007C\u00046\u0001\t\u0007I\u0011\u0003\u0017\t\rY\u0002A\u0011\u0001\b8\u0011\u00151\u0007\u0001\"\u0003h\u0011\u0015A\u0007\u0001\"\u0003j\u0011\u0015q\b\u0001\"\u0003\u0000\u0011\u001d\t\u0019\u0001\u0001C\u0005\u0003\u000b\u0011\u0001bS1gW\u0006|\u0005o\u001d\u0006\u0003\u001b9\t1a\u001c9t\u0015\ty\u0001#A\u0007f[\n,G\rZ3eW\u000647.\u0019\u0006\u0003#I\taaZ5uQV\u0014'\"A\n\u0002\u0005%|7\u0001A\n\u0003\u0001Y\u0001\"a\u0006\u000e\u000e\u0003aQ\u0011!G\u0001\u0006g\u000e\fG.Y\u0005\u00037a\u0011a!\u00118z%\u00164\u0017A\u0002\u0013j]&$H\u0005F\u0001\u001f!\t9r$\u0003\u0002!1\t!QK\\5u\u0003\u0019awnZ4feV\t1\u0005\u0005\u0002%S5\tQE\u0003\u0002'O\u0005)1\u000f\u001c45U*\t\u0001&A\u0002pe\u001eL!AK\u0013\u0003\r1{wmZ3s\u0003\u0019qw\u000eZ3JIV\tQ\u0006\u0005\u0002\u0018]%\u0011q\u0006\u0007\u0002\u0004\u0013:$\u0018\u0001E1vi>\u001c%/Z1uKR{\u0007/[2t+\u0005\u0011\u0004CA\f4\u0013\t!\u0004DA\u0004C_>dW-\u00198\u000251|wm\u00117fC:,'\u000fR3ekB,')\u001e4gKJ\u001c\u0016N_3\u0002\u0015M$\u0018M\u001d;LC\u001a\\\u0017\rF\u00039\r\"S%\f\u0005\u0003\u0018sm\u001a\u0015B\u0001\u001e\u0019\u0005\u0019!V\u000f\u001d7feA\u0011A(Q\u0007\u0002{)\u0011ahP\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0001\u000bQa[1gW\u0006L!AQ\u001f\u0003\u0019\t\u0013xn[3s'\u0016\u0014h/\u001a:\u0011\u0005q\"\u0015BA#>\u0005A\u0019uN\u001c;s_2dWM]*feZ,'\u000fC\u0003H\r\u0001\u0007Q&A\u0005lC\u001a\\\u0017\rU8si\")\u0011J\u0002a\u0001[\u0005q1m\u001c8ue>dG.\u001a:Q_J$\b\"B&\u0007\u0001\u0004a\u0015AF2vgR|WN\u0011:pW\u0016\u0014\bK]8qKJ$\u0018.Z:\u0011\t5#vk\u0016\b\u0003\u001dJ\u0003\"a\u0014\r\u000e\u0003AS!!\u0015\u000b\u0002\rq\u0012xn\u001c;?\u0013\t\u0019\u0006$\u0001\u0004Qe\u0016$WMZ\u0005\u0003+Z\u00131!T1q\u0015\t\u0019\u0006\u0004\u0005\u0002N1&\u0011\u0011L\u0016\u0002\u0007'R\u001
4\u0018N\\4\t\u000bm3\u0001\u0019\u0001/\u0002\u0017-\fgm[1M_\u001e$\u0015N\u001d\t\u0003;\u0012l\u0011A\u0018\u0006\u0003?\u0002\fAAZ5mK*\u0011\u0011MY\u0001\u0004]&|'\"A2\u0002\t)\fg/Y\u0005\u0003Kz\u0013A\u0001U1uQ\u00069r-\u001a8fe\u0006$XMU1oI>l7\t\\;ti\u0016\u0014\u0018\n\u001a\u000b\u0002/\u0006\u0019rO]5uK6+G/\u0019)s_B,'\u000f^5fgR\u0019aD[9\t\u000b-D\u0001\u0019\u00017\u0002\r1|w\rR5s!\tiw.D\u0001o\u0015\t\u0019\"-\u0003\u0002q]\n!a)\u001b7f\u0011\u0015\u0011\b\u00021\u0001t\u00039iW\r^1Qe>\u0004XM\u001d;jKN\u0004\"\u0001\u001e?\u000e\u0003UT!A^<\u0002\u0015A\u0014x\u000e]3si&,7O\u0003\u0002ys\u0006AQ.\u001a;bI\u0006$\u0018M\u0003\u0002Au*\u00111pJ\u0001\u0007CB\f7\r[3\n\u0005u,(AD'fi\u0006\u0004&o\u001c9feRLWm]\u0001\u001cM&tG\rU8si\u001a{'oQ8oiJ|G\u000e\\3s\u001fJ4\u0015-\u001b7\u0015\u00075\n\t\u0001C\u0003J\u0013\u0001\u0007Q&\u0001\ngS:$'+\u00198e_64%/Z3Q_J$HCAA\u0004!\u0015\tI!a\u0004.\u001b\t\tYAC\u0002\u0002\u000ea\tA!\u001e;jY&!\u0011\u0011CA\u0006\u0005\r!&/\u001f")
public interface KafkaOps {
    /**
     * Compiler-generated setter for the trait's private logger field.
     * Invoked once from {@code $init$} with a logger obtained for the implementing class.
     *
     * @param var1 the logger to store
     */
    public void io$github$embeddedkafka$ops$KafkaOps$_setter_$io$github$embeddedkafka$ops$KafkaOps$$logger_$eq(Logger var1);

    /**
     * Compiler-generated setter for {@link #nodeId()}. {@code $init$} passes {@code 0}.
     *
     * @param var1 the node id to store
     */
    public void io$github$embeddedkafka$ops$KafkaOps$_setter_$nodeId_$eq(int var1);

    /**
     * Compiler-generated setter for {@link #autoCreateTopics()}. {@code $init$} passes {@code true}.
     *
     * @param var1 the flag value to store
     */
    public void io$github$embeddedkafka$ops$KafkaOps$_setter_$autoCreateTopics_$eq(boolean var1);

    /**
     * Compiler-generated setter for {@link #logCleanerDedupeBufferSize()}.
     * {@code $init$} passes {@code 1048577}.
     *
     * @param var1 the buffer size to store
     */
    public void io$github$embeddedkafka$ops$KafkaOps$_setter_$logCleanerDedupeBufferSize_$eq(int var1);

    /**
     * Returns the logger used by this trait for port-discovery messages and handed to
     * {@code KafkaRaftServer.initializeLogDirs} during startup.
     *
     * @return the trait's logger
     */
    public Logger io$github$embeddedkafka$ops$KafkaOps$$logger();

    /**
     * Returns the id written into the {@code node.id} and {@code broker.id} settings, the
     * {@code controller.quorum.voters} entry and the generated meta properties.
     *
     * @return the node id
     */
    public int nodeId();

    /**
     * Returns the value written into the {@code auto.create.topics.enable} broker setting.
     *
     * @return the auto-create-topics flag
     */
    public boolean autoCreateTopics();

    /**
     * Returns the value written into the {@code log.cleaner.dedupe.buffer.size} broker setting.
     *
     * @return the configured dedupe buffer size
     */
    public int logCleanerDedupeBufferSize();

    /**
     * Synthetic static forwarder that invokes {@code startKafka} on the supplied instance.
     *
     * @param $this                  the instance whose {@code startKafka} is invoked
     * @param kafkaPort              forwarded unchanged
     * @param controllerPort         forwarded unchanged
     * @param customBrokerProperties forwarded after an unchecked cast to {@code Map<String, String>}
     * @param kafkaLogDir            forwarded unchanged
     * @return whatever {@code $this.startKafka(...)} returns
     */
    public static /* synthetic */ Tuple2 startKafka$(KafkaOps $this, int kafkaPort, int controllerPort, scala.collection.immutable.Map customBrokerProperties, Path kafkaLogDir) {
        // The raw map is only re-typed here; no entries are inspected or copied.
        scala.collection.immutable.Map<String, String> typedProperties = (scala.collection.immutable.Map<String, String>)customBrokerProperties;
        Tuple2<BrokerServer, ControllerServer> servers = $this.startKafka(kafkaPort, controllerPort, typedProperties, kafkaLogDir);
        return servers;
    }

    /**
     * Configures and starts a combined broker + controller ({@code process.roles=broker,controller})
     * Kafka node listening on localhost.
     *
     * <p>Steps, in order: resolve the controller port, assemble the broker configuration, write a
     * {@code meta.properties} file with a freshly generated cluster id into the metadata log
     * directory, initialise the log directories, build the shared server, then start the
     * controller followed by the broker.
     *
     * @param kafkaPort              port used for the {@code PLAINTEXT} listener
     * @param controllerPort         port used for the {@code CONTROLLER} listener; {@code 0} triggers
     *                               a lookup of a free port
     * @param customBrokerProperties extra settings appended to the defaults with {@code ++}, so for
     *                               duplicate keys these entries replace the defaults
     * @param kafkaLogDir            directory whose absolute path becomes {@code log.dirs}
     * @return a tuple of the started broker and the started controller
     * @throws RuntimeException if {@code controllerPort} is {@code 0} and no free port can be found
     * @throws MatchError       if log-directory initialisation yields {@code null}
     */
    default public Tuple2<BrokerServer, ControllerServer> startKafka(int kafkaPort, int controllerPort, scala.collection.immutable.Map<String, String> customBrokerProperties, Path kafkaLogDir) {
        int resolvedControllerPort = this.findPortForControllerOrFail(controllerPort);
        String brokerListener = SecurityProtocol.PLAINTEXT + "://localhost:" + kafkaPort;
        String controllerListener = "CONTROLLER://localhost:" + resolvedControllerPort;

        // Default settings, one key/value tuple per line, in the order they are evaluated.
        Tuple2[] defaultEntries = new Tuple2[]{
            new Tuple2<Object, Object>("process.roles", "broker,controller"),
            new Tuple2<Object, Object>("node.id", Integer.toString(this.nodeId())),
            new Tuple2<Object, Object>("controller.listener.names", "CONTROLLER"),
            new Tuple2<Object, Object>("controller.quorum.voters", this.nodeId() + "@localhost:" + resolvedControllerPort),
            new Tuple2<Object, Object>("broker.id", Integer.toString(this.nodeId())),
            new Tuple2<Object, Object>("listeners", brokerListener + "," + controllerListener),
            new Tuple2<Object, Object>("advertised.listeners", brokerListener),
            new Tuple2<Object, Object>("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL"),
            new Tuple2<Object, Object>("auto.create.topics.enable", Boolean.toString(this.autoCreateTopics())),
            new Tuple2<Object, Object>("log.dirs", kafkaLogDir.toAbsolutePath().toString()),
            new Tuple2<Object, Object>(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, Integer.toString(1)),
            new Tuple2<Object, Object>("offsets.topic.replication.factor", Integer.toString(1)),
            new Tuple2<Object, Object>("offsets.topic.num.partitions", Integer.toString(1)),
            new Tuple2<Object, Object>("log.cleaner.dedupe.buffer.size", Integer.toString(this.logCleanerDedupeBufferSize()))
        };
        MapOps defaults = (MapOps)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])defaultEntries));
        // Caller-supplied properties are appended last.
        scala.collection.immutable.Map configProperties = (scala.collection.immutable.Map)defaults.$plus$plus(customBrokerProperties);
        KafkaConfig config = new KafkaConfig(CollectionConverters$.MODULE$.MapHasAsJava((Map)configProperties).asJava());

        String clusterIdBase64 = this.generateRandomClusterId();
        MetaProperties metaProperties = new MetaProperties.Builder()
            .setVersion(MetaPropertiesVersion.V1)
            .setClusterId(clusterIdBase64)
            .setNodeId(this.nodeId())
            .build();
        String logIdent = "[KafkaRaftServer nodeId=" + config.nodeId() + "] ";

        // meta.properties is written before the log directories are initialised.
        File metadataLogDir = Paths.get(config.metadataLogDir(), new String[0]).toFile();
        this.writeMetaProperties(metadataLogDir, metaProperties);

        Tuple2 logDirState = KafkaRaftServer$.MODULE$.initializeLogDirs(config, this.io$github$embeddedkafka$ops$KafkaOps$$logger(), logIdent);
        if (logDirState == null) {
            throw new MatchError((Object)logDirState);
        }
        MetaPropertiesEnsemble ensemble = (MetaPropertiesEnsemble)logDirState._1();
        BootstrapMetadata bootstrap = (BootstrapMetadata)logDirState._2();

        Metrics metrics = Server$.MODULE$.initializeMetrics(config, Time.SYSTEM, (String)ensemble.clusterId().get());
        SharedServer sharedServer = new SharedServer(
            config,
            ensemble,
            Time.SYSTEM,
            metrics,
            CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections((List)config.quorumConfig().voters())),
            (Collection)QuorumConfig.parseBootstrapServers((List)config.quorumConfig().bootstrapServers()),
            (FaultHandlerFactory)new StandardFaultHandlerFactory(),
            ServerSocketFactory.INSTANCE);

        // Both servers are constructed first; the controller is started before the broker.
        BrokerServer broker = new BrokerServer(sharedServer);
        ControllerServer controller = new ControllerServer(sharedServer, KafkaRaftServer$.MODULE$.configSchema(), bootstrap);
        controller.startup();
        broker.startup();
        return new Tuple2((Object)broker, (Object)controller);
    }

    /**
     * Produces a new cluster id from a random {@link Uuid}.
     *
     * @return the string form of a freshly generated {@code Uuid}
     */
    private String generateRandomClusterId() {
        Uuid clusterUuid = Uuid.randomUuid();
        return clusterUuid.toString();
    }

    /**
     * Writes the given meta properties to a file named {@code meta.properties} located
     * directly under {@code logDir}.
     *
     * @param logDir         directory that receives the file
     * @param metaProperties properties to serialise
     */
    private void writeMetaProperties(File logDir, MetaProperties metaProperties) {
        File target = new File(logDir.getAbsolutePath(), "meta.properties");
        Properties serialised = metaProperties.toProperties();
        String targetPath = target.getAbsolutePath();
        // Third argument is passed as false, exactly as before.
        PropertiesUtils.writePropertiesFile(serialised, targetPath, false);
    }

    /**
     * Resolves the port the controller listener should use.
     *
     * @param controllerPort requested port; any non-zero value is returned unchanged
     * @return {@code controllerPort} when it is non-zero, otherwise a port obtained from
     *         {@code findRandomFreePort()}
     * @throws RuntimeException if a free port was requested but the lookup failed; the lookup
     *                          failure is attached as the cause
     * @throws MatchError       if the lookup result is neither a {@code Success} nor a {@code Failure}
     */
    private int findPortForControllerOrFail(int controllerPort) {
        // An explicit port needs no lookup.
        if (controllerPort != 0) {
            return controllerPort;
        }

        Try<Object> lookup = this.findRandomFreePort();
        if (lookup instanceof Failure) {
            Throwable cause = ((Failure)lookup).exception();
            this.io$github$embeddedkafka$ops$KafkaOps$$logger().error("Could not find a free port for the controller", cause);
            throw new RuntimeException("Could not find a free port for the controller", cause);
        }
        if (!(lookup instanceof Success)) {
            throw new MatchError(lookup);
        }

        int freePort = BoxesRunTime.unboxToInt((Object)((Success)lookup).value());
        this.io$github$embeddedkafka$ops$KafkaOps$$logger().info("Found free port " + freePort + " for controller");
        return freePort;
    }

    /**
     * Looks up a currently free TCP port by binding a {@link ServerSocket} to port {@code 0}
     * and reading back the port that was assigned.
     *
     * <p>The probe socket is closed before this method returns, so the port is only known to
     * have been free at the time of the call.
     *
     * <p>This replaces the decompiled {@code Using(...).recoverWith(...)} chain, which was not
     * valid Java (it instantiated the {@code Serializable} interface with a constructor argument
     * and threw a checked exception from a {@code Function0} lambda). Outcomes are kept the same
     * for exceptions: an {@link IOException} from opening or closing the socket is wrapped, any
     * other runtime failure is passed through as a {@code Failure}. An {@link Error} is not
     * captured and propagates to the caller.
     *
     * @return {@code Success} holding the boxed port number, or {@code Failure} holding either a
     *         {@link RuntimeException} with message {@code "Could not find a free port"} (cause:
     *         the {@link IOException}) or the original runtime exception
     */
    private Try<Object> findRandomFreePort() {
        try (ServerSocket probe = new ServerSocket(0)) {
            return new Success<Object>(BoxesRunTime.boxToInteger(probe.getLocalPort()));
        } catch (IOException e) {
            // Covers both a failed bind and a failed close of the probe socket.
            return new Failure<Object>(new RuntimeException("Could not find a free port", e));
        } catch (RuntimeException e) {
            // Non-I/O failures are reported through the Try rather than thrown.
            return new Failure<Object>(e);
        }
    }

    /**
     * Trait initialiser: stores the default field values on the given instance through its
     * generated setters.
     *
     * <p>Values set, in order: a logger for the instance's runtime class, node id {@code 0},
     * auto-create-topics {@code true}, and a log-cleaner dedupe buffer size of
     * {@code 1024 * 1024 + 1}.
     *
     * @param $this the instance being initialised
     */
    public static void $init$(KafkaOps $this) {
        Logger instanceLogger = LoggerFactory.getLogger($this.getClass());
        $this.io$github$embeddedkafka$ops$KafkaOps$_setter_$io$github$embeddedkafka$ops$KafkaOps$$logger_$eq(instanceLogger);

        $this.io$github$embeddedkafka$ops$KafkaOps$_setter_$nodeId_$eq(0);
        $this.io$github$embeddedkafka$ops$KafkaOps$_setter_$autoCreateTopics_$eq(true);

        // Same value as the hexadecimal literal 0x100001.
        int dedupeBufferSize = 1024 * 1024 + 1;
        $this.io$github$embeddedkafka$ops$KafkaOps$_setter_$logCleanerDedupeBufferSize_$eq(dedupeBufferSize);
    }
}

