/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.cluster.Replica;
import kafka.log.LogManager;
import kafka.server.BlockingSend;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.PartitionFetchState;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsManager;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicationQuotaManager;
import kafka.server.epoch.LeaderEpochCache;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.Assert;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001}4A!\u0001\u0002\u0001\u000f\tA\"+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$Vm\u001d;\u000b\u0005\r!\u0011AB:feZ,'OC\u0001\u0006\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u0005\u0011\u0005%aQ\"\u0001\u0006\u000b\u0003-\tQa]2bY\u0006L!!\u0004\u0006\u0003\r\u0005s\u0017PU3g\u0011\u0015y\u0001\u0001\"\u0001\u0011\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0003\u0005\u0002\u0013\u00015\t!\u0001C\u0004\u0015\u0001\t\u0007I\u0011B\u000b\u0002\tQ\f\u0004\u000fM\u000b\u0002-A\u0011qcH\u0007\u00021)\u0011\u0011DG\u0001\u0007G>lWn\u001c8\u000b\u0005\u0015Y\"B\u0001\u000f\u001e\u0003\u0019\t\u0007/Y2iK*\ta$A\u0002pe\u001eL!\u0001\t\r\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\"1!\u0005\u0001Q\u0001\nY\tQ\u0001^\u0019qa\u0001Bq\u0001\n\u0001C\u0002\u0013%Q#\u0001\u0003ucA\f\u0004B\u0002\u0014\u0001A\u0003%a#A\u0003ucA\f\u0004\u0005C\u0004)\u0001\t\u0007I\u0011B\u000b\u0002\tQ\u0014\u0004/\r\u0005\u0007U\u0001\u0001\u000b\u0011\u0002\f\u0002\u000bQ\u0014\u0004/\r\u0011\t\u000f1\u0002!\u0019!C\u0005[\u0005q!M]8lKJ,e\u000e\u001a)pS:$X#\u0001\u0018\u0011\u0005=\u0012T\"\u0001\u0019\u000b\u0005E\"\u0011aB2mkN$XM]\u0005\u0003gA\u0012aB\u0011:pW\u0016\u0014XI\u001c3Q_&tG\u000f\u0003\u00046\u0001\u0001\u0006IAL\u0001\u0010EJ|7.\u001a:F]\u0012\u0004v.\u001b8uA!)q\u0007\u0001C\u0001q\u0005A3\u000f[8vY\u0012\u001cVM\u001c3MCR,7\u000f\u001e*fcV,7\u000f\u001e,feNLwN\\:Cs\u0012+g-Y;miR\t\u0011\b\u0005\u0002\nu%\u00111H\u0003\u0002\u0005+:LG\u000f\u000b\u00027{A\u0011a(Q\u0007\u0002\u007f)\u0011\u0001)H\u0001\u0006UVt\u0017\u000e^\u0005\u0003\u0005~\u0012A\u0001V3ti\")A\t\u0001C\u0001q\u0005Y4\u000f[8vY\u0012tu\u000e^%tgV,G*Z1eKJ,\u0005o\\2i%\u0016\fX/Z:u\u0013\u001aLe\u000e^3sEJ|7.\u001a:WKJ\u001c\u0018n\u001c8CK2|w/M\u0019)\u0005\rk\u0004\"B$\u0001\t\u0003A\u0014!J:i_VdG\rS1oI2,W\t_2faRLwN\u001c$s_6\u0014En\\2lS:<7+\u001a8eQ\t1U\bC\u0003K\u0001\u0011\u0005\u0001(\u0001 tQ>,H\u000e\u001a$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r[(oYfLe\rT3bI\u0016\u0014X\t]8dQ.swn\u001e8U_\n{G\u000f\u001b\u0015\u0003\u0013vBQ!\u0014\u0001\u0005\u0002a\nAg\u001d5pk2$GK];oG\u0006$X\rV8PM\u001a\u001cX\r^*qK\u000eLg-[3e\u0013:,\u0005o\\2i\u001f\u001a47/\u001a;SKN\u0004xN\\:fQ\taU\bC\u0003Q\u0001\u0011\u0005\u0001(A'tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_>3gm]3u'B,7-\u001b4jK\u0012Le.\u00129pG\"|eMZ:fiJ+7\u000f]8og\u0016LeMR8mY><XM\u001d%bg:{Wj\u001c:f\u000bB|7\r[:)\u0005=k\u0004\"B*\u0001\t\u0003A\u0014AS:i_VdGMR3uG\"dU-\u00193fe\u0016\u0003xn\u00195TK\u000e|g\u000e\u001a+j[\u0016Le\rT3bI\u0016\u0014(+\u001a9mS\u0016\u001cx+\u001b;i\u000bB|7\r\u001b(pi.swn\u001e8U_\u001a{G\u000e\\8xKJD#AU\u001f\t\u000bY\u0003A\u0011\u0001\u001d\u0002gMDw.\u001e7e+N,G*Z1eKJ,e\u000eZ(gMN,G/\u00134J]R,'O\u0011:pW\u0016\u0014h+\u001a:tS>t')\u001a7poJ\u0002\u0004FA+>\u0011\u0015I\u0006\u0001\"\u00019\u0003\u0001\u001b\bn\\;mIR\u0013XO\\2bi\u0016$v.\u00138ji&\fGNR3uG\"|eMZ:fi&3G*Z1eKJ\u0014V\r^;s]N,f\u000eZ3gS:,Gm\u00144gg\u0016$\bF\u0001->\u0011\u0015a\u0006\u0001\"\u00019\u0003E\u001a\bn\\;mIB{G\u000e\\%oI\u00164\u0017N\\5uK2L\u0018J\u001a'fC\u0012,'OU3ukJt7/\u00118z\u000bb\u001cW\r\u001d;j_:D#aW\u001f\t\u000b}\u0003A\u0011\u0001\u001d\u0002WMDw.\u001e7e\u001b>4X\rU1si&$\u0018n\u001c8t\u001fV$xJ\u001a+sk:\u001c\u0017\r^5oO2{wm\u0015;bi\u0016D#AX\u001f\t\u000b\t\u0004A\u0011\u0001\u001d\u0002qMDw.\u001e7e\r&dG/\u001a:QCJ$\u0018\u000e^5p]Nl\u0015\rZ3MK\u0006$WM\u001d#ve&tw\rT3bI\u0016\u0014X\t]8dQJ+\u0017/^3ti\"\u0012\u0011-\u0010\u0005\u0006K\u0002!\tAZ\u0001\u0005gR,(\r\u0006\u0003hgbT\bc\u00015l[6\t\u0011N\u0003\u0002k;\u0005AQ-Y:z[>\u001c7.\u0003\u0002mS\n\u0019\u0012*\u0012=qK\u000e$\u0018\r^5p]N+G\u000f^3sgB\u0019\u0011B\u001c9\n\u0005=T!AB(qi&|g\u000e\u0005\u00020c&\u0011!\u000f\r\u0002\n!\u0006\u0014H/\u001b;j_:DQ\u0001\u001e3A\u0002U\fqA]3qY&\u001c\u0017\r\u0005\u00020m&\u0011q\u000f\r\u0002\b%\u0016\u0004H.[2b\u0011\u0015IH\r1\u0001q\u0003%\u0001\u0018M\u001d;ji&|g\u000eC\u0003|I\u0002\u0007A0\u0001\bsKBd\u0017nY1NC:\fw-\u001a:\u0011\u0005Ii\u0018B\u0001@\u0003\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJ\u0004")
public class ReplicaFetcherThreadTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);

    private TopicPartition t1p0() {
        return this.t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    private BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, null, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option)None$.MODULE$);
        Assert.assertEquals((long)ApiKeys.FETCH.latestVersion(), (long)thread.fetchRequestVersion());
        Assert.assertEquals((long)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (long)thread.offsetForLeaderEpochRequestVersion());
        Assert.assertEquals((long)ApiKeys.LIST_OFFSETS.latestVersion(), (long)thread.listOffsetRequestVersion());
    }

    @Test
    public void shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        props.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.10.2");
        props.put(KafkaConfig$.MODULE$.LogMessageFormatVersionProp(), "0.10.2");
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, null, new Metrics(), (Time)new SystemTime(), null, (Option)None$.MODULE$);
        Map result = thread.fetchEpochsFromLeader((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToInteger((int)0))})));
        Map expected = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.NONE, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.NONE, -1, -1L))}));
        Assert.assertEquals((String)"results from leader epoch request should have undefined offset", (Object)expected, (Object)result);
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        EasyMock.expect((Object)mockBlockingSend.sendRequest((AbstractRequest.Builder)EasyMock.anyObject())).andThrow((Throwable)new NullPointerException()).once();
        EasyMock.replay((Object[])new Object[]{mockBlockingSend});
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, null, new Metrics(), (Time)new SystemTime(), null, (Option)new Some((Object)mockBlockingSend));
        Map result = thread.fetchEpochsFromLeader((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToInteger((int)0))})));
        Map expected = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}));
        Assert.assertEquals((String)"results from leader epoch request should have undefined offset", (Object)expected, (Object)result);
        EasyMock.verify((Object[])new Object[]{mockBlockingSend});
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata(0L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata(0L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)leaderEpoch));
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((Object)new Tuple2.mcIJ.sp(leaderEpoch, 0L)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, replica});
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)1L, (long)mockNetwork.fetchCount());
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)2L, (long)mockNetwork.fetchCount());
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)3L, (long)mockNetwork.fetchCount());
        EasyMock.verify((Object[])new Object[]{logManager});
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata((long)initialLEO, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)(initialLEO - 1), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)).anyTimes();
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((Object)new Tuple2.mcIJ.sp(leaderEpoch, (long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica, partition});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpoch, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)new EpochEndOffset(leaderEpoch, 172L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.apply(0), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        thread.doWork();
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 156 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t2p1()).append((Object)" to truncate to offset 172 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)172)));
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpochAtFollower = 5;
        int leaderEpochAtLeader = 4;
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata((long)initialLEO, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)(initialLEO - 3), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)leaderEpochAtFollower)).anyTimes();
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(leaderEpochAtLeader)).andReturn((Object)new Tuple2.mcIJ.sp(-1, -1L)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica, partition});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpochAtLeader, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)new EpochEndOffset(leaderEpochAtLeader, 202L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.apply(0), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        thread.doWork();
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 156 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t2p1()).append((Object)" to truncate to offset ").append((Object)BoxesRunTime.boxToInteger((int)initialLEO)).append((Object)" (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)initialLEO)));
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata((long)initialLEO, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)(initialLEO - 2), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)5));
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(4)).andReturn((Object)new Tuple2.mcIJ.sp(3, 120L)).anyTimes();
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(3)).andReturn((Object)new Tuple2.mcIJ.sp(3, 120L)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica, partition});
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(4, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(4, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)0L, (long)mockNetwork.fetchCount());
        java.util.Map nextOffsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(3, 101L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(3, 102L))}))).asJava();
        mockNetwork.setOffsetsForNextResponse(nextOffsets);
        thread.doWork();
        Assert.assertEquals((long)2L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)1L, (long)mockNetwork.fetchCount());
        Assert.assertEquals((String)"OffsetsForLeaderEpochRequest version.", (long)1L, (long)mockNetwork.lastUsedOffsetForLeaderEpochVersion());
        thread.doWork();
        Assert.assertEquals((long)2L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)2L, (long)mockNetwork.fetchCount());
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p1()).append((Object)" to truncate to offset 102 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)102)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 101 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)101)));
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        props.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata((long)initialLEO, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)(initialLEO - 2), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)5));
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(4)).andReturn((Object)new Tuple2.mcIJ.sp(3, 120L)).anyTimes();
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(3)).andReturn((Object)new Tuple2.mcIJ.sp(3, 120L)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica, partition});
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(-1, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(-1, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)1L, (long)mockNetwork.fetchCount());
        Assert.assertEquals((String)"OffsetsForLeaderEpochRequest version.", (long)0L, (long)mockNetwork.lastUsedOffsetForLeaderEpochVersion());
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)2L, (long)mockNetwork.fetchCount());
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 155 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)155)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p1()).append((Object)" to truncate to offset 143 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)143)));
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialFetchOffset = 100;
        int initialLeo = 300;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata((long)initialLeo, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)initialFetchOffset, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)5));
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica, partition});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(-1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.apply(0), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)initialFetchOffset))})));
        thread.doWork();
        Assert.assertEquals((long)initialFetchOffset, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        int highWaterMark = 100;
        int initialLeo = 300;
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)highWaterMark, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata((long)initialLeo, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)leaderEpoch));
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((Object)new Tuple2.mcIJ.sp(leaderEpoch, (long)initialLeo)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica, partition});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mutableMapAsJavaMapConverter((scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.apply(0), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)new Serializable(this, thread){
            public static final long serialVersionUID = 0L;
            private final ReplicaFetcherThread thread$1;

            public final void apply(int x$1) {
                this.apply$mcVI$sp(x$1);
            }

            public void apply$mcVI$sp(int x$1) {
                this.thread$1.doWork();
            }
            {
                this.thread$1 = thread$1;
            }
        });
        Assert.assertEquals((long)0L, (long)truncated.getValues().size());
        offsetsReply.put(this.t1p0(), new EpochEndOffset(leaderEpoch, 156L));
        thread.doWork();
        Assert.assertEquals((long)156L, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        int leaderEpoch = 4;
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata(0L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata(0L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)leaderEpoch));
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((Object)new Tuple2.mcIJ.sp(leaderEpoch, 0L)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        Assert.assertTrue((boolean)((IterableLike)JavaConverters$.MODULE$.asScalaBufferConverter(thread.partitionStates().partitionStates()).asScala()).forall((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(PartitionStates.PartitionState<PartitionFetchState> x$2) {
                return ((PartitionFetchState)x$2.value()).truncatingLog();
            }
        }));
        thread.doWork();
        Assert.assertFalse((boolean)((IterableLike)JavaConverters$.MODULE$.asScalaBufferConverter(thread.partitionStates().partitionStates()).asScala()).forall((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(PartitionStates.PartitionState<PartitionFetchState> x$3) {
                return ((PartitionFetchState)x$3.value()).truncatingLog();
            }
        }));
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        int initialLEO = 100;
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache)EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)replica.epochs()).andReturn((Object)new Some((Object)leaderEpochs)).anyTimes();
        EasyMock.expect((Object)replica.logEndOffset()).andReturn((Object)new LogOffsetMetadata((long)initialLEO, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)(initialLEO - 2), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)leaderEpochs.latestEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)5));
        EasyMock.expect((Object)leaderEpochs.endOffsetFor(5)).andReturn((Object)new Tuple2.mcIJ.sp(5, (long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{leaderEpochs, replicaManager, logManager, quota, replica, partition});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(5, 52L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(5, 49L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToLong((long)0L))})));
        TopicPartition partitionThatBecameLeader = this.t1p0();
        mockNetwork.setEpochRequestCallback((Function0<BoxedUnit>)new Serializable(this, thread, partitionThatBecameLeader){
            public static final long serialVersionUID = 0L;
            private final ReplicaFetcherThread thread$2;
            private final TopicPartition partitionThatBecameLeader$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                this.thread$2.removePartitions((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.partitionThatBecameLeader$1})));
            }
            {
                this.thread$2 = thread$2;
                this.partitionThatBecameLeader$1 = partitionThatBecameLeader$1;
            }
        });
        thread.doWork();
        Assert.assertEquals((long)49L, (long)BoxesRunTime.unboxToLong((Object)truncateToCapture.getValue()));
    }

    public IExpectationSetters<Option<Partition>> stub(Replica replica, Partition partition, ReplicaManager replicaManager) {
        EasyMock.expect((Object)replicaManager.getReplica(this.t1p0())).andReturn((Object)new Some((Object)replica)).anyTimes();
        EasyMock.expect((Object)replicaManager.getReplicaOrException(this.t1p0())).andReturn((Object)replica).anyTimes();
        EasyMock.expect((Object)replicaManager.getPartition(this.t1p0())).andReturn((Object)new Some((Object)partition)).anyTimes();
        EasyMock.expect((Object)replicaManager.getReplica(this.t1p1())).andReturn((Object)new Some((Object)replica)).anyTimes();
        EasyMock.expect((Object)replicaManager.getReplicaOrException(this.t1p1())).andReturn((Object)replica).anyTimes();
        EasyMock.expect((Object)replicaManager.getPartition(this.t1p1())).andReturn((Object)new Some((Object)partition)).anyTimes();
        EasyMock.expect((Object)replicaManager.getReplica(this.t2p1())).andReturn((Object)new Some((Object)replica)).anyTimes();
        EasyMock.expect((Object)replicaManager.getReplicaOrException(this.t2p1())).andReturn((Object)replica).anyTimes();
        return EasyMock.expect((Object)replicaManager.getPartition(this.t2p1())).andReturn((Object)new Some((Object)partition)).anyTimes();
    }
}

