/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.InterBrokerSendThread;
import kafka.common.RequestAndCompletionHandler;
import kafka.server.BrokerToControllerQueueItem;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001\u0005-f\u0001B\u000f\u001f\u0001\rB\u0001B\u000b\u0001\u0003\u0002\u0003\u0006Ia\u000b\u0005\tm\u0001\u0011\t\u0019!C\u0001o!Aa\b\u0001BA\u0002\u0013\u0005q\b\u0003\u0005F\u0001\t\u0005\t\u0015)\u00039\u0011!1\u0005A!A!\u0002\u00139\u0005\u0002\u0003(\u0001\u0005\u0003\u0005\u000b\u0011B(\t\u0011I\u0003!\u0011!Q\u0001\nMC\u0001B\u0016\u0001\u0003\u0002\u0003\u0006Ia\u0016\u0005\t5\u0002\u0011\t\u0011)A\u00057\"A!\r\u0001B\u0001B\u0003%1\r\u0003\u0005o\u0001\t\u0005\t\u0015!\u0003p\u0011\u0015\u0011\b\u0001\"\u0001t\u0011\u0015q\b\u0001\"\u0003\u0000\u0011%\t)\u0001\u0001b\u0001\n\u0013\t9\u0001\u0003\u0005\u0002$\u0001\u0001\u000b\u0011BA\u0005\u0011%\t)\u0003\u0001b\u0001\n\u0013\t9\u0003\u0003\u0005\u0002>\u0001\u0001\u000b\u0011BA\u0015\u0011%\ty\u0004\u0001a\u0001\n\u0003qr\u0007\u0003\u0006\u0002B\u0001\u0001\r\u0011\"\u0001\u001f\u0003\u0007Bq!a\u0012\u0001A\u0003&\u0001\bC\u0004\u0002R\u0001!\t!a\u0015\t\u000f\u0005m\u0003\u0001\"\u0003\u0002^!9\u00111\r\u0001\u0005\u0002\u0005\u0015\u0004bBA6\u0001\u0011\u0005\u0011Q\u000e\u0005\b\u0003k\u0002A\u0011IA<\u0011!\t\t\n\u0001C\u0001=\u0005M\u0005bBAS\u0001\u0011\u0005\u0013q\u0015\u0005\b\u0003S\u0003A\u0011IAT\u0005}\u0011%o\\6feR{7i\u001c8ue>dG.\u001a:SKF,Xm\u001d;UQJ,\u0017\r\u001a\u0006\u0003?\u0001\naa]3sm\u0016\u0014(\"A\u0011\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\n\t\u0003K!j\u0011A\n\u0006\u0003O\u0001\naaY8n[>t\u0017BA\u0015'\u0005UIe\u000e^3s\u0005J|7.\u001a:TK:$G\u000b\u001b:fC\u0012\fA#\u001b8ji&\fGNT3uo>\u00148n\u00117jK:$\bC\u0001\u00175\u001b\u0005i#B\u0001\u00180\u0003\u001d\u0019G.[3oiNT!!\t\u0019\u000b\u0005E\u0012\u0014AB1qC\u000eDWMC\u00014\u0003\ry'oZ\u0005\u0003k5\u00121bS1gW\u0006\u001cE.[3oi\u0006q\u0012n\u001d(fi^|'o[\"mS\u0016tGOR8s5.\u001cuN\u001c;s_2dWM]\u000b\u0002qA\u0011\u0011\bP\u0007\u0002u)\t1(A\u0003tG\u0006d\u0017-\u0003\u0002>u\t9!i\\8mK\u0006t\u0017AI5t\u001d\u0016$xo\u001c:l\u00072LWM\u001c;G_JT6nQ8oiJ|G\u000e\\3s?\u0012*\u001
7\u000f\u0006\u0002A\u0007B\u0011\u0011(Q\u0005\u0003\u0005j\u0012A!\u00168ji\"9AiAA\u0001\u0002\u0004A\u0014a\u0001=%c\u0005y\u0012n\u001d(fi^|'o[\"mS\u0016tGOR8s5.\u001cuN\u001c;s_2dWM\u001d\u0011\u0002)9,Go^8sW\u000ec\u0017.\u001a8u\r\u0006\u001cGo\u001c:z!\u0011I\u0004JS\u0016\n\u0005%S$!\u0003$v]\u000e$\u0018n\u001c82!\tYE*D\u0001\u001f\u0013\tieDA\u000bD_:$(o\u001c7mKJLeNZ8s[\u0006$\u0018n\u001c8\u0002\u001f5,G/\u00193bi\u0006,\u0006\u000fZ1uKJ\u0004\"\u0001\f)\n\u0005Ek#!F'b]V\fG.T3uC\u0012\fG/Y+qI\u0006$XM]\u0001\u0017G>tGO]8mY\u0016\u0014hj\u001c3f!J|g/\u001b3feB\u00111\nV\u0005\u0003+z\u0011acQ8oiJ|G\u000e\\3s\u001d>$W\r\u0015:pm&$WM]\u0001\u0007G>tg-[4\u0011\u0005-C\u0016BA-\u001f\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\tQLW.\u001a\t\u00039\u0002l\u0011!\u0018\u0006\u0003=~\u000bQ!\u001e;jYNT!aJ\u0018\n\u0005\u0005l&\u0001\u0002+j[\u0016\f!\u0002\u001e5sK\u0006$g*Y7f!\t!7N\u0004\u0002fSB\u0011aMO\u0007\u0002O*\u0011\u0001NI\u0001\u0007yI|w\u000e\u001e \n\u0005)T\u0014A\u0002)sK\u0012,g-\u0003\u0002m[\n11\u000b\u001e:j]\u001eT!A\u001b\u001e\u0002\u001dI,GO]=US6,w.\u001e;NgB\u0011\u0011\b]\u0005\u0003cj\u0012A\u0001T8oO\u00061A(\u001b8jiz\"\"\u0002^;wobL(p\u001f?~!\tY\u0005\u0001C\u0003+\u0019\u0001\u00071\u0006C\u00037\u0019\u0001\u0007\u0001\bC\u0003G\u0019\u0001\u0007q\tC\u0003O\u0019\u0001\u0007q\nC\u0003S\u0019\u0001\u00071\u000bC\u0003W\u0019\u0001\u0007q\u000bC\u0003[\u0019\u0001\u00071\fC\u0003c\u0019\u0001\u00071\rC\u0003o\u0019\u0001\u0007q.A\fnCf\u0014WMU3tKRtU\r^<pe.\u001cE.[3oiR\u0019\u0001)!\u0001\t\r\u0005\rQ\u00021\u0001K\u0003U\u0019wN\u001c;s_2dWM]%oM>\u0014X.\u0019;j_:\fAB]3rk\u0016\u001cH/U;fk\u0016,\"!!\u0003\u0011\r\u0005-\u0011\u0011DA\u000f\u001b\t\tiA\u0003\u0003\u0002\u0010\u0005E\u0011AC2p]\u000e,(O]3oi*!\u00111CA\u000b\u0003\u0011)H/\u001b7\u000b\u0005\u0005]\u0011\u0001\u00026bm\u0006LA!a\u0007\u0002\u000e\t\u0019B*\u001b8lK\u0012\u0014En\\2lS:<G)Z9vKB\u00191*a\b\n\u0007\u0005\u0005bDA\u000eCe>\\WM\u001d+p\u0007>tGO]8mY\u0016\u0014\
u0018+^3vK&#X-\\\u0001\u000ee\u0016\fX/Z:u#V,W/\u001a\u0011\u0002!\u0005\u001cG/\u001b<f\u0007>tGO]8mY\u0016\u0014XCAA\u0015!\u0019\tY#!\r\u000265\u0011\u0011Q\u0006\u0006\u0005\u0003_\ti!\u0001\u0004bi>l\u0017nY\u0005\u0005\u0003g\tiCA\bBi>l\u0017n\u0019*fM\u0016\u0014XM\\2f!\u0011\t9$!\u000f\u000e\u0003}K1!a\u000f`\u0005\u0011qu\u000eZ3\u0002#\u0005\u001cG/\u001b<f\u0007>tGO]8mY\u0016\u0014\b%A\u0004ti\u0006\u0014H/\u001a3\u0002\u0017M$\u0018M\u001d;fI~#S-\u001d\u000b\u0004\u0001\u0006\u0015\u0003b\u0002#\u0014\u0003\u0003\u0005\r\u0001O\u0001\tgR\f'\u000f^3eA!\u001aA#a\u0013\u0011\u0007e\ni%C\u0002\u0002Pi\u0012\u0001B^8mCRLG.Z\u0001\u0018C\u000e$\u0018N^3D_:$(o\u001c7mKJ\fE\r\u001a:fgN$\"!!\u0016\u0011\u000be\n9&!\u000e\n\u0007\u0005e#H\u0001\u0004PaRLwN\\\u0001\u0018kB$\u0017\r^3D_:$(o\u001c7mKJ\fE\r\u001a:fgN$2\u0001QA0\u0011\u001d\t\tG\u0006a\u0001\u0003k\t1C\\3x\u0003\u000e$\u0018N^3D_:$(o\u001c7mKJ\fq!\u001a8rk\u0016,X\rF\u0002A\u0003OBq!!\u001b\u0018\u0001\u0004\ti\"A\u0004sKF,Xm\u001d;\u0002\u0013E,X-^3TSj,WCAA8!\rI\u0014\u0011O\u0005\u0004\u0003gR$aA%oi\u0006\u0001r-\u001a8fe\u0006$XMU3rk\u0016\u001cHo\u001d\u000b\u0003\u0003s\u0002b!a\u001f\u0002\u0006\u0006-e\u0002BA?\u0003\u0003s1AZA@\u0013\u0005Y\u0014bAABu\u00059\u0001/Y2lC\u001e,\u0017\u0002BAD\u0003\u0013\u0013\u0001\"\u0013;fe\u0006\u0014G.\u001a\u0006\u0004\u0003\u0007S\u0004cA\u0013\u0002\u000e&\u0019\u0011q\u0012\u0014\u00037I+\u0017/^3ti\u0006sGmQ8na2,G/[8o\u0011\u0006tG\r\\3s\u00039A\u0017M\u001c3mKJ+7\u000f]8og\u0016$B!!&\u0002\"R\u0019\u0001)a&\t\u000f\u0005e%\u00041\u0001\u0002\u001c\u0006A!/Z:q_:\u001cX\rE\u0002-\u0003;K1!a(.\u00059\u0019E.[3oiJ+7\u000f]8og\u0016Dq!a)\u001b\u0001\u0004\ti\"A\u0005rk\u0016,X-\u0013;f[\u00061Am\\,pe.$\u0012\u0001Q\u0001\u0006gR\f'\u000f\u001e")
public class BrokerToControllerRequestThread
extends InterBrokerSendThread {
    // Mode the current network client was built for; compared against
    // ControllerInformation.isZkController() in maybeResetNetworkClient to detect a mode change.
    private boolean isNetworkClientForZkController;
    // Builds the replacement KafkaClient when the controller mode changes.
    private final Function1<ControllerInformation, KafkaClient> networkClientFactory;
    // Receives a single-node list holding the controller each time a controller is recorded.
    private final ManualMetadataUpdater metadataUpdater;
    // Queried at the start of every doWork() pass for the current controller information.
    private final ControllerNodeProvider controllerNodeProvider;
    // Clock used for request timestamps and for the timeout check in generateRequests().
    private final Time time;
    // Age in milliseconds, measured from an item's createdTimeMs, at which the item is timed out.
    private final long retryTimeoutMs;
    // Pending items; handleResponse() puts items that must be retried back at the head.
    private final LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue;
    // Controller currently in use; holds null while no controller is known.
    private final AtomicReference<Node> activeController;
    // Set to true by start(); enqueue() rejects items while this is false.
    private volatile boolean started;

    /**
     * Returns the flag that {@code maybeResetNetworkClient} compares against
     * {@code ControllerInformation.isZkController()} to detect a controller mode change.
     *
     * @return the current value of the {@code isNetworkClientForZkController} field
     */
    public boolean isNetworkClientForZkController() {
        return isNetworkClientForZkController;
    }

    /**
     * Scala-style setter for the {@code isNetworkClientForZkController} field.
     *
     * @param x$1 the new flag value
     */
    public void isNetworkClientForZkController_$eq(boolean x$1) {
        isNetworkClientForZkController = x$1;
    }

    private void maybeResetNetworkClient(ControllerInformation controllerInformation) {
        if (this.isNetworkClientForZkController() != controllerInformation.isZkController()) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(88).append("Controller changed to ").append((Object)(this.isNetworkClientForZkController() ? "kraft" : "zk")).append(" mode. ").append("Resetting network client with new controller information : ").append(controllerInformation).toString());
            KafkaClient oldClient = this.networkClient();
            oldClient.initiateClose();
            oldClient.close();
            this.isNetworkClientForZkController_$eq(controllerInformation.isZkController());
            this.updateControllerAddress((Node)controllerInformation.node().orNull(Predef$.MODULE$.$conforms()));
            controllerInformation.node().foreach((Function1 & Serializable & scala.Serializable)n -> {
                this.metadataUpdater.setNodes((java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)n, (List)Nil$.MODULE$)).asJava());
                return BoxedUnit.UNIT;
            });
            this.networkClient_$eq((KafkaClient)this.networkClientFactory.apply((Object)controllerInformation));
        }
    }

    /**
     * Accessor for the deque of pending items.
     *
     * @return the {@code requestQueue} field
     */
    private LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue() {
        return requestQueue;
    }

    /**
     * Accessor for the reference holding the controller node currently in use.
     *
     * @return the {@code activeController} field
     */
    private AtomicReference<Node> activeController() {
        return activeController;
    }

    /**
     * Returns the volatile {@code started} flag, which {@link #start()} sets to {@code true}.
     *
     * @return the current value of {@code started}
     */
    public boolean started() {
        return started;
    }

    /**
     * Scala-style setter for the volatile {@code started} flag.
     *
     * @param x$1 the new flag value
     */
    public void started_$eq(boolean x$1) {
        started = x$1;
    }

    /**
     * Returns the controller node currently in use, wrapped in a Scala {@code Option}.
     *
     * @return the result of {@code Option.apply} on the cached node, which may be {@code null}
     */
    public Option<Node> activeControllerAddress() {
        Node current = this.activeController.get();
        return Option$.MODULE$.apply(current);
    }

    /**
     * Stores the controller node to use from now on.
     *
     * @param newActiveController the node to cache; callers pass {@code null} to clear it
     */
    private void updateControllerAddress(Node newActiveController) {
        this.activeController.set(newActiveController);
    }

    /**
     * Appends an item to the pending queue, and calls {@code wakeup()} when a controller
     * address is currently cached.
     *
     * @param request the item to queue
     * @throws IllegalStateException if {@code started()} is {@code false}
     */
    public void enqueue(BrokerToControllerQueueItem request) {
        if (this.started()) {
            this.requestQueue().add(request);
            boolean controllerKnown = this.activeControllerAddress().isDefined();
            if (controllerKnown) {
                this.wakeup();
            }
            return;
        }
        throw new IllegalStateException("Cannot enqueue a request if the request thread is not running");
    }

    /**
     * Returns the number of items currently in the pending queue.
     *
     * @return the size of the request deque
     */
    public int queueSize() {
        return this.requestQueue.size();
    }

    /**
     * Walks the pending queue from the head and produces at most one request to send.
     *
     * <p>Items whose age (relative to the time sampled on entry) has reached
     * {@code retryTimeoutMs} are removed and their callback's {@code onTimeout()} is invoked.
     * The first remaining item is removed and wrapped in a {@code RequestAndCompletionHandler}
     * if a controller address is cached; otherwise it stays queued and the walk continues.
     *
     * @return an iterable holding the single request to send, or an empty iterable
     */
    @Override
    public Iterable<RequestAndCompletionHandler> generateRequests() {
        long nowMs = this.time.milliseconds();
        for (Iterator<BrokerToControllerQueueItem> pending = this.requestQueue().iterator(); pending.hasNext(); ) {
            BrokerToControllerQueueItem item = pending.next();
            long ageMs = nowMs - item.createdTimeMs();
            if (ageMs >= this.retryTimeoutMs) {
                pending.remove();
                item.callback().onTimeout();
            } else {
                // Looked up per item, not once up front.
                Option<Node> controller = this.activeControllerAddress();
                if (controller.isDefined()) {
                    pending.remove();
                    RequestAndCompletionHandler handler = new RequestAndCompletionHandler(this.time.milliseconds(), (Node)controller.get(), item.request(), response -> this.handleResponse(item, response));
                    return Option$.MODULE$.option2Iterable((Option)new Some((Object)handler));
                }
            }
        }
        return Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
    }

    /**
     * Handles the response for a previously sent queue item.
     *
     * <ul>
     *   <li>Authentication exception or version mismatch: logged as an error, then the
     *       callback's {@code onComplete} is invoked with the response.</li>
     *   <li>Disconnect: the cached controller address is cleared and the item is put back at
     *       the head of the queue.</li>
     *   <li>{@code NOT_CONTROLLER} among the response errors: the connection to the cached
     *       controller (if any) is disconnected, the cached address is cleared, and the item
     *       is put back at the head of the queue.</li>
     *   <li>Anything else: the callback's {@code onComplete} is invoked with the response.</li>
     * </ul>
     *
     * @param queueItem the item the response belongs to
     * @param response  the client response received for it
     */
    public void handleResponse(BrokerToControllerQueueItem queueItem, ClientResponse response) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Request " + queueItem.request() + " received " + response);
        if (response.authenticationException() != null) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Request " + queueItem.request() + " failed due to authentication error with controller", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> response.authenticationException());
            queueItem.callback().onComplete(response);
        } else if (response.versionMismatch() != null) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Request " + queueItem.request() + " failed due to unsupported version error", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> response.versionMismatch());
            queueItem.callback().onComplete(response);
        } else if (response.wasDisconnected()) {
            // Forget the controller and retry this item first.
            this.updateControllerAddress(null);
            this.requestQueue().putFirst(queueItem);
        } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Request " + queueItem.request() + " received NOT_CONTROLLER exception. Disconnecting the " + "connection to the stale controller " + this.activeControllerAddress().map((Function1 & Serializable & scala.Serializable)node -> ((Node)node).idString()).getOrElse((Function0 & Serializable & scala.Serializable)() -> "null"));
            // Disconnect from the cached controller, if any, and clear it.
            this.activeControllerAddress().foreach((Function1 & Serializable & scala.Serializable)controllerAddress -> {
                BrokerToControllerRequestThread.$anonfun$handleResponse$9(this, controllerAddress);
                return BoxedUnit.UNIT;
            });
            this.requestQueue().putFirst(queueItem);
        } else {
            queueItem.callback().onComplete(response);
        }
    }

    @Override
    public void doWork() {
        ControllerInformation controllerInformation = this.controllerNodeProvider.getControllerInfo();
        this.maybeResetNetworkClient(controllerInformation);
        if (this.activeControllerAddress().isDefined()) {
            super.pollOnce(Long.MAX_VALUE);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Controller isn't cached, looking for local metadata changes");
        Option<Node> option = controllerInformation.node();
        if (option instanceof Some) {
            Node controllerNode = (Node)((Some)option).value();
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(51).append("Recorded new controller, from now on will use node ").append(controllerNode).toString());
            this.updateControllerAddress(controllerNode);
            this.metadataUpdater.setNodes((java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)controllerNode, (List)Nil$.MODULE$)).asJava());
            return;
        }
        if (None$.MODULE$.equals(option)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "No controller provided, retrying after backoff");
            super.pollOnce(100L);
            return;
        }
        throw new MatchError(option);
    }

    /**
     * Starts the thread via the superclass and then marks this instance as started, which
     * allows {@code enqueue} to accept items.
     */
    @Override
    public void start() {
        super.start();
        // Set only after super.start() returns; enqueue() throws while the flag is still false.
        this.started_$eq(true);
    }

    /**
     * Synthetic body of the {@code foreach} lambda in {@code handleResponse}: disconnects the
     * network client from the given controller and then clears the cached controller address.
     * A failure while disconnecting is logged and does not prevent the address from being cleared.
     *
     * @param $this             the request thread the lambda was created in
     * @param controllerAddress the controller node to disconnect from
     */
    public static final /* synthetic */ void $anonfun$handleResponse$9(BrokerToControllerRequestThread $this, Node controllerAddress) {
        try {
            // Kept inside the try so any failure here, including resolving the id, is only logged.
            $this.networkClient().disconnect(controllerAddress.idString());
        }
        catch (Throwable disconnectFailure) {
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Had an error while disconnecting from NetworkClient.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> disconnectFailure);
        }
        $this.updateControllerAddress(null);
    }

    /**
     * Creates the request thread.
     *
     * @param initialNetworkClient           client passed to the superclass as the initial network client
     * @param isNetworkClientForZkController initial value of the mode flag for {@code initialNetworkClient}
     * @param networkClientFactory           builds a replacement client when the controller mode changes
     * @param metadataUpdater                receives the controller node whenever one is recorded
     * @param controllerNodeProvider         source of the current controller information
     * @param config                         supplies {@code controllerSocketTimeoutMs} for the superclass timeout
     * @param time                           clock used by this thread and passed to the superclass
     * @param threadName                     name passed to the superclass
     * @param retryTimeoutMs                 age in milliseconds at which queued items are timed out
     */
    public BrokerToControllerRequestThread(KafkaClient initialNetworkClient, boolean isNetworkClientForZkController, Function1<ControllerInformation, KafkaClient> networkClientFactory, ManualMetadataUpdater metadataUpdater, ControllerNodeProvider controllerNodeProvider, KafkaConfig config, Time time, String threadName, long retryTimeoutMs) {
        // Decompiler output: the field stores come before the super(...) call here.
        this.isNetworkClientForZkController = isNetworkClientForZkController;
        this.networkClientFactory = networkClientFactory;
        this.metadataUpdater = metadataUpdater;
        this.controllerNodeProvider = controllerNodeProvider;
        this.time = time;
        this.retryTimeoutMs = retryTimeoutMs;
        // Third argument is the smaller of the controller socket timeout and retryTimeoutMs, capped at
        // Integer.MAX_VALUE. NOTE(review): the trailing `false` is presumably an "interruptible" flag — confirm
        // against InterBrokerSendThread.
        super(threadName, initialNetworkClient, (int)Math.min(Integer.MAX_VALUE, Math.min((long)config.controllerSocketTimeoutMs(), retryTimeoutMs)), time, false);
        this.requestQueue = new LinkedBlockingDeque();
        // No controller is known until doWork() or maybeResetNetworkClient records one.
        this.activeController = new AtomicReference<Object>(null);
        this.started = false;
    }
}

