/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.epoch;

import com.typesafe.scalalogging.Logger;
import java.io.File;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.log.LogManager;
import kafka.server.BlockingSend;
import kafka.server.BrokerBlockingSender;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.QuorumTestHarness;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.util.Using;
import scala.util.Using$;

@ScalaSignature(bytes="\u0006\u0005\tUb\u0001B\u0015+\u0001EBQ\u0001\u0010\u0001\u0005\u0002uBq\u0001\u0011\u0001A\u0002\u0013\u0005\u0011\tC\u0004P\u0001\u0001\u0007I\u0011\u0001)\t\r]\u0003\u0001\u0015)\u0003C\u0011\u001dA\u0006A1A\u0005\u0002eCaA\u0019\u0001!\u0002\u0013Q\u0006bB2\u0001\u0005\u0004%\t!\u0017\u0005\u0007I\u0002\u0001\u000b\u0011\u0002.\t\u000f\u0015\u0004!\u0019!C\u0001M\"1!\u000f\u0001Q\u0001\n\u001dDqa\u001d\u0001C\u0002\u0013\u0005a\r\u0003\u0004u\u0001\u0001\u0006Ia\u001a\u0005\bk\u0002\u0011\r\u0011\"\u0001g\u0011\u00191\b\u0001)A\u0005O\"9q\u000f\u0001b\u0001\n\u00031\u0007B\u0002=\u0001A\u0003%q\rC\u0004z\u0001\t\u0007I\u0011\u00014\t\ri\u0004\u0001\u0015!\u0003h\u0011\u001dY\bA1A\u0005\u0002\u0019Da\u0001 \u0001!\u0002\u00139\u0007\"C?\u0001\u0001\u0004\u0005\r\u0011\"\u0001\u007f\u0011-\tI\u0002\u0001a\u0001\u0002\u0004%\t!a\u0007\t\u0015\u0005}\u0001\u00011A\u0001B\u0003&q\u0010C\u0004\u0002\"\u0001!\t%a\t\t\u000f\u0005m\u0002\u0001\"\u0001\u0002$!9\u0011Q\t\u0001\u0005\u0002\u0005\r\u0002bBA%\u0001\u0011\u0005\u00111\u0005\u0005\b\u0003\u001b\u0002A\u0011AA(\u0011\u001d\t\u0019\u0007\u0001C\u0005\u0003kCq!a0\u0001\t\u0013\t\t\rC\u0004\u0002\\\u0002!I!!8\t\u000f\u0005E\b\u0001\"\u0003\u0002$!9\u00111\u001f\u0001\u0005\n\u0005U\bb\u0002B\u0003\u0001\u0011%!q\u0001\u0005\n\u0005;\u0001\u0011\u0013!C\u0005\u0005?1q!!\u0017\u0001\u0001)\nY\u0006\u0003\u0006\u0002d\u0011\u0012\t\u0011)A\u0005\u0003KBa\u0001\u0010\u0013\u0005\u0002\u0005-\u0004bBA8I\u0011\u0005\u0011\u0011\u000f\u0005\b\u0003g#C\u0011AA\u0012\u0005iaU-\u00193fe\u0016\u0003xn\u00195J]R,wM]1uS>tG+Z:u\u0015\tYC&A\u0003fa>\u001c\u0007N\u0003\u0002.]\u000511/\u001a:wKJT\u0011aL\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001!G\u000e\t\u0003gQj\u0011\u0001L\u0005\u0003k1\u0012\u0011#U;peVlG+Z:u\u0011\u0006\u0014h.Z:t!\t9$(D\u00019\u0015\tId&A\u0003vi&d7/\u0003\u0002<q\t9Aj\\4hS:<\u0017A\u0002\u001fj]&$h\bF\u0001?!\ty\u0004!D\u0001+\u0003\u001d\u0011'o\\6feN,\u0012A\u0011\t\u0004\u0007*c
U\"\u0001#\u000b\u0005\u00153\u0015aB7vi\u0006\u0014G.\u001a\u0006\u0003\u000f\"\u000b!bY8mY\u0016\u001cG/[8o\u0015\u0005I\u0015!B:dC2\f\u0017BA&E\u0005)a\u0015n\u001d;Ck\u001a4WM\u001d\t\u0003g5K!A\u0014\u0017\u0003\u0017-\u000bgm[1Ce>\\WM]\u0001\fEJ|7.\u001a:t?\u0012*\u0017\u000f\u0006\u0002R+B\u0011!kU\u0007\u0002\u0011&\u0011A\u000b\u0013\u0002\u0005+:LG\u000fC\u0004W\u0007\u0005\u0005\t\u0019\u0001\"\u0002\u0007a$\u0013'\u0001\u0005ce>\\WM]:!\u0003\u0019!x\u000e]5dcU\t!\f\u0005\u0002\\A6\tAL\u0003\u0002^=\u0006!A.\u00198h\u0015\u0005y\u0016\u0001\u00026bm\u0006L!!\u0019/\u0003\rM#(/\u001b8h\u0003\u001d!x\u000e]5dc\u0001\na\u0001^8qS\u000e\u0014\u0014a\u0002;pa&\u001c'\u0007I\u0001\u0005iF\u0002\b'F\u0001h!\tA\u0007/D\u0001j\u0015\tQ7.\u0001\u0004d_6lwN\u001c\u0006\u0003_1T!!\u001c8\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005y\u0017aA8sO&\u0011\u0011/\u001b\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0003\u0015!\u0018\u0007\u001d\u0019!\u0003\u0011!\u0018\u0007]\u0019\u0002\u000bQ\f\u0004/\r\u0011\u0002\tQ\f\u0004OM\u0001\u0006iF\u0002(\u0007I\u0001\u0005iJ\u0002\b'A\u0003ueA\u0004\u0004%\u0001\u0003ueA\u0014\u0014!\u0002;3aJ\u0002\u0013A\u0001;q\u0003\r!\b\u000fI\u0001\taJ|G-^2feV\tq\u0010\u0005\u0005\u0002\u0002\u0005%\u0011QBA\u0007\u001b\t\t\u0019AC\u0002~\u0003\u000bQ1!a\u0002l\u0003\u001d\u0019G.[3oiNLA!a\u0003\u0002\u0004\ti1*\u00194lCB\u0013x\u000eZ;dKJ\u0004RAUA\b\u0003'I1!!\u0005I\u0005\u0015\t%O]1z!\r\u0011\u0016QC\u0005\u0004\u0003/A%\u0001\u0002\"zi\u0016\fA\u0002\u001d:pIV\u001cWM]0%KF$2!UA\u000f\u0011\u001d1f#!AA\u0002}\f\u0011\u0002\u001d:pIV\u001cWM\u001d\u0011\u0002\u0011Q,\u0017M\u001d#po:$\u0012!\u0015\u0015\u00041\u0005\u001d\u0002\u0003BA\u0015\u0003oi!!a\u000b\u000b\t\u00055\u0012qF\u0001\u0004CBL'\u0002BA\u0019\u0003g\tqA[;qSR,'OC\u0002\u000269\fQA[;oSRLA!!\u000f\u0002,\tI\u0011I\u001a;fe\u0016\u000b7\r[\u0001>g\"|W\u000f\u001c3BI\u0012\u001cUO\u001d:f]RdU-\u00193fe\u0016\u0003xn\u00195U_6+7o]1hKN\f5\u000f\u00165fs\u0006\u0013Xm\u0016:
jiR,g\u000eV8MK\u0006$WM\u001d\u0015\u00043\u0005}\u0002\u0003BA\u0015\u0003\u0003JA!a\u0011\u0002,\t!A+Z:u\u0003-\u001a\bn\\;mIN+g\u000e\u001a'fC\u0012,'/\u00129pG\"\u0014V-];fgR\fe\u000eZ$fi\u0006\u0013Vm\u001d9p]N,\u0007f\u0001\u000e\u0002@\u0005q3\u000f[8vY\u0012Len\u0019:fCN,G*Z1eKJ,\u0005o\\2i\u0005\u0016$x/Z3o\u0019\u0016\fG-\u001a:SKN$\u0018M\u001d;tQ\rY\u0012qH\u0001-g\"|W\u000f\u001c3TkB\u0004xN\u001d;SKF,Xm\u001d;t\r>\u0014X\t]8dQNtu\u000e^(o)\",G*Z1eKJ$2!UA)\u0011\u001d\t\u0019\u0006\ba\u0001\u0003+\nqAZ3uG\",'\u000fE\u0002\u0002X\u0011j\u0011\u0001\u0001\u0002\u0012)\u0016\u001cHOR3uG\",'\u000f\u00165sK\u0006$7\u0003\u0002\u0013\u0002^Y\u00022AUA0\u0013\r\t\t\u0007\u0013\u0002\u0007\u0003:L(+\u001a4\u0002\rM,g\u000eZ3s!\r\u0019\u0014qM\u0005\u0004\u0003Sb#\u0001\u0004\"m_\u000e\\\u0017N\\4TK:$G\u0003BA+\u0003[Bq!a\u0019'\u0001\u0004\t)'\u0001\tmK\u0006$WM](gMN,Go\u001d$peR!\u00111OAT!\u001d\t)(a\u001eh\u0003wj\u0011AR\u0005\u0004\u0003s2%aA'baB!\u0011QPAQ\u001d\u0011\ty(a'\u000f\t\u0005\u0005\u0015q\u0013\b\u0005\u0003\u0007\u000b)J\u0004\u0003\u0002\u0006\u0006Me\u0002BAD\u0003#sA!!#\u0002\u00106\u0011\u00111\u0012\u0006\u0004\u0003\u001b\u0003\u0014A\u0002\u001fs_>$h(C\u0001p\u0013\tig.\u0003\u00020Y&\u0011!n[\u0005\u0004\u00033K\u0017aB7fgN\fw-Z\u0005\u0005\u0003;\u000by*\u0001\u0011PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fgB|gn]3ECR\f'bAAMS&!\u00111UAS\u00059)\u0005o\\2i\u000b:$wJ\u001a4tKRTA!!(\u0002 
\"9\u0011\u0011V\u0014A\u0002\u0005-\u0016A\u00039beRLG/[8ogB9\u0011QOA<O\u00065\u0006c\u0001*\u00020&\u0019\u0011\u0011\u0017%\u0003\u0007%sG/A\u0003dY>\u001cX\r\u0006\u0004\u0002f\u0005]\u00161\u0018\u0005\u0007\u0003sk\u0002\u0019\u0001'\u0002\t\u0019\u0014x.\u001c\u0005\u0007\u0003{k\u0002\u0019\u0001'\u0002\u0005Q|\u0017\u0001F<bSR4uN]#q_\u000eD7\t[1oO\u0016$v\u000eF\u0004R\u0003\u0007\f).!7\t\u000f\u0005\u0015g\u00041\u0001\u0002H\u0006)Ao\u001c9jGB!\u0011\u0011ZAi\u001d\u0011\tY-!4\u0011\u0007\u0005%\u0005*C\u0002\u0002P\"\u000ba\u0001\u0015:fI\u00164\u0017bA1\u0002T*\u0019\u0011q\u001a%\t\u000f\u0005]g\u00041\u0001\u0002.\u0006I\u0001/\u0019:uSRLwN\u001c\u0005\u0007Wy\u0001\r!!,\u0002/5,7o]1hKND\u0015M^3MK\u0006$WM]#q_\u000eDG\u0003CAp\u0003K\fI/!<\u0011\u0007I\u000b\t/C\u0002\u0002d\"\u0013qAQ8pY\u0016\fg\u000e\u0003\u0004\u0002h~\u0001\r\u0001T\u0001\u0007EJ|7.\u001a:\t\u000f\u0005-x\u00041\u0001\u0002.\u0006\u0019R\r\u001f9fGR,G\rT3bI\u0016\u0014X\t]8dQ\"9\u0011q^\u0010A\u0002\u00055\u0016!C7j]>3gm]3u\u0003m\u0019XM\u001c3G_V\u0014X*Z:tC\u001e,7\u000fV8FC\u000eDGk\u001c9jG\u0006Y1M]3bi\u0016$v\u000e]5d)\u0015\t\u0016q_A}\u0011\u001d\t)-\ta\u0001\u0003\u000fDq!a?\"\u0001\u0004\ti0\u0001\u000eqCJ$\u0018\u000e^5p]J+\u0007\u000f\\5dC\u0006\u001b8/[4o[\u0016tG\u000f\u0005\u0005\u0002v\u0005]\u0014QVA\u0000!\u0019\t)H!\u0001\u0002.&\u0019!1\u0001$\u0003\u0007M+\u0017/\u0001\u000fxC&$XK\u001c;jYF+xN];n\u0019\u0016\fG-\u001a:FY\u0016\u001cG/\u001a3\u0015\r\u00055&\u0011\u0002B\n\u0011\u001d\u0011YA\ta\u0001\u0005\u001b\t\u0001cY8oiJ|G\u000e\\3s'\u0016\u0014h/\u001a:\u0011\u0007M\u0012y!C\u0002\u0003\u00121\u0012\u0001cQ8oiJ|G\u000e\\3s'\u0016\u0014h/\u001a:\t\u0013\tU!\u0005%AA\u0002\t]\u0011a\u0002;j[\u0016|W\u000f\u001e\t\u0004%\ne\u0011b\u0001B\u000e\u0011\n!Aj\u001c8h\u0003\u0019:\u0018-\u001b;V]RLG.U;peVlG*Z1eKJ,E.Z2uK\u0012$C-\u001a4bk2$HEM\u000b\u0003\u0005CQCAa\u0006\u0003$-\u0012!Q\u0005\t\u0005\u0005O\u0011\t$\u0004\u0002\u0003*)!!1\u0006B\u0017\u0003%)
hn\u00195fG.,GMC\u0002\u00030!\u000b!\"\u00198o_R\fG/[8o\u0013\u0011\u0011\u0019D!\u000b\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r")
public class LeaderEpochIntegrationTest
extends QuorumTestHarness {
    // Brokers started by the current test; tearDown() hands this buffer to TestUtils.shutdownServers.
    private ListBuffer<KafkaBroker> brokers = (ListBuffer)ListBuffer$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$);
    // Names of the two topics used by the tests.
    // NOTE(review): no initializer is visible here; presumably both are assigned in a constructor
    // outside this view. The TopicPartition initializers below read them through topic1()/topic2(),
    // so confirm they are assigned before those initializers run.
    private final String topic1;
    private final String topic2;
    // Partitions 0, 1 and 2 of topic1.
    private final TopicPartition t1p0 = new TopicPartition(this.topic1(), 0);
    private final TopicPartition t1p1 = new TopicPartition(this.topic1(), 1);
    private final TopicPartition t1p2 = new TopicPartition(this.topic1(), 2);
    // Partitions 0 and 2 of topic2.
    private final TopicPartition t2p0 = new TopicPartition(this.topic2(), 0);
    private final TopicPartition t2p2 = new TopicPartition(this.topic2(), 2);
    // Alias for t1p0.
    private final TopicPartition tp = this.t1p0();
    // Producer shared by the tests that assign it via producer_$eq; closed in tearDown() when non-null.
    private KafkaProducer<byte[], byte[]> producer;

    /** Returns the mutable buffer holding the brokers started by the current test. */
    public ListBuffer<KafkaBroker> brokers() {
        return brokers;
    }

    /** Replaces the buffer of brokers tracked by this test with {@code x$1}. */
    public void brokers_$eq(ListBuffer<KafkaBroker> x$1) {
        brokers = x$1;
    }

    /** Returns the name of the first test topic. */
    public String topic1() {
        return topic1;
    }

    /** Returns the name of the second test topic. */
    public String topic2() {
        return topic2;
    }

    /** Returns partition 0 of {@code topic1}. */
    public TopicPartition t1p0() {
        return t1p0;
    }

    /** Returns partition 1 of {@code topic1}. */
    public TopicPartition t1p1() {
        return t1p1;
    }

    /** Returns partition 2 of {@code topic1}. */
    public TopicPartition t1p2() {
        return t1p2;
    }

    /** Returns partition 0 of {@code topic2}. */
    public TopicPartition t2p0() {
        return t2p0;
    }

    /** Returns partition 2 of {@code topic2}. */
    public TopicPartition t2p2() {
        return t2p2;
    }

    /** Returns the default partition used by the tests (initialised from {@code t1p0()}). */
    public TopicPartition tp() {
        return tp;
    }

    /** Returns the shared byte-array producer, or {@code null} if no test has assigned one. */
    public KafkaProducer<byte[], byte[]> producer() {
        return producer;
    }

    /** Stores {@code x$1} as the shared producer; {@code tearDown()} closes it when non-null. */
    public void producer_$eq(KafkaProducer<byte[], byte[]> x$1) {
        producer = x$1;
    }

    /**
     * Cleans up after each test: closes the shared producer when one was created, shuts down
     * every broker in {@link #brokers()}, and then delegates to the superclass {@code tearDown()}.
     *
     * <p>The steps are chained with {@code try/finally} so that an exception while closing the
     * producer or shutting down the brokers cannot skip the remaining cleanup steps.
     */
    @Override
    @AfterEach
    public void tearDown() {
        try {
            if (this.producer() != null) {
                this.producer().close();
            }
        } finally {
            try {
                TestUtils$.MODULE$.shutdownServers(this.brokers(), true);
            } finally {
                super.tearDown();
            }
        }
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
        this.brokers().$plus$plus$eq((IterableOnce)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).map((Function1 & Serializable)id -> this.createBroker(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false)), this.createBroker$default$2(), this.createBroker$default$3(), this.createBroker$default$4())));
        new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$)).foreach((Function1 & Serializable)topic -> {
            this.createTopic(topic, (Map<Object, Seq<Object>>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1})))}))));
            return BoxedUnit.UNIT;
        });
        this.sendFourMessagesToEachTopic();
        int n = 0;
        long waitUntilTrue_pause = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!this.messagesHaveLeaderEpoch((KafkaBroker)this.brokers().apply(0), n, 0)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Leader epoch should be 0");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), waitUntilTrue_pause));
        }
        ((KafkaBroker)this.brokers().apply(0)).shutdown();
        ((KafkaBroker)this.brokers().apply(0)).startup();
        n = 1;
        this.waitForEpochChangeTo(this.topic1(), 0, n);
        this.waitForEpochChangeTo(this.topic2(), 0, n);
        this.sendFourMessagesToEachTopic();
        long l = 100L;
        long waitUntilTrue_waitTimeMs2 = 15000L;
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!this.messagesHaveLeaderEpoch((KafkaBroker)this.brokers().apply(0), n, 4)) {
            void waitUntilTrue_pause2;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                Assertions.fail((String)"Leader epoch should be 1");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause2));
        }
    }

    /**
     * Starts brokers 100 to 102, creates {@code topic1} (partition 0 on broker 100, partition 1
     * on broker 101) and {@code topic2} (partition 0 on broker 100), runs three produce loops
     * (10, 20 and 30 iterations; the loop bodies are defined outside this method) and flushes.
     *
     * <p>It then asks the broker at index 0, and afterwards the broker at index 1, for the end
     * offsets of epoch 0 and asserts on the returned offsets and error codes.
     *
     * <p>Each fetcher is closed in a {@code finally} block so that a failing request or assertion
     * does not leave it open.
     */
    @Test
    public void shouldSendLeaderEpochRequestAndGetAResponse() {
        this.brokers().$plus$plus$eq((IterableOnce)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(100), 102).map((Function1 & Serializable)id -> this.createBroker(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false)), this.createBroker$default$2(), this.createBroker$default$3(), this.createBroker$default$4())));
        Map assignment1 = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{100}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)1)), (Object)Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{101})))}));
        this.createTopic(this.topic1(), (Map<Object, Seq<Object>>)assignment1);
        Map assignment2 = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{100})))}));
        this.createTopic(this.topic2(), (Map<Object, Seq<Object>>)assignment2);
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), -1, 60000L, 0x100000L, Integer.MAX_VALUE, 30000, 0, 16384, "none", 20000, SecurityProtocol.PLAINTEXT, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, new ByteArraySerializer(), new ByteArraySerializer(), false));
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10).foreach((Function1 & Serializable)x$2 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(this, BoxesRunTime.unboxToInt((Object)x$2)));
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 20).foreach((Function1 & Serializable)x$3 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(this, BoxesRunTime.unboxToInt((Object)x$3)));
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 30).foreach((Function1 & Serializable)x$4 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(this, BoxesRunTime.unboxToInt((Object)x$4)));
        this.producer().flush();

        // Epoch 0 is requested for every partition, including t2p2, which no topic assignment above creates.
        Map epochsRequested = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p2()), (Object)BoxesRunTime.boxToInteger((int)0))}));

        // Ask the broker at index 0; the fetcher is released even if the request throws.
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> offsetsForEpochs;
        TestFetcherThread fetcher0 = new TestFetcherThread(this, this.sender((KafkaBroker)this.brokers().apply(2), (KafkaBroker)this.brokers().apply(0)));
        try {
            offsetsForEpochs = fetcher0.leaderOffsetsFor((Map<TopicPartition, Object>)epochsRequested);
        } finally {
            fetcher0.close();
        }
        Assertions.assertEquals((long)10L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p0())).endOffset());
        Assertions.assertEquals((long)30L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t2p0())).endOffset());
        // t1p1 is assigned to broker 101, so broker 100 is expected to answer with an error and offset -1.
        Assertions.assertEquals((short)Errors.NOT_LEADER_OR_FOLLOWER.code(), (short)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p1())).errorCode());
        Assertions.assertEquals((long)-1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p1())).endOffset());

        // Repeat the request against the broker at index 1.
        TestFetcherThread fetcher1 = new TestFetcherThread(this, this.sender((KafkaBroker)this.brokers().apply(2), (KafkaBroker)this.brokers().apply(1)));
        try {
            Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> offsetsForEpochs1 = fetcher1.leaderOffsetsFor((Map<TopicPartition, Object>)epochsRequested);
            Assertions.assertEquals((long)20L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs1.apply((Object)this.t1p1())).endOffset());
        } finally {
            fetcher1.close();
        }
    }

    /**
     * Starts brokers 100 and 101, creates the {@code tp} partition on broker 101 and writes one
     * record. It then restarts the broker at index 1 twice, writing one more record after each
     * restart, and after every step queries that broker for the end offset of various epochs.
     *
     * <p>The assertions expect epoch 0 to end at offset 1, epoch 2 at offset 2 and epoch 4 at the
     * log end offset (3).
     *
     * <p>Each fetcher instance is closed in a {@code finally} block so that a failing assertion
     * does not leave it open. The per-epoch request is built by {@link #epochEndOffsetFor}.
     */
    @Test
    public void shouldIncreaseLeaderEpochBetweenLeaderRestarts() {
        this.brokers().$plus$eq((Object)this.createBroker(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(100, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false)), this.createBroker$default$2(), this.createBroker$default$3(), this.createBroker$default$4()));
        Assertions.assertEquals((int)this.controllerServer().config().nodeId(), (int)this.waitUntilQuorumLeaderElected(this.controllerServer(), 15000L));
        this.brokers().$plus$eq((Object)this.createBroker(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(101, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false)), this.createBroker$default$2(), this.createBroker$default$3(), this.createBroker$default$4()));
        this.createTopic(this.tp().topic(), (Map<Object, Seq<Object>>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)this.tp().partition())), (Object)Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{101})))}))));
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), -1, 60000L, 0x100000L, Integer.MAX_VALUE, 30000, 0, 16384, "none", 20000, SecurityProtocol.PLAINTEXT, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, new ByteArraySerializer(), new ByteArraySerializer(), false));
        this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();

        // Round 1: one record written, no restart yet.
        TestFetcherThread firstFetcher = new TestFetcherThread(this, this.sender((KafkaBroker)this.brokers().apply(0), (KafkaBroker)this.brokers().apply(1)));
        try {
            OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = this.epochEndOffsetFor(firstFetcher, 0);
            Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
            Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
            Assertions.assertEquals((long)1L, (long)this.leo$1());
            ((KafkaBroker)this.brokers().apply(1)).shutdown();
            ((KafkaBroker)this.brokers().apply(1)).startup();
            this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        } finally {
            firstFetcher.close();
        }

        // Round 2: after the first restart and a second record.
        TestFetcherThread secondFetcher = new TestFetcherThread(this, this.sender((KafkaBroker)this.brokers().apply(0), (KafkaBroker)this.brokers().apply(1)));
        try {
            OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = this.epochEndOffsetFor(secondFetcher, 0);
            Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
            Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
            // Epoch 1 is answered with epoch 0's end offset.
            epochEndOffset = this.epochEndOffsetFor(secondFetcher, 1);
            Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
            Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
            epochEndOffset = this.epochEndOffsetFor(secondFetcher, 2);
            Assertions.assertEquals((int)2, (int)epochEndOffset.leaderEpoch());
            Assertions.assertEquals((long)2L, (long)epochEndOffset.endOffset());
            Assertions.assertEquals((long)2L, (long)this.leo$1());
            ((KafkaBroker)this.brokers().apply(1)).shutdown();
            ((KafkaBroker)this.brokers().apply(1)).startup();
            this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        } finally {
            secondFetcher.close();
        }

        // Round 3: after the second restart and a third record.
        TestFetcherThread thirdFetcher = new TestFetcherThread(this, this.sender((KafkaBroker)this.brokers().apply(0), (KafkaBroker)this.brokers().apply(1)));
        try {
            Assertions.assertEquals((long)1L, (long)this.epochEndOffsetFor(thirdFetcher, 0).endOffset());
            Assertions.assertEquals((long)2L, (long)this.epochEndOffsetFor(thirdFetcher, 2).endOffset());
            Assertions.assertEquals((long)3L, (long)this.epochEndOffsetFor(thirdFetcher, 4).endOffset());
            Assertions.assertEquals((long)this.leo$1(), (long)this.epochEndOffsetFor(thirdFetcher, 4).endOffset());
            this.shouldSupportRequestsForEpochsNotOnTheLeader(thirdFetcher);
        } finally {
            thirdFetcher.close();
        }
    }

    /**
     * Asks {@code fetcher} for the end offset of {@code epoch} on the {@code tp} partition and
     * returns the entry of the response that belongs to that partition.
     */
    private OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffsetFor(TestFetcherThread fetcher, int epoch) {
        Map request = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)epoch))}));
        return (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)request).apply((Object)this.tp());
    }

    /**
     * Queries {@code fetcher} for epochs 1, 3 and 5 of {@code t1p0} and asserts that the
     * returned end offsets are 1, 2 and -1 respectively.
     *
     * @param fetcher fetcher used to send the three single-partition requests
     */
    public void shouldSupportRequestsForEpochsNotOnTheLeader(TestFetcherThread fetcher) {
        int[] requestedEpochs = {1, 3, 5};
        long[] expectedEndOffsets = {1L, 2L, -1L};
        for (int i = 0; i < requestedEpochs.length; ++i) {
            Map request = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)requestedEpochs[i]))}));
            OffsetForLeaderEpochResponseData.EpochEndOffset answer = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)request).apply((Object)this.t1p0());
            Assertions.assertEquals((long)expectedEndOffsets[i], (long)answer.endOffset());
        }
    }

    /**
     * Builds a blocking sender that lets {@code from} talk to {@code to}.
     *
     * <p>The target node is looked up in {@code from}'s metadata cache using {@code to}'s broker
     * id and {@code from}'s inter-broker listener name.
     *
     * @param from broker whose configuration and metadata cache are used
     * @param to   broker the returned sender connects to
     * @return a {@code BrokerBlockingSender} pointing at {@code to}
     */
    private BlockingSend sender(KafkaBroker from, KafkaBroker to) {
        Object aliveNode = from.metadataCache().getAliveBrokerNode(to.config().brokerId(), from.config().interBrokerListenerName()).get();
        Node target = (Node)aliveNode;
        return new BrokerBlockingSender(new BrokerEndPoint(target.id(), target.host(), target.port()), from.config(), new Metrics(), Time.SYSTEM, 42, "TestFetcher", new LogContext());
    }

    /*
     * WARNING - void declaration
     */
    private void waitForEpochChangeTo(String topic, int partition, int epoch) {
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!LeaderEpochIntegrationTest.$anonfun$waitForEpochChangeTo$1(this, topic, partition, epoch)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Epoch didn't change");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private boolean messagesHaveLeaderEpoch(KafkaBroker broker, int expectedLeaderEpoch, int minOffset) {
        BooleanRef result = BooleanRef.create((boolean)true);
        new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$)).foreach((Function1 & Serializable)topic -> {
            LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$1(this, broker, result, minOffset, expectedLeaderEpoch, topic);
            return BoxedUnit.UNIT;
        });
        return result.elem;
    }

    /**
     * Sends "test1".."test4" to {@code topic1} and "test5".."test8" to {@code topic2}, using each
     * string as both the record key and value, and blocks until every send has completed.
     *
     * <p>The producer is closed in a {@code finally} block so that a failed send or a failed
     * {@code get()} does not leak it.
     */
    private void sendFourMessagesToEachTopic() {
        .colon.colon testMessageList1 = new .colon.colon((Object)"test1", (List)new .colon.colon((Object)"test2", (List)new .colon.colon((Object)"test3", (List)new .colon.colon((Object)"test4", (List)Nil$.MODULE$))));
        .colon.colon testMessageList2 = new .colon.colon((Object)"test5", (List)new .colon.colon((Object)"test6", (List)new .colon.colon((Object)"test7", (List)new .colon.colon((Object)"test8", (List)Nil$.MODULE$))));
        // Positional arguments for TestUtils.createProducer; their parameter names are not visible
        // in this file, so the values are kept exactly as the compiler emitted them.
        String x$12 = TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers());
        StringSerializer x$2 = new StringSerializer();
        StringSerializer x$3 = new StringSerializer();
        int x$4 = -1;
        long x$5 = 60000L;
        long x$6 = 0x100000L;
        int x$72 = Integer.MAX_VALUE;
        int x$8 = 30000;
        int x$9 = 0;
        int x$10 = 16384;
        String x$11 = "none";
        int x$122 = 20000;
        SecurityProtocol x$13 = SecurityProtocol.PLAINTEXT;
        None$ x$14 = None$.MODULE$;
        None$ x$15 = None$.MODULE$;
        boolean x$16 = false;
        KafkaProducer producer = TestUtils$.MODULE$.createProducer(x$12, x$4, x$5, x$6, x$72, x$8, x$9, x$10, x$11, x$122, x$13, (Option<File>)x$14, (Option<Properties>)x$15, x$2, x$3, x$16);
        try {
            // Issue all sends first, then wait on each returned future in order.
            ((List)testMessageList1.map((Function1 & Serializable)m -> new ProducerRecord(this.topic1(), m, m)).$plus$plus((IterableOnce)testMessageList2.map((Function1 & Serializable)m -> new ProducerRecord(this.topic2(), m, m)))).map((Function1 & Serializable)x$1 -> producer.send(x$1)).foreach((Function1 & Serializable)x$7 -> (RecordMetadata)x$7.get());
        }
        finally {
            producer.close();
        }
    }

    /**
     * Creates {@code topic} with the given replica assignment through a PLAINTEXT admin client.
     *
     * <p>The admin client is closed in the inner {@code finally}; it is also handed to
     * {@code Using.resource} with the AutoCloseable releasable, so it appears to be released a
     * second time when the block exits.
     *
     * @param topic                      name of the topic to create
     * @param partitionReplicaAssignment replica assignment passed to {@code createTopicWithAdmin}
     */
    private void createTopic(String topic, Map<Object, Seq<Object>> partitionReplicaAssignment) {
        Object adminClient = TestUtils$.MODULE$.createAdminClient(this.brokers(), ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), new Properties());
        Using$.MODULE$.resource(adminClient, (Function1 & Serializable)admin -> {
            try {
                return TestUtils$.MODULE$.createTopicWithAdmin((Admin)admin, topic, this.brokers(), this.controllerServers(), 1, 1, partitionReplicaAssignment, new Properties());
            }
            finally {
                admin.close();
            }
        }, (Using.Releasable)Using.Releasable$.AutoCloseableIsReleasable$.MODULE$);
    }

    /*
     * WARNING - void declaration
     */
    private int waitUntilQuorumLeaderElected(ControllerServer controllerServer, long timeout) {
        void var5_7;
        Tuple2 tuple2;
        long computeUntilTrue_pause = 100L;
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            LeaderAndEpoch computeUntilTrue_result;
            if (LeaderEpochIntegrationTest.$anonfun$waitUntilQuorumLeaderElected$2(computeUntilTrue_result = LeaderEpochIntegrationTest.$anonfun$waitUntilQuorumLeaderElected$1(controllerServer))) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + timeout) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(timeout), computeUntilTrue_pause));
        }
        Object var10_5 = null;
        Tuple2 tuple22 = tuple2;
        if (tuple22 == null) {
            throw new MatchError(null);
        }
        LeaderAndEpoch leaderAndEpoch = (LeaderAndEpoch)tuple22._1();
        return var5_7.leaderId().orElseThrow(() -> new AssertionError((Object)("Quorum Controller leader not elected after " + timeout + " ms")));
    }

    /**
     * Returns 15000. Judging by its name this is the compiler-generated default for the second
     * ({@code timeout}) parameter of {@code waitUntilQuorumLeaderElected} — confirm against callers.
     */
    private long waitUntilQuorumLeaderElected$default$2() {
        final long defaultTimeoutMs = 15_000L;
        return defaultTimeoutMs;
    }

    /**
     * Checks, via {@code messagesHaveLeaderEpoch}, broker 0 against the current value of
     * {@code expectedLeaderEpoch$1}, reading from offset 0.
     */
    public static final /* synthetic */ boolean $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$3(LeaderEpochIntegrationTest $this, IntRef expectedLeaderEpoch$1) {
        KafkaBroker firstBroker = (KafkaBroker)$this.brokers().apply(0);
        int expectedEpoch = expectedLeaderEpoch$1.elem;
        return $this.messagesHaveLeaderEpoch(firstBroker, expectedEpoch, 0);
    }

    /** Supplies the message text "Leader epoch should be 0". */
    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$4() {
        final String message = "Leader epoch should be 0";
        return message;
    }

    /**
     * Checks, via {@code messagesHaveLeaderEpoch}, broker 0 against the current value of
     * {@code expectedLeaderEpoch$1}, reading from offset 4.
     */
    public static final /* synthetic */ boolean $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$5(LeaderEpochIntegrationTest $this, IntRef expectedLeaderEpoch$1) {
        KafkaBroker firstBroker = (KafkaBroker)$this.brokers().apply(0);
        int expectedEpoch = expectedLeaderEpoch$1.elem;
        return $this.messagesHaveLeaderEpoch(firstBroker, expectedEpoch, 4);
    }

    /** Supplies the message text "Leader epoch should be 1". */
    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$6() {
        final String message = "Leader epoch should be 1";
        return message;
    }

    /**
     * Sends one record with a null key and the bytes of "IHeartLogs" as value to partition 0 of
     * {@code topic1}. The {@code x$2} argument is not used.
     *
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(LeaderEpochIntegrationTest $this, int x$2) {
        return $this.producer().send(
                new ProducerRecord(
                        $this.topic1(),
                        Predef$.MODULE$.int2Integer(0),
                        null,
                        (Object)"IHeartLogs".getBytes()));
    }

    /**
     * Sends one record with a null key and the bytes of "OhAreThey" as value to partition 1 of
     * {@code topic1}. The {@code x$3} argument is not used.
     *
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(LeaderEpochIntegrationTest $this, int x$3) {
        return $this.producer().send(
                new ProducerRecord(
                        $this.topic1(),
                        Predef$.MODULE$.int2Integer(1),
                        null,
                        (Object)"OhAreThey".getBytes()));
    }

    /**
     * Sends one record with a null key and the bytes of "IReallyDo" as value to partition 0 of
     * {@code topic2}. The {@code x$4} argument is not used.
     *
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(LeaderEpochIntegrationTest $this, int x$4) {
        return $this.producer().send(
                new ProducerRecord(
                        $this.topic2(),
                        Predef$.MODULE$.int2Integer(0),
                        null,
                        (Object)"IReallyDo".getBytes()));
    }

    /**
     * Returns the log end offset of broker 1's local log for {@code tp()}.
     * The log lookup is unwrapped with {@code get()} without a presence check.
     */
    private final long leo$1() {
        KafkaBroker secondBroker = (KafkaBroker)this.brokers().apply(1);
        UnifiedLog localLog = (UnifiedLog)secondBroker.replicaManager().localLog(this.tp()).get();
        return localLog.logEndOffset();
    }

    /**
     * Tells whether broker 0's metadata cache has a leader-and-ISR entry for the given
     * topic-partition whose leader epoch equals {@code epoch$1}.
     */
    public static final /* synthetic */ boolean $anonfun$waitForEpochChangeTo$1(LeaderEpochIntegrationTest $this, String topic$1, int partition$1, int epoch$1) {
        KafkaBroker firstBroker = (KafkaBroker)$this.brokers().apply(0);
        return firstBroker.metadataCache()
                .getLeaderAndIsr(topic$1, partition$1)
                .filter(state -> state.leaderEpoch() == epoch$1)
                .isPresent();
    }

    /** Supplies the message text "Epoch didn't change". */
    public static final /* synthetic */ String $anonfun$waitForEpochChangeTo$3() {
        final String message = "Epoch didn't change";
        return message;
    }

    /** Tells whether the batch's partition leader epoch equals {@code expectedLeaderEpoch$2}. */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$4(int expectedLeaderEpoch$2, RecordBatch x$6) {
        int actualEpoch = x$6.partitionLeaderEpoch();
        return actualEpoch == expectedLeaderEpoch$2;
    }

    /**
     * Tells whether every segment of {@code broker}'s log for {@code tp$1} can be read from
     * {@code minOffset$1} and contains only batches carrying {@code expectedLeaderEpoch$2}.
     *
     * <p>A segment whose read returns {@code null} makes the result {@code false}. The log lookup
     * is unwrapped with {@code get()} without a presence check.
     */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$2(TopicPartition tp$1, int minOffset$1, int expectedLeaderEpoch$2, KafkaBroker broker) {
        LogManager logManager = broker.logManager();
        boolean getLogSecondArg = logManager.getLog$default$2();
        UnifiedLog log = (UnifiedLog)logManager.getLog(tp$1, getLogSecondArg).get();
        // NOTE(review): each segment is read twice, once for the null check and once for the batches.
        return log.logSegments().stream().allMatch(segment ->
                segment.read((long)minOffset$1, Integer.MAX_VALUE) != null
                        && CollectionConverters$.MODULE$.IteratorHasAsScala(segment.read((long)((long)minOffset$1), (int)Integer.MAX_VALUE).records.batches().iterator()).asScala().forall((Function1 & Serializable)batch -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$4(expectedLeaderEpoch$2, batch))));
    }

    /**
     * Folds the check for partition 0 of {@code topic} into {@code result$1}.
     *
     * <p>The flag stays {@code true} only if it was already {@code true}, {@code broker$1}'s log
     * end offset for the partition is positive, and every broker passes
     * {@code $anonfun$messagesHaveLeaderEpoch$2}. The log end offset is looked up even when the
     * flag is already {@code false}.
     */
    public static final /* synthetic */ void $anonfun$messagesHaveLeaderEpoch$1(LeaderEpochIntegrationTest $this, KafkaBroker broker$1, BooleanRef result$1, int minOffset$1, int expectedLeaderEpoch$2, String topic) {
        TopicPartition tp = new TopicPartition(topic, 0);
        LogManager logManager = broker$1.logManager();
        boolean getLogSecondArg = logManager.getLog$default$2();
        long logEndOffset = ((UnifiedLog)logManager.getLog(tp, getLogSecondArg).get()).logEndOffset();
        if (result$1.elem) {
            result$1.elem = logEndOffset > 0L && $this.brokers().forall((Function1 & Serializable)candidate -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$2(tp, minOffset$1, expectedLeaderEpoch$2, candidate)));
        }
    }

    /** Reads the current leader-and-epoch from the controller's raft manager. */
    public static final /* synthetic */ LeaderAndEpoch $anonfun$waitUntilQuorumLeaderElected$1(ControllerServer controllerServer$1) {
        LeaderAndEpoch current = controllerServer$1.raftManager().leaderAndEpoch();
        return current;
    }

    /** Tells whether the given leader-and-epoch carries a leader id. */
    public static final /* synthetic */ boolean $anonfun$waitUntilQuorumLeaderElected$2(LeaderAndEpoch x$8) {
        boolean leaderKnown = x$8.leaderId().isPresent();
        return leaderKnown;
    }

    /**
     * Creates the test instance and initializes the two topic-name fields:
     * {@code topic1} to "foo" and {@code topic2} to "bar".
     */
    public LeaderEpochIntegrationTest() {
        this.topic1 = "foo";
        this.topic2 = "bar";
    }

    /** Boxed adapter around {@code $anonfun$waitUntilQuorumLeaderElected$2}. */
    public static final /* synthetic */ Object $anonfun$waitUntilQuorumLeaderElected$2$adapted(LeaderAndEpoch x$8) {
        boolean leaderKnown = LeaderEpochIntegrationTest.$anonfun$waitUntilQuorumLeaderElected$2(x$8);
        return BoxesRunTime.boxToBoolean((boolean)leaderKnown);
    }

    /**
     * Test helper that issues OffsetsForLeaderEpoch requests through a supplied
     * {@link BlockingSend} and exposes the per-partition results as a Scala map.
     *
     * <p>It also implements {@link Logging}; every logging method forwards to the corresponding
     * static {@code Logging.xxx$} bridge.
     */
    public class TestFetcherThread
    implements Logging {
        private final BlockingSend sender;
        private Logger logger;
        private String logIdent;
        // Set to true once {@code logger} has been initialized.
        private volatile boolean bitmap$0;
        public final /* synthetic */ LeaderEpochIntegrationTest $outer;

        /**
         * @param $outer enclosing test instance; a {@code null} value raises a NullPointerException
         * @param sender channel used to send requests
         */
        public TestFetcherThread(LeaderEpochIntegrationTest $outer, BlockingSend sender) {
            this.sender = sender;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
        }

        /**
         * Requests the end offset for the given leader epoch of every partition in
         * {@code partitions} and returns the answers keyed by topic-partition.
         *
         * @param partitions leader epoch to query, per topic-partition
         * @return one response entry per partition contained in the response
         */
        public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> leaderOffsetsFor(Map<TopicPartition, Object> partitions) {
            OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection requestTopics = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection(partitions.size());
            partitions.foreachEntry((Function2 & Serializable)(topicPartition, leaderEpoch) -> BoxesRunTime.boxToBoolean((boolean)TestFetcherThread.$anonfun$leaderOffsetsFor$1(requestTopics, topicPartition, BoxesRunTime.unboxToInt((Object)leaderEpoch))));
            // The second argument is presumably the replica id — confirm against the builder's API.
            OffsetsForLeaderEpochRequest.Builder requestBuilder = OffsetsForLeaderEpochRequest.Builder.forFollower((OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection)requestTopics, (int)1);
            ClientResponse clientResponse = this.sender.sendRequest((AbstractRequest.Builder)requestBuilder);
            OffsetsForLeaderEpochResponse responseBody = (OffsetsForLeaderEpochResponse)clientResponse.responseBody();
            Collection responseTopics = (Collection)responseBody.data().topics();
            // Flatten topic -> partitions into (TopicPartition -> partition result) pairs.
            return ((IterableOnceOps)CollectionConverters$.MODULE$.CollectionHasAsScala(responseTopics).asScala().flatMap((Function1 & Serializable)topicResult -> (Buffer)CollectionConverters$.MODULE$.ListHasAsScala(topicResult.partitions()).asScala().map((Function1 & Serializable)partitionResult -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topicResult.topic(), partitionResult.partition())), partitionResult)))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        }

        /**
         * Adds one partition entry to {@code topics$1}, creating and registering the topic entry
         * first when the collection does not contain it yet.
         *
         * @return the result of adding the partition entry to the topic's partition list
         */
        public static final /* synthetic */ boolean $anonfun$leaderOffsetsFor$1(OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection topics$1, TopicPartition topicPartition, int leaderEpoch) {
            OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topicEntry = topics$1.find(topicPartition.topic());
            if (topicEntry == null) {
                topicEntry = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic().setTopic(topicPartition.topic());
                topics$1.add((ImplicitLinkedHashCollection.Element)topicEntry);
            }
            OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionEntry = new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(topicPartition.partition()).setLeaderEpoch(leaderEpoch);
            return topicEntry.partitions().add(partitionEntry);
        }

        /** Closes the underlying sender. */
        public void close() {
            this.sender.close();
        }

        /** Accessor for the enclosing test instance. */
        public /* synthetic */ LeaderEpochIntegrationTest kafka$server$epoch$LeaderEpochIntegrationTest$TestFetcherThread$$$outer() {
            return this.$outer;
        }

        // ---- lazily initialized logger --------------------------------------------------------

        /** Initializes {@code logger} under the instance lock if that has not happened yet. */
        private Logger logger$lzycompute() {
            synchronized (this) {
                if (!this.bitmap$0) {
                    this.logger = Logging.logger$((Logging)this);
                    this.bitmap$0 = true;
                }
            }
            return this.logger;
        }

        /** Returns the logger, initializing it on first use. */
        public Logger logger() {
            return this.bitmap$0 ? this.logger : this.logger$lzycompute();
        }

        public String logIdent() {
            return this.logIdent;
        }

        public void logIdent_$eq(String x$1) {
            this.logIdent = x$1;
        }

        // ---- Logging bridges ------------------------------------------------------------------

        public String loggerName() {
            return Logging.loggerName$((Logging)this);
        }

        public String msgWithLogIdent(String msg) {
            return Logging.msgWithLogIdent$((Logging)this, (String)msg);
        }

        public boolean isDebugEnabled() {
            return Logging.isDebugEnabled$((Logging)this);
        }

        public boolean isTraceEnabled() {
            return Logging.isTraceEnabled$((Logging)this);
        }

        public void trace(Function0<String> msg) {
            Logging.trace$((Logging)this, msg);
        }

        public void trace(Function0<String> msg, Function0<Throwable> e) {
            Logging.trace$((Logging)this, msg, e);
        }

        public void debug(Function0<String> msg) {
            Logging.debug$((Logging)this, msg);
        }

        public void debug(Function0<String> msg, Function0<Throwable> e) {
            Logging.debug$((Logging)this, msg, e);
        }

        public void info(Function0<String> msg) {
            Logging.info$((Logging)this, msg);
        }

        public void info(Function0<String> msg, Function0<Throwable> e) {
            Logging.info$((Logging)this, msg, e);
        }

        public void warn(Function0<String> msg) {
            Logging.warn$((Logging)this, msg);
        }

        public void warn(Function0<String> msg, Function0<Throwable> e) {
            Logging.warn$((Logging)this, msg, e);
        }

        public void error(Function0<String> msg) {
            Logging.error$((Logging)this, msg);
        }

        public void error(Function0<String> msg, Function0<Throwable> e) {
            Logging.error$((Logging)this, msg, e);
        }

        public void fatal(Function0<String> msg) {
            Logging.fatal$((Logging)this, msg);
        }

        public void fatal(Function0<String> msg, Function0<Throwable> e) {
            Logging.fatal$((Logging)this, msg, e);
        }
    }
}

