/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.MethodForwardingTestUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class CheckpointStreamWithResultProviderTest {
    @TempDir
    private Path temporaryFolder;

    CheckpointStreamWithResultProviderTest() {
    }

    @Test
    void testFactory() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();
        try (CheckpointStreamWithResultProvider primaryOnly = CheckpointStreamWithResultProvider.createSimpleStream((CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryFactory);){
            Assertions.assertThat((Object)primaryOnly).isInstanceOf(CheckpointStreamWithResultProvider.PrimaryStreamOnly.class);
        }
        LocalSnapshotDirectoryProvider directoryProvider = this.createLocalRecoveryDirectoryProvider();
        try (CheckpointStreamWithResultProvider primaryAndSecondary = CheckpointStreamWithResultProvider.createDuplicatingStream((long)42L, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryFactory, (LocalSnapshotDirectoryProvider)directoryProvider);){
            Assertions.assertThat((Object)primaryAndSecondary).isInstanceOf(CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream.class);
        }
    }

    @Test
    void testCloseAndFinalizeCheckpointStreamResultPrimaryOnly() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();
        CheckpointStreamWithResultProvider resultProvider = CheckpointStreamWithResultProvider.createSimpleStream((CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryFactory);
        SnapshotResult<StreamStateHandle> result = this.writeCheckpointTestData(resultProvider);
        Assertions.assertThat((Object)result.getJobManagerOwnedSnapshot()).isNotNull();
        Assertions.assertThat((Object)result.getTaskLocalSnapshot()).isNull();
        try (FSDataInputStream inputStream = ((StreamStateHandle)result.getJobManagerOwnedSnapshot()).openInputStream();){
            Assertions.assertThat((int)inputStream.read()).isEqualTo(66);
            Assertions.assertThat((int)inputStream.read()).isEqualTo(-1);
        }
    }

    @Test
    void testCloseAndFinalizeCheckpointStreamResultPrimaryAndSecondary() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();
        LocalSnapshotDirectoryProvider directoryProvider = this.createLocalRecoveryDirectoryProvider();
        CheckpointStreamWithResultProvider resultProvider = CheckpointStreamWithResultProvider.createDuplicatingStream((long)42L, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryFactory, (LocalSnapshotDirectoryProvider)directoryProvider);
        SnapshotResult<StreamStateHandle> result = this.writeCheckpointTestData(resultProvider);
        Assertions.assertThat((Object)result.getJobManagerOwnedSnapshot()).isNotNull();
        Assertions.assertThat((Object)result.getTaskLocalSnapshot()).isNotNull();
        try (FSDataInputStream inputStream = ((StreamStateHandle)result.getJobManagerOwnedSnapshot()).openInputStream();){
            Assertions.assertThat((int)inputStream.read()).isEqualTo(66);
            Assertions.assertThat((int)inputStream.read()).isEqualTo(-1);
        }
        inputStream = ((StreamStateHandle)result.getTaskLocalSnapshot()).openInputStream();
        var6_6 = null;
        try {
            Assertions.assertThat((int)inputStream.read()).isEqualTo(66);
            Assertions.assertThat((int)inputStream.read()).isEqualTo(-1);
        }
        catch (Throwable throwable) {
            var6_6 = throwable;
            throw throwable;
        }
        finally {
            if (inputStream != null) {
                if (var6_6 != null) {
                    try {
                        inputStream.close();
                    }
                    catch (Throwable throwable) {
                        var6_6.addSuppressed(throwable);
                    }
                } else {
                    inputStream.close();
                }
            }
        }
    }

    @Test
    void testCompletedAndCloseStateHandling() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();
        this.testCloseBeforeComplete((CheckpointStreamWithResultProvider)new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
        this.testCompleteBeforeClose((CheckpointStreamWithResultProvider)new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
        this.testCloseBeforeComplete((CheckpointStreamWithResultProvider)new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
        this.testCompleteBeforeClose((CheckpointStreamWithResultProvider)new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
    }

    @Test
    void testCloseMethodForwarding() throws Exception {
        CheckpointStreamFactory streamFactory = this.createCheckpointStreamFactory();
        MethodForwardingTestUtil.testMethodForwarding(Closeable.class, CheckpointStreamWithResultProvider.PrimaryStreamOnly::new, () -> {
            try {
                return streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        MethodForwardingTestUtil.testMethodForwarding(Closeable.class, CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream::new, () -> {
            try {
                return new DuplicatingCheckpointOutputStream(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private SnapshotResult<StreamStateHandle> writeCheckpointTestData(CheckpointStreamWithResultProvider resultProvider) throws IOException {
        CheckpointStateOutputStream checkpointOutputStream = resultProvider.getCheckpointOutputStream();
        checkpointOutputStream.write(66);
        return resultProvider.closeAndFinalizeCheckpointStreamResult();
    }

    private CheckpointStreamFactory createCheckpointStreamFactory() {
        return new MemCheckpointStreamFactory(16384);
    }

    private void testCloseBeforeComplete(CheckpointStreamWithResultProvider resultProvider) throws IOException {
        resultProvider.getCheckpointOutputStream().write(66);
        resultProvider.close();
        Assertions.assertThatThrownBy(() -> ((CheckpointStreamWithResultProvider)resultProvider).closeAndFinalizeCheckpointStreamResult()).isInstanceOf(IOException.class);
    }

    private void testCompleteBeforeClose(CheckpointStreamWithResultProvider resultProvider) throws IOException {
        resultProvider.getCheckpointOutputStream().write(66);
        Assertions.assertThat((Object)resultProvider.closeAndFinalizeCheckpointStreamResult()).isNotNull();
        resultProvider.close();
    }

    private LocalSnapshotDirectoryProvider createLocalRecoveryDirectoryProvider() throws IOException {
        File localStateDir = TempDirUtils.newFolder((Path)this.temporaryFolder);
        JobID jobID = new JobID();
        JobVertexID jobVertexID = new JobVertexID();
        int subtaskIdx = 0;
        return new LocalSnapshotDirectoryProviderImpl(localStateDir, jobID, jobVertexID, subtaskIdx);
    }
}

