/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import kafka.server.Defaults$;
import kafka.tier.TierTopicManagerCommitter;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.topic.TierTopicManagerConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

/**
 * Tests for {@link TierTopicManagerCommitter}.
 *
 * <p>Covered here: combining per-directory position maps with
 * {@code earliestOffsets}, reading back positions written by another committer
 * instance, treating malformed {@code tier.offsets} files as "no committed
 * positions", and the cases in which {@code updatePosition} is expected to reject
 * an update.
 */
@ScalaSignature(bytes="\u0006\u0005m2AAC\u0006\u0001!!)q\u0003\u0001C\u00011!)1\u0004\u0001C\u00019!)Q\u0006\u0001C\u00019!)q\u0006\u0001C\u00019!)\u0011\u0007\u0001C\u00019!)1\u0007\u0001C\u00019!)Q\u0007\u0001C\u00019!)q\u0007\u0001C\u00019!)\u0011\b\u0001C\u00019\tiB+[3s)>\u0004\u0018nY'b]\u0006<WM]\"p[6LG\u000f^3s)\u0016\u001cHO\u0003\u0002\r\u001b\u0005!A/[3s\u0015\u0005q\u0011!B6bM.\f7\u0001A\n\u0003\u0001E\u0001\"AE\u000b\u000e\u0003MQ\u0011\u0001F\u0001\u0006g\u000e\fG.Y\u0005\u0003-M\u0011a!\u00118z%\u00164\u0017A\u0002\u001fj]&$h\bF\u0001\u001a!\tQ\u0002!D\u0001\f\u0003I)\u0017M\u001d7jKN$xJ\u001a4tKR$Vm\u001d;\u0015\u0003u\u0001\"A\u0005\u0010\n\u0005}\u0019\"\u0001B+oSRD#AA\u0011\u0011\u0005\tZS\"A\u0012\u000b\u0005\u0011*\u0013aA1qS*\u0011aeJ\u0001\bUV\u0004\u0018\u000e^3s\u0015\tA\u0013&A\u0003kk:LGOC\u0001+\u0003\ry'oZ\u0005\u0003Y\r\u0012A\u0001V3ti\u0006\u0019rN\u001a4tKRLen\u00148f\u001d>$x\n\u001e5fe\"\u00121!I\u0001\u0015_\u001a47/\u001a;t\u000b6\u0004H/_%o\u001f:,G)\u001b:)\u0005\u0011\t\u0013!D<sSR,'+Z1e)\u0016\u001cH\u000f\u000b\u0002\u0006C\u0005\tSO\\:vaB|'\u000f^3e-\u0016\u00148/[8o%\u0016\u001cX\r^:Q_NLG/[8og\"\u0012a!I\u0001\u001eS:4\u0018\r\\5e-\u0016\u00148/[8o%\u0016\u001cX\r^:Q_NLG/[8og\"\u0012q!I\u0001\u001cS:4\u0018\r\\5e\u001f\u001a47/\u001a;t\u0019&tW\rU8tSRLwN\\:)\u0005!\t\u0013A\u0005;fgR,\u0006\u000fZ1uKB{7/\u001b;j_:D#!C\u0011")
public class TierTopicManagerCommitterTest {
    /** Name of the file the malformed-input tests create inside a log directory. */
    private static final String OFFSETS_FILE_NAME = "tier.offsets";

    /** Line terminator used when composing offsets-file contents. */
    private static final String NEWLINE = System.lineSeparator();

    /** Message shared by the tests that expect missing positions to yield an empty result. */
    private static final String POSITIONS_NOT_RESET = "Overall offset positions not reset, even though positions were missing.";

    /**
     * Two maps hold a position for partition 3; the one with offset 2 / epoch 1 is
     * expected from {@code earliestOffsets} regardless of the order of the inputs.
     */
    @Test
    public void earliestOffsetTest() {
        Map<Integer, OffsetAndEpoch> positions1 = Collections.singletonMap(3, position(5L, 3));
        Map<Integer, OffsetAndEpoch> positions2 = Collections.singletonMap(3, position(2L, 1));

        Assertions.assertEquals(position(2L, 1), TierTopicManagerCommitter.earliestOffsets(Arrays.asList(positions1, positions2)).get(3));
        // Same inputs in reverse order must give the same answer.
        Assertions.assertEquals(position(2L, 1), TierTopicManagerCommitter.earliestOffsets(Arrays.asList(positions2, positions1)).get(3));
    }

    /**
     * Each map knows a partition the other does not; the combined result is
     * expected to be empty.
     */
    @Test
    public void offsetInOneNotOther() {
        Map<Integer, OffsetAndEpoch> positions1 = Collections.singletonMap(2, position(5L, 2));
        Map<Integer, OffsetAndEpoch> positions2 = Collections.singletonMap(3, position(5L, 3));

        Assertions.assertTrue(TierTopicManagerCommitter.earliestOffsets(Arrays.asList(positions1, positions2)).isEmpty(), POSITIONS_NOT_RESET);
    }

    /**
     * One map is empty and the other holds a single position; the combined
     * result is expected to be empty.
     */
    @Test
    public void offsetsEmptyInOneDir() {
        Map<Integer, OffsetAndEpoch> positions1 = Collections.emptyMap();
        Map<Integer, OffsetAndEpoch> positions2 = Collections.singletonMap(3, positionWithoutEpoch(5L));

        Assertions.assertTrue(TierTopicManagerCommitter.earliestOffsets(Arrays.asList(positions1, positions2)).isEmpty(), POSITIONS_NOT_RESET);
    }

    /**
     * Positions snapshotted and written by one committer are expected to be
     * visible through {@code positionFor} on a second committer built from the
     * same configuration; partitions never updated are expected to report
     * {@code null}.
     */
    @Test
    public void writeReadTest() {
        // Same directory source as the other tests, which write into the returned
        // directory without creating it first.
        String logDir = TestUtils.tempDirectory(null, null).getAbsolutePath();
        short numPartitions = 6;
        TierTopicManagerConfig tierTopicManagerConfig = newConfig(logDir, numPartitions);

        TierTopicManagerCommitter writer = newCommitter(tierTopicManagerConfig);
        writer.updatePosition(3, position(1L, 1));
        writer.updatePosition(5, position(4L, 2));
        writer.updatePosition(5, positionWithoutEpoch(5L));
        writer.writePositionsSnapshot(writer.takePositionsSnapshot());

        TierTopicManagerCommitter reader = newCommitter(tierTopicManagerConfig);

        // Indexed by partition id; null marks a partition that was never updated.
        OffsetAndEpoch[] expectedPositions = new OffsetAndEpoch[numPartitions];
        expectedPositions[3] = position(1L, 1);
        expectedPositions[5] = positionWithoutEpoch(5L);

        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            Assertions.assertEquals(expectedPositions[partitionId], reader.positionFor(partitionId));
        }
    }

    /**
     * An offsets file whose first line is one greater than
     * {@code CURRENT_VERSION.version} is expected to yield no committed positions.
     *
     * @throws IOException if the offsets file cannot be written
     */
    @Test
    public void unsupportedVersionResetsPositions() throws IOException {
        int unsupportedVersion = TierTopicManagerCommitter.CURRENT_VERSION.version + 1;
        String logDir = logDirWithOffsetsFile(unsupportedVersion + NEWLINE + "0 3" + NEWLINE);

        Assertions.assertTrue(TierTopicManagerCommitter.committed(logDir, Mockito.mock(LogDirFailureChannel.class)).isEmpty());
    }

    /**
     * An offsets file whose first line is not a number is expected to yield no
     * committed positions.
     *
     * @throws IOException if the offsets file cannot be written
     */
    @Test
    public void invalidVersionResetsPositions() throws IOException {
        String logDir = logDirWithOffsetsFile("aaa" + NEWLINE + "0 3" + NEWLINE);

        Assertions.assertTrue(TierTopicManagerCommitter.committed(logDir, Mockito.mock(LogDirFailureChannel.class)).isEmpty());
    }

    /**
     * An offsets file with first line "0" followed by a single-token line "3"
     * and then "0 5" is expected to yield no committed positions.
     *
     * @throws IOException if the offsets file cannot be written
     */
    @Test
    public void invalidOffsetsLinePositions() throws IOException {
        // The final line is deliberately left without a trailing line terminator.
        String logDir = logDirWithOffsetsFile("0" + NEWLINE + "3" + NEWLINE + "0 5");

        Assertions.assertTrue(TierTopicManagerCommitter.committed(logDir, Mockito.mock(LogDirFailureChannel.class)).isEmpty());
    }

    /**
     * Walks one partition through a series of updates, checking the stored
     * position after each accepted one, then checks that three further updates
     * raise {@link IllegalStateException} and leave the stored position unchanged.
     */
    @Test
    public void testUpdatePosition() {
        String logDir = TestUtils.tempDirectory(null, null).getAbsolutePath();
        short numPartitions = 6;
        TierTopicManagerCommitter committer = newCommitter(newConfig(logDir, numPartitions));
        // NOTE(review): 10 is outside the 6 partitions configured above; kept as in
        // the original test - confirm this is intentional.
        final int partitionId = 10;

        committer.updatePosition(partitionId, position(100L, 5));
        committer.updatePosition(partitionId, position(200L, 5));
        Assertions.assertEquals(position(200L, 5), committer.positionFor(partitionId));

        committer.updatePosition(partitionId, position(350L, 7));
        Assertions.assertEquals(position(350L, 7), committer.positionFor(partitionId));

        committer.updatePosition(partitionId, positionWithoutEpoch(375L));
        Assertions.assertEquals(positionWithoutEpoch(375L), committer.positionFor(partitionId));

        committer.updatePosition(partitionId, position(400L, 10));
        Assertions.assertEquals(position(400L, 10), committer.positionFor(partitionId));

        // Repeating the current offset and epoch.
        Assertions.assertThrows(IllegalStateException.class, () -> committer.updatePosition(partitionId, position(400L, 10)));
        // Lower offset, same epoch.
        Assertions.assertThrows(IllegalStateException.class, () -> committer.updatePosition(partitionId, position(399L, 10)));
        // Higher offset, lower epoch.
        Assertions.assertThrows(IllegalStateException.class, () -> committer.updatePosition(partitionId, position(500L, 9)));

        // None of the rejected updates may have changed the stored position.
        Assertions.assertEquals(position(400L, 10), committer.positionFor(partitionId));
    }

    /** Builds a position with the given offset and a present epoch. */
    private static OffsetAndEpoch position(long offset, int epoch) {
        return new OffsetAndEpoch(offset, Optional.of(epoch));
    }

    /** Builds a position with the given offset and an empty epoch. */
    private static OffsetAndEpoch positionWithoutEpoch(long offset) {
        return new OffsetAndEpoch(offset, Optional.empty());
    }

    /**
     * Builds the configuration shared by the committer tests, with a single log
     * directory.
     *
     * @param logDir        the only log directory handed to the configuration
     * @param numPartitions partition count handed to the configuration
     */
    private static TierTopicManagerConfig newConfig(String logDir, short numPartitions) {
        // NOTE(review): the arguments are positional; their meaning is defined by the
        // TierTopicManagerConfig constructor, which is not visible from this file.
        return new TierTopicManagerConfig(
                () -> Collections.singletonMap("bootstrap.servers", "bootstrap"),
                null,
                numPartitions,
                1,
                33,
                "cluster99",
                Long.valueOf(200L),
                Integer.valueOf(500),
                Integer.valueOf(500),
                Collections.singletonList(logDir),
                Boolean.valueOf(Defaults$.MODULE$.TierTopicProducerEnableIdempotence()),
                Boolean.valueOf(Defaults$.MODULE$.TierTopicDataLossDetectionEnable()),
                Boolean.valueOf(Defaults$.MODULE$.TierTopicFencingDuringDataLossEnable()));
    }

    /** Creates a committer for {@code config} backed by a mocked {@link LogDirFailureChannel}. */
    private static TierTopicManagerCommitter newCommitter(TierTopicManagerConfig config) {
        return new TierTopicManagerCommitter(config, Mockito.mock(LogDirFailureChannel.class));
    }

    /**
     * Obtains a directory from {@code TestUtils.tempDirectory} and writes
     * {@code contents} verbatim to a {@code tier.offsets} file inside it.
     *
     * @param contents exact text of the offsets file, including any line terminators
     * @return absolute path of the directory containing the file
     * @throws IOException if the file cannot be created or written
     */
    private static String logDirWithOffsetsFile(String contents) throws IOException {
        File logDir = TestUtils.tempDirectory(null, null);
        File offsetsFile = new File(logDir, OFFSETS_FILE_NAME);
        // Closing the BufferedWriter flushes it, so no explicit flush is needed.
        try (FileWriter fileWriter = new FileWriter(offsetsFile);
             BufferedWriter writer = new BufferedWriter(fileWriter)) {
            writer.write(contents);
        }
        return logDir.getAbsolutePath();
    }
}

