/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import java.util.concurrent.BlockingQueue;
import kafka.api.LeaderAndIsrResponse$;
import kafka.api.RequestKeys$;
import kafka.api.RequestOrResponse;
import kafka.api.StopReplicaResponse$;
import kafka.api.UpdateMetadataResponse$;
import kafka.cluster.Broker;
import kafka.controller.ControllerContext;
import kafka.controller.KafkaController;
import kafka.controller.KafkaController$;
import kafka.network.BlockingChannel;
import kafka.network.Receive;
import kafka.utils.ShutdownableThread;
import kafka.utils.ShutdownableThread$;
import kafka.utils.Utils$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001}4A!\u0001\u0002\u0001\u000f\t\t\"+Z9vKN$8+\u001a8e)\"\u0014X-\u00193\u000b\u0005\r!\u0011AC2p]R\u0014x\u000e\u001c7fe*\tQ!A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001A\u0001CA\u0005\r\u001b\u0005Q!BA\u0006\u0005\u0003\u0015)H/\u001b7t\u0013\ti!B\u0001\nTQV$Hm\\<oC\ndW\r\u00165sK\u0006$\u0007\u0002C\b\u0001\u0005\u000b\u0007I\u0011\u0001\t\u0002\u0019\r|g\u000e\u001e:pY2,'/\u00133\u0016\u0003E\u0001\"AE\u000b\u000e\u0003MQ\u0011\u0001F\u0001\u0006g\u000e\fG.Y\u0005\u0003-M\u00111!\u00138u\u0011!A\u0002A!A!\u0002\u0013\t\u0012!D2p]R\u0014x\u000e\u001c7fe&#\u0007\u0005\u0003\u0005\u001b\u0001\t\u0015\r\u0011\"\u0001\u001c\u0003E\u0019wN\u001c;s_2dWM]\"p]R,\u0007\u0010^\u000b\u00029A\u0011QDH\u0007\u0002\u0005%\u0011qD\u0001\u0002\u0012\u0007>tGO]8mY\u0016\u00148i\u001c8uKb$\b\u0002C\u0011\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002%\r|g\u000e\u001e:pY2,'oQ8oi\u0016DH\u000f\t\u0005\tG\u0001\u0011)\u0019!C\u0001I\u0005AAo\u001c\"s_.,'/F\u0001&!\t1\u0013&D\u0001(\u0015\tAC!A\u0004dYV\u001cH/\u001a:\n\u0005):#A\u0002\"s_.,'\u000f\u0003\u0005-\u0001\t\u0005\t\u0015!\u0003&\u0003%!xN\u0011:pW\u0016\u0014\b\u0005\u0003\u0005/\u0001\t\u0015\r\u0011\"\u00010\u0003\u0015\tX/Z;f+\u0005\u0001\u0004cA\u00199u5\t!G\u0003\u00024i\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0005U2\u0014\u0001B;uS2T\u0011aN\u0001\u0005U\u00064\u0018-\u0003\u0002:e\ti!\t\\8dW&tw-U;fk\u0016\u0004BAE\u001e>\u0007&\u0011Ah\u0005\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u0005y\nU\"A \u000b\u0005\u0001#\u0011aA1qS&\u0011!i\u0010\u0002\u0012%\u0016\fX/Z:u\u001fJ\u0014Vm\u001d9p]N,\u0007\u0003\u0002\nE{\u0019K!!R\n\u0003\u0013\u0019+hn\u0019;j_:\f\u0004C\u0001\nH\u0013\tA5C\u0001\u0003V]&$\b\u0002\u0003&\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0019\u0002\rE,X-^3!\u0011!a\u0005A!b\u0001\n\u0003i\u0015aB2iC:tW\r\\\u000b\u0002\u001dB\u0011qJU\u0007\u0002!*\u0011\u0011\u000bB\u0001\b]\u0016$xo\u001c:l\u0013\t\u0019\u0006KA\bCY>\u001c7.\u001b8h\u0007\"\fgN\\3m\u0011!)\u0006A!A!\u0002\u0013q\u0015\u0001C2iC:tW\r\u001c\u0011\t\u000b]\u0003A\u0011\u0001-\u0002\rqJg.\u001b;?)\u0019I&l\u0017/^=B\u0011Q\u0004\u0001\u0005\u0006\u001fY\u0003\r!\u0005\u0005\u00065Y\u0003\r\u0001\b\u0005\u0006GY\u0003\r!\n\u0005\u0006]Y\u0003\r\u0001\r\u0005\u0006\u0019Z\u0003\rA\u0014\u0005\bA\u0002\u0011\r\u0011\"\u0003b\u0003\u0011awnY6\u0016\u0003\t\u0004\"a\u00194\u000e\u0003\u0011T!!\u001a\u001c\u0002\t1\fgnZ\u0005\u0003O\u0012\u0014aa\u00142kK\u000e$\bBB5\u0001A\u0003%!-A\u0003m_\u000e\\\u0007\u0005C\u0004l\u0001\t\u0007I\u0011\u00027\u0002#M$\u0018\r^3DQ\u0006tw-\u001a'pO\u001e,'/F\u0001n!\tq\u0017O\u0004\u0002\u001e_&\u0011\u0001OA\u0001\u0010\u0017\u000647.Y\"p]R\u0014x\u000e\u001c7fe&\u0011!o\u001d\u0002\u0012'R\fG/Z\"iC:<W\rT8hO\u0016\u0014(B\u00019\u0003\u0011\u0019)\b\u0001)A\u0005[\u0006\u00112\u000f^1uK\u000eC\u0017M\\4f\u0019><w-\u001a:!\u0011\u00159\b\u0001\"\u0011y\u0003\u0019!wnV8sWR\ta\tC\u0003{\u0001\u0011%10A\bd_:tWm\u0019;U_\n\u0013xn[3s)\r1EP \u0005\u0006{f\u0004\r!J\u0001\u0007EJ|7.\u001a:\t\u000b1K\b\u0019\u0001(")
public class RequestSendThread
extends ShutdownableThread {
    private final int controllerId;
    private final ControllerContext controllerContext;
    private final Broker toBroker;
    private final BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue;
    private final BlockingChannel channel;
    private final Object lock;
    private final KafkaController.StateChangeLogger stateChangeLogger;

    public int controllerId() {
        return this.controllerId;
    }

    public ControllerContext controllerContext() {
        return this.controllerContext;
    }

    public Broker toBroker() {
        return this.toBroker;
    }

    public BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue() {
        return this.queue;
    }

    public BlockingChannel channel() {
        return this.channel;
    }

    private Object lock() {
        return this.lock;
    }

    private KafkaController.StateChangeLogger stateChangeLogger() {
        return this.stateChangeLogger;
    }

    @Override
    public void doWork() {
        block7: {
            Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>> queueItem = this.queue().take();
            RequestOrResponse request = (RequestOrResponse)queueItem._1();
            Function1 callback = (Function1)queueItem._2();
            ObjectRef receive = ObjectRef.create(null);
            try {
                Object object = this.lock();
                synchronized (object) {
                    short s;
                    block11: {
                        ObjectRef response;
                        block9: {
                            block10: {
                                block8: {
                                    BooleanRef isSendSuccessful = BooleanRef.create((boolean)false);
                                    while (this.isRunning().get() && !isSendSuccessful.elem) {
                                        this.liftedTree1$1(request, receive, isSendSuccessful);
                                    }
                                    response = ObjectRef.create(null);
                                    s = BoxesRunTime.unboxToShort((Object)request.requestId().get());
                                    if (RequestKeys$.MODULE$.LeaderAndIsrKey() != s) break block8;
                                    response.elem = LeaderAndIsrResponse$.MODULE$.readFrom(((Receive)receive.elem).buffer());
                                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    break block9;
                                }
                                if (RequestKeys$.MODULE$.StopReplicaKey() != s) break block10;
                                response.elem = StopReplicaResponse$.MODULE$.readFrom(((Receive)receive.elem).buffer());
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                break block9;
                            }
                            if (RequestKeys$.MODULE$.UpdateMetadataKey() != s) break block11;
                            response.elem = UpdateMetadataResponse$.MODULE$.readFrom(((Receive)receive.elem).buffer());
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        }
                        this.stateChangeLogger().trace((Function0<String>)new Serializable(this, response){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ RequestSendThread $outer;
                            private final ObjectRef response$1;

                            public final String apply() {
                                return new StringOps(Predef$.MODULE$.augmentString("Controller %d epoch %d received response %s for a request sent to broker %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), BoxesRunTime.boxToInteger((int)this.$outer.controllerContext().epoch()), ((RequestOrResponse)this.response$1.elem).toString(), this.$outer.toBroker().toString()}));
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                                this.response$1 = response$1;
                            }
                        });
                        Object object2 = callback == null ? BoxedUnit.UNIT : callback.apply((Object)((RequestOrResponse)response.elem));
                        break block7;
                    }
                    throw new MatchError((Object)BoxesRunTime.boxToShort((short)s));
                }
            }
            catch (Throwable throwable) {
                this.error((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ RequestSendThread $outer;

                    public final String apply() {
                        return new StringOps(Predef$.MODULE$.augmentString("Controller %d fails to send a request to broker %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.$outer.toBroker().toString()}));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                }, (Function0<Throwable>)new Serializable(this, throwable){
                    public static final long serialVersionUID = 0L;
                    private final Throwable e$3;

                    public final Throwable apply() {
                        return this.e$3;
                    }
                    {
                        this.e$3 = e$3;
                    }
                });
                this.channel().disconnect();
            }
        }
    }

    private void connectToBroker(Broker broker, BlockingChannel channel) {
        try {
            channel.connect();
            this.info((Function0<String>)new Serializable(this, broker){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RequestSendThread $outer;
                private final Broker broker$2;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Controller %d connected to %s for sending state change requests")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.broker$2.toString()}));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.broker$2 = broker$2;
                }
            });
        }
        catch (Throwable throwable) {
            channel.disconnect();
            this.error((Function0<String>)new Serializable(this, broker){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RequestSendThread $outer;
                private final Broker broker$2;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Controller %d's connection to broker %s was unsuccessful")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.broker$2.toString()}));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.broker$2 = broker$2;
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable e$4;

                public final Throwable apply() {
                    return this.e$4;
                }
                {
                    this.e$4 = e$4;
                }
            });
        }
    }

    private final void liftedTree1$1(RequestOrResponse request$2, ObjectRef receive$1, BooleanRef isSendSuccessful$1) {
        try {
            this.channel().send(request$2);
            receive$1.elem = this.channel().receive();
            isSendSuccessful$1.elem = true;
        }
        catch (Throwable throwable) {
            this.warn((Function0<String>)new Serializable(this, request$2){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RequestSendThread $outer;
                private final RequestOrResponse request$2;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Controller %d epoch %d fails to send request %s to broker %s. Reconnecting to broker.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), BoxesRunTime.boxToInteger((int)this.$outer.controllerContext().epoch()), this.request$2.toString(), this.$outer.toBroker().toString()}));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.request$2 = request$2;
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable e$2;

                public final Throwable apply() {
                    return this.e$2;
                }
                {
                    this.e$2 = e$2;
                }
            });
            this.channel().disconnect();
            this.connectToBroker(this.toBroker(), this.channel());
            isSendSuccessful$1.elem = false;
            Utils$.MODULE$.swallow((Function0<BoxedUnit>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final void apply() {
                    this.apply$mcV$sp();
                }

                public void apply$mcV$sp() {
                    Thread.sleep(300L);
                }
            });
        }
    }

    public RequestSendThread(int controllerId, ControllerContext controllerContext, Broker toBroker, BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue, BlockingChannel channel) {
        this.controllerId = controllerId;
        this.controllerContext = controllerContext;
        this.toBroker = toBroker;
        this.queue = queue;
        this.channel = channel;
        super(new StringOps(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)controllerId), BoxesRunTime.boxToInteger((int)toBroker.id())})), ShutdownableThread$.MODULE$.$lessinit$greater$default$2());
        this.lock = new Object();
        this.stateChangeLogger = KafkaController$.MODULE$.stateChangeLogger();
        this.connectToBroker(toBroker, channel);
    }
}

