/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public abstract class FileMergingSnapshotManagerTestBase {
    /** Task-manager label; the managers built by this class pass the same literal "Testing" as their ResourceID. */
    final String tmId = "Testing";
    /** Job id shared by both subtask keys and by every manager built in this class. */
    final JobID jobID = new JobID();
    /** Fixed operator id shared by both subtask keys. */
    final OperatorID operatorID = new OperatorID(289347923L, 75893479L);
    /** First subtask key, created in setup() (third TaskInfoImpl argument 0, presumably the subtask index). */
    FileMergingSnapshotManager.SubtaskKey subtaskKey1;
    /** Second subtask key, created in setup() (third TaskInfoImpl argument 1, presumably the subtask index). */
    FileMergingSnapshotManager.SubtaskKey subtaskKey2;
    /** Root checkpoint directory: the JUnit temp folder plus the job id in hex form. */
    Path checkpointBaseDir;
    /** The "shared" directory below checkpointBaseDir. */
    Path sharedStateDir;
    /** The "taskowned" directory below checkpointBaseDir. */
    Path taskOwnedStateDir;
    /** Write buffer size handed to initFileSystem; set to 4096 in setup(). */
    int writeBufferSize;

    /**
     * Supplies the file-merging type passed to every FileMergingSnapshotManagerBuilder created by
     * this class; each concrete test subclass provides its own value.
     *
     * @return the merging type under test
     */
    abstract FileMergingType getFileMergingType();

    /**
     * Prepares the per-test fixtures: two subtask keys for the same job and operator, the
     * checkpoint directory layout rooted in the JUnit temp folder, and the write buffer size.
     *
     * @param tempFolder temporary directory injected by JUnit for the current test
     */
    @BeforeEach
    public void setup(@TempDir java.nio.file.Path tempFolder) {
        TaskInfo firstTaskInfo = new TaskInfoImpl("TestingTask", 128, 0, 128, 3);
        TaskInfo secondTaskInfo = new TaskInfoImpl("TestingTask", 128, 1, 128, 3);
        subtaskKey1 = new FileMergingSnapshotManager.SubtaskKey(jobID, operatorID, firstTaskInfo);
        subtaskKey2 = new FileMergingSnapshotManager.SubtaskKey(jobID, operatorID, secondTaskInfo);

        // All state directories live below <tempFolder>/<jobId as hex>.
        Path baseDir = new Path(tempFolder.toString(), jobID.toHexString());
        checkpointBaseDir = baseDir;
        sharedStateDir = new Path(baseDir, "shared");
        taskOwnedStateDir = new Path(baseDir, "taskowned");

        writeBufferSize = 4096;
    }

    /**
     * Checks the managed directories reported for a registered subtask: the exclusive scope must
     * resolve to {@code <taskowned>/job_<jobId>_tm_Testing} and the shared scope to the subtask's
     * managed directory name below the shared state directory.
     */
    @Test
    void testCreateFileMergingSnapshotManager() throws IOException {
        try (FileMergingSnapshotManagerBase manager =
                (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir)) {
            manager.registerSubtaskForSharedStates(subtaskKey1);

            String expectManagerId = String.format("job_%s_tm_%s", jobID, "Testing");
            Path expectedExclusiveDir = new Path(taskOwnedStateDir, expectManagerId);
            Assertions.assertThat(manager.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE))
                    .isEqualTo(expectedExclusiveDir);

            Path expectedSharedDir = new Path(sharedStateDir, subtaskKey1.getManagedDirName());
            Assertions.assertThat(manager.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED))
                    .isEqualTo(expectedSharedDir);
        }
    }

    /**
     * Builds a manager whose ResourceID contains URI-reserved characters, initialises it on the
     * local file system and registers a subtask key whose operator string contains such characters
     * as well. The test passes when none of these steps throws.
     */
    @Test
    public void testSpecialCharactersInPath() throws IOException {
        LocalFileSystem localFs = LocalFileSystem.getSharedInstance();
        boolean baseDirMissing = !localFs.exists(checkpointBaseDir);
        if (baseDirMissing) {
            localFs.mkdirs(checkpointBaseDir);
            localFs.mkdirs(sharedStateDir);
            localFs.mkdirs(taskOwnedStateDir);
        }

        try (FileMergingSnapshotManager manager =
                new FileMergingSnapshotManagerBuilder(
                                jobID,
                                new ResourceID("localhost:53424-,;:$&+=?/[]@#qqq"),
                                getFileMergingType())
                        .setMetricGroup(new UnregisteredMetricGroups.UnregisteredTaskManagerJobMetricGroup())
                        .build()) {
            manager.initFileSystem(
                    LocalFileSystem.getSharedInstance(),
                    checkpointBaseDir,
                    sharedStateDir,
                    taskOwnedStateDir,
                    writeBufferSize);
            Assertions.assertThat(manager).isNotNull();

            FileMergingSnapshotManager.SubtaskKey oddKey =
                    new FileMergingSnapshotManager.SubtaskKey(jobID.toString(), ",;:$&+=?/[]@#www", 0, 1);
            manager.registerSubtaskForSharedStates(oddKey);
        }
    }

    /**
     * Verifies the reference counting between logical and physical files in two orders of events:
     * first the logical file is discarded while the physical file is still open, then the physical
     * file is closed while a logical file still references it.
     */
    @Test
    void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException {
        try (FileMergingSnapshotManagerBase manager =
                (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir)) {
            manager.registerSubtaskForSharedStates(subtaskKey1);
            manager.registerSubtaskForSharedStates(subtaskKey2);

            // ---- Scenario 1: discard the logical file first, close the physical file afterwards.
            PhysicalFile firstPhysical =
                    manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat(firstPhysical.isOpen()).isTrue();

            LogicalFile firstLogical = manager.createLogicalFile(firstPhysical, 0L, 10L, subtaskKey1);
            Assertions.assertThat(firstLogical.getSubtaskKey()).isEqualTo(subtaskKey1);
            Assertions.assertThat(firstLogical.getPhysicalFile()).isEqualTo(firstPhysical);
            Assertions.assertThat(firstLogical.getStartOffset()).isZero();
            Assertions.assertThat(firstLogical.getLength()).isEqualTo(10L);
            Assertions.assertThat(firstPhysical.getRefCount()).isOne();
            Assertions.assertThat(firstLogical.isDiscarded()).isFalse();

            // The last-used checkpoint id only moves forward.
            firstLogical.advanceLastCheckpointId(2L);
            Assertions.assertThat(firstLogical.getLastUsedCheckpointID()).isEqualTo(2L);
            firstLogical.advanceLastCheckpointId(1L);
            Assertions.assertThat(firstLogical.getLastUsedCheckpointID()).isEqualTo(2L);

            // Discarding with an id below the last-used one is ignored; the last-used id discards.
            firstLogical.discardWithCheckpointId(1L);
            Assertions.assertThat(firstLogical.isDiscarded()).isFalse();
            firstLogical.discardWithCheckpointId(2L);
            Assertions.assertThat(firstLogical.isDiscarded()).isTrue();

            // With no references left the still-open physical file is kept until it is closed.
            Assertions.assertThat(firstPhysical.isOpen()).isTrue();
            Assertions.assertThat(firstPhysical.isDeleted()).isFalse();
            Assertions.assertThat(firstPhysical.getRefCount()).isZero();
            firstPhysical.close();
            Assertions.assertThat(firstPhysical.isOpen()).isFalse();
            Assertions.assertThat(firstPhysical.isDeleted()).isTrue();

            // ---- Scenario 2: close the physical file first, discard the logical file afterwards.
            PhysicalFile secondPhysical =
                    manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            LogicalFile secondLogical = manager.createLogicalFile(secondPhysical, 0L, 10L, subtaskKey1);
            Assertions.assertThat(secondLogical.getPhysicalFile()).isEqualTo(secondPhysical);
            Assertions.assertThat(secondLogical.getStartOffset()).isZero();
            Assertions.assertThat(secondLogical.getLength()).isEqualTo(10L);
            Assertions.assertThat(secondPhysical.getRefCount()).isOne();
            secondLogical.advanceLastCheckpointId(2L);
            Assertions.assertThat(secondPhysical.isOpen()).isTrue();
            Assertions.assertThat(secondPhysical.isDeleted()).isFalse();

            // Closing while a logical file still references it must not delete the physical file.
            secondPhysical.close();
            Assertions.assertThat(secondPhysical.isOpen()).isFalse();
            Assertions.assertThat(secondPhysical.isDeleted()).isFalse();
            Assertions.assertThat(secondPhysical.getRefCount()).isOne();

            // Dropping the last reference on a closed physical file deletes it.
            secondLogical.discardWithCheckpointId(2L);
            Assertions.assertThat(secondLogical.isDiscarded()).isTrue();
            Assertions.assertThat(secondPhysical.isDeleted()).isTrue();
            Assertions.assertThat(secondPhysical.getRefCount()).isZero();
        }
    }

    /**
     * Checks that a new physical file starts with size zero and that successive
     * {@code incSize} calls accumulate (123, then 123 + 456 = 579).
     */
    @Test
    void testSizeStatsInPhysicalFile() throws IOException {
        try (FileMergingSnapshotManagerBase manager =
                (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir)) {
            manager.registerSubtaskForSharedStates(subtaskKey1);
            manager.registerSubtaskForSharedStates(subtaskKey2);

            PhysicalFile file =
                    manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat(file.getSize()).isZero();

            file.incSize(123L);
            Assertions.assertThat(file.getSize()).isEqualTo(123L);

            file.incSize(456L);
            Assertions.assertThat(file.getSize()).isEqualTo(579L);
        }
    }

    /**
     * Tracks the manager's space statistics while two logical files are created on one physical
     * file and then discarded one after the other, with the physical file closed in between.
     */
    @Test
    void testSpaceStat() throws IOException {
        try (FileMergingSnapshotManagerBase manager =
                (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir)) {
            manager.registerSubtaskForSharedStates(subtaskKey1);
            manager.registerSubtaskForSharedStates(subtaskKey2);

            PhysicalFile physical =
                    manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat(physical.isOpen()).isTrue();

            // First logical file of 123 bytes.
            LogicalFile smallLogical = manager.createLogicalFile(physical, 0L, 123L, subtaskKey1);
            assertSpaceStat(manager, 123L, 123L, 1L, 1L);
            Assertions.assertThat(physical.getSize()).isEqualTo(123L);

            // Second logical file of 456 bytes on the same physical file.
            LogicalFile largeLogical = manager.createLogicalFile(physical, 0L, 456L, subtaskKey1);
            assertSpaceStat(manager, 579L, 579L, 1L, 2L);
            Assertions.assertThat(physical.getSize()).isEqualTo(579L);

            // Discarding one logical file only shrinks the logical numbers.
            smallLogical.discardWithCheckpointId(1L);
            manager.discardSingleLogicalFile(smallLogical, 1L);
            assertSpaceStat(manager, 579L, 456L, 1L, 1L);

            // After closing the physical file, discarding the last logical file zeroes everything.
            physical.close();
            manager.discardSingleLogicalFile(largeLogical, 1L);
            assertSpaceStat(manager, 0L, 0L, 0L, 0L);
        }
    }

    /**
     * Asserts the four counters of the manager's space statistics, in the order physical size,
     * logical size, physical count, logical count.
     */
    private static void assertSpaceStat(
            FileMergingSnapshotManagerBase manager,
            long physicalSize,
            long logicalSize,
            long physicalCount,
            long logicalCount) {
        Assertions.assertThat(manager.spaceStat.physicalFileSize.get()).isEqualTo(physicalSize);
        Assertions.assertThat(manager.spaceStat.logicalFileSize.get()).isEqualTo(logicalSize);
        Assertions.assertThat(manager.spaceStat.physicalFileCount.get()).isEqualTo(physicalCount);
        Assertions.assertThat(manager.spaceStat.logicalFileCount.get()).isEqualTo(logicalCount);
    }

    /**
     * Writes ten exclusive-scope streams of 128 random bytes each, one after another, then checks
     * that every returned handle points at the same file path and that reading the handles back in
     * order reproduces the written bytes.
     */
    @Test
    public void testReusedFileWriting() throws Exception {
        final long checkpointId = 1L;
        final int streamNum = 10;
        final int perStreamWriteNum = 128;

        byte[] payload = new byte[streamNum * perStreamWriteNum];
        new Random().nextBytes(payload);
        SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum];

        try (FileMergingSnapshotManager manager = createFileMergingSnapshotManager(checkpointBaseDir);
                CloseableRegistry closeableRegistry = new CloseableRegistry()) {
            // Write phase: each stream receives the next slice of the payload.
            int writeCursor = 0;
            for (int streamIndex = 0; streamIndex < streamNum; streamIndex++) {
                FileMergingCheckpointStateOutputStream stream =
                        manager.createCheckpointStateOutputStream(
                                subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
                try {
                    closeableRegistry.registerCloseable(stream);
                    int sliceEnd = writeCursor + perStreamWriteNum;
                    while (writeCursor < sliceEnd) {
                        stream.write(payload[writeCursor++]);
                    }
                    handles[streamIndex] = stream.closeAndGetHandle();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            // Read phase: all handles must share one file and yield the payload in order.
            int readCursor = 0;
            Path commonPath = null;
            for (SegmentFileStateHandle handle : handles) {
                Path currentPath = handle.getFilePath();
                Assertions.assertThat(commonPath == null || commonPath.equals(currentPath)).isTrue();
                commonPath = currentPath;

                FSDataInputStream input = handle.openInputStream();
                closeableRegistry.registerCloseable(input);
                for (int value = input.read(); value != -1; value = input.read()) {
                    Assertions.assertThat((byte) value).isEqualTo(payload[readCursor++]);
                }
            }
        }
    }

    /**
     * Starts twelve asynchronous writers, each writing the values 0..127 to its own exclusive-scope
     * stream, and checks that every resulting handle reads back as an ascending sequence starting
     * at zero.
     */
    @Test
    public void testConcurrentWriting() throws Exception {
        final long checkpointId = 1L;
        final int numThreads = 12;
        final int perStreamWriteNum = 128;
        Set<CompletableFuture<SegmentFileStateHandle>> pendingHandles = new HashSet<>();

        try (FileMergingSnapshotManager manager = createFileMergingSnapshotManager(checkpointBaseDir);
                CloseableRegistry closeableRegistry = new CloseableRegistry()) {
            for (int writer = 0; writer < numThreads; writer++) {
                pendingHandles.add(
                        CompletableFuture.supplyAsync(
                                () -> {
                                    FileMergingCheckpointStateOutputStream stream =
                                            manager.createCheckpointStateOutputStream(
                                                    subtaskKey1,
                                                    checkpointId,
                                                    CheckpointedStateScope.EXCLUSIVE);
                                    try {
                                        closeableRegistry.registerCloseable(stream);
                                        for (int value = 0; value < perStreamWriteNum; value++) {
                                            stream.write(value);
                                        }
                                        return stream.closeAndGetHandle();
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                }));
            }

            // Every segment must contain exactly the ascending values its writer produced.
            for (CompletableFuture<SegmentFileStateHandle> pending : pendingHandles) {
                SegmentFileStateHandle handle = pending.get();
                FSDataInputStream input = handle.openInputStream();
                closeableRegistry.registerCloseable(input);
                int expected = 0;
                for (int value = input.read(); value != -1; value = input.read()) {
                    Assertions.assertThat(value).isEqualTo(expected++);
                }
            }
        }
    }

    /**
     * With a blocking file pool, checks for the shared and then the exclusive scope that a
     * returned physical file is handed out again, and that a file grown by
     * {@code maxPhysicalFileSize} before being returned is replaced by a different file.
     */
    @Test
    public void testConcurrentFileReusingWithBlockingPool() throws Exception {
        try (FileMergingSnapshotManagerBase manager =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(
                                checkpointBaseDir, 32L, PhysicalFilePool.Type.BLOCKING, Float.MAX_VALUE)) {
            manager.registerSubtaskForSharedStates(subtaskKey1);

            CheckpointedStateScope[] scopes = {
                CheckpointedStateScope.SHARED, CheckpointedStateScope.EXCLUSIVE
            };
            for (CheckpointedStateScope scope : scopes) {
                // A file that was returned untouched is reused as is.
                PhysicalFile initial = manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey1, 0L, scope);
                manager.returnPhysicalFileForNextReuse(subtaskKey1, 0L, initial);
                PhysicalFile reused = manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey1, 0L, scope);
                Assertions.assertThat(reused).isEqualTo(initial);

                // Once the file has reached the size limit, the next request gets another file.
                reused.incSize(manager.maxPhysicalFileSize);
                manager.returnPhysicalFileForNextReuse(subtaskKey1, 0L, reused);
                PhysicalFile replacement = manager.getOrCreatePhysicalFileForCheckpoint(subtaskKey1, 0L, scope);
                Assertions.assertThat(replacement).isNotEqualTo(reused);
            }
        }
    }

    /**
     * Writes twenty shared-scope segments for checkpoint 1, then for checkpoints 2..10 reuses a
     * shrinking prefix of the handles. After each reuse the reused logical files must report the
     * new checkpoint as last used, and after subsuming the previous checkpoint every handle
     * outside the reused prefix must no longer resolve to a logical file.
     */
    @Test
    public void testReuseCallbackAndAdvanceWatermark() throws Exception {
        final long checkpointId = 1L;
        final long lastCheckpointId = 10L;
        final int streamNum = 20;
        final int perStreamWriteNum = 128;

        byte[] payload = new byte[streamNum * perStreamWriteNum];
        new Random().nextBytes(payload);
        SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum];

        try (FileMergingSnapshotManager manager = createFileMergingSnapshotManager(checkpointBaseDir);
                CloseableRegistry closeableRegistry = new CloseableRegistry()) {
            manager.registerSubtaskForSharedStates(subtaskKey1);

            int writeCursor = 0;
            for (int streamIndex = 0; streamIndex < streamNum; streamIndex++) {
                FileMergingCheckpointStateOutputStream stream =
                        manager.createCheckpointStateOutputStream(
                                subtaskKey1, checkpointId, CheckpointedStateScope.SHARED);
                try {
                    closeableRegistry.registerCloseable(stream);
                    int sliceEnd = writeCursor + perStreamWriteNum;
                    while (writeCursor < sliceEnd) {
                        stream.write(payload[writeCursor++]);
                    }
                    handles[streamIndex] = stream.closeAndGetHandle();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            for (long cp = checkpointId + 1L; cp <= lastCheckpointId; cp++) {
                // Checkpoint cp reuses handles[0 .. lastCheckpointId - cp].
                int reusedCount = (int) (lastCheckpointId - cp) + 1;
                ArrayList<SegmentFileStateHandle> reuse = new ArrayList<>();
                for (int index = 0; index < reusedCount; index++) {
                    reuse.add(handles[index]);
                }

                manager.reusePreviousStateHandle(cp, reuse);
                for (SegmentFileStateHandle reused : reuse) {
                    Assertions.assertThat(
                                    ((FileMergingSnapshotManagerBase) manager)
                                            .getLogicalFile(reused.getLogicalFileId())
                                            .getLastUsedCheckpointID())
                            .isEqualTo(cp);
                }

                manager.notifyCheckpointSubsumed(subtaskKey1, cp - 1L);
                for (int index = reusedCount; index < streamNum; index++) {
                    Assertions.assertThat(
                                    ((FileMergingSnapshotManagerBase) manager)
                                            .getLogicalFile(handles[index].getLogicalFileId()))
                            .isNull();
                }
            }
        }
    }

    /**
     * Round-trips a checkpoint through a fresh manager.
     *
     * <p>Step 1 builds a {@link TaskStateSnapshot} with four segment handles using one manager and
     * records that manager's space statistics. Step 2 creates a second manager over the same base
     * directory, restores it from the snapshot and checks that
     *
     * <ul>
     *   <li>exactly one checkpoint with four logical files is known,
     *   <li>the space statistics equal those of the first manager,
     *   <li>every restored logical file can be looked up by id, and
     *   <li>after the checkpoint is subsumed, none of the underlying physical files exists.
     * </ul>
     */
    @Test
    public void testRestore() throws Exception {
        final long checkpointId = 222L;
        TaskStateSnapshot taskStateSnapshot;
        FileMergingSnapshotManager.SpaceStat oldSpaceStat;

        // Step 1: write a checkpoint and capture its snapshot plus the space statistics.
        try (FileMergingSnapshotManagerBase fmsm =
                        (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir);
                CloseableRegistry closeableRegistry = new CloseableRegistry()) {
            fmsm.notifyCheckpointStart(subtaskKey1, checkpointId);

            Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID = new HashMap<>();
            subtaskStatesByOperatorID.put(
                    operatorID, buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry));
            taskStateSnapshot = new TaskStateSnapshot(subtaskStatesByOperatorID);
            oldSpaceStat = fmsm.spaceStat;

            // Marked complete before the manager is closed; presumably this keeps the written
            // files in place for step 2 (compare the third scenario of testManagedDirCleanup).
            fmsm.notifyCheckpointComplete(subtaskKey1, checkpointId);
        }
        Assertions.assertThat(taskStateSnapshot).isNotNull();

        // Step 2: restore a new manager from the snapshot and verify its view of the files.
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir)) {
            TaskInfo taskInfo =
                    new TaskInfoImpl(
                            "test restore", 128, subtaskKey1.subtaskIndex, subtaskKey1.parallelism, 0);
            for (Map.Entry<OperatorID, OperatorSubtaskState> entry :
                    taskStateSnapshot.getSubtaskStateMappings()) {
                SubtaskFileMergingManagerRestoreOperation restoreOperation =
                        new SubtaskFileMergingManagerRestoreOperation(
                                checkpointId, fmsm, jobID, taskInfo, entry.getKey(), entry.getValue());
                restoreOperation.restore();
            }

            TreeMap<Long, Set<LogicalFile>> stateFiles = fmsm.getUploadedStates();
            Assertions.assertThat(stateFiles.size()).isEqualTo(1);
            Set<LogicalFile> restoreFileSet = stateFiles.get(checkpointId);
            Assertions.assertThat(restoreFileSet).isNotNull();
            Assertions.assertThat(restoreFileSet.size()).isEqualTo(4);
            Assertions.assertThat(fmsm.spaceStat).isEqualTo(oldSpaceStat);
            for (LogicalFile file : restoreFileSet) {
                Assertions.assertThat(fmsm.getLogicalFile(file.getFileId())).isEqualTo(file);
            }

            // Collect the physical paths first: they are checked after the checkpoint is subsumed.
            Set<Path> physicalFileSet =
                    restoreFileSet.stream()
                            .map(LogicalFile::getPhysicalFile)
                            .map(PhysicalFile::getFilePath)
                            .collect(Collectors.toSet());
            fmsm.notifyCheckpointSubsumed(subtaskKey1, checkpointId);
            for (Path path : physicalFileSet) {
                Assertions.assertThat(path.getFileSystem().exists(path)).isFalse();
            }
        }
    }

    /**
     * Checks what happens to the managed directories when a manager is closed, in three scenarios
     * that each start from an emptied checkpoint base directory:
     *
     * <ol>
     *   <li>no checkpoint was ever started: the directories are removed on close;
     *   <li>checkpoint 1 was started and aborted for both subtasks: the directories are removed on
     *       close;
     *   <li>checkpoint 1 was started and completed for both subtasks: the directories remain after
     *       close.
     * </ol>
     *
     * In every scenario the two shared subtask directories and the exclusive directory must exist
     * while the manager is open.
     */
    @Test
    public void testManagedDirCleanup() throws Exception {
        LocalFileSystem fs = LocalFileSystem.getSharedInstance();
        Path sharedDirOfSubtask1 = new Path(sharedStateDir, subtaskKey1.getManagedDirName());
        Path sharedDirOfSubtask2 = new Path(sharedStateDir, subtaskKey2.getManagedDirName());
        Path exclusiveDir;

        // Scenario 1: register subtasks only, then close.
        emptyCheckpointBaseDir();
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(
                                checkpointBaseDir, 32L, PhysicalFilePool.Type.BLOCKING, Float.MAX_VALUE)) {
            fmsm.registerSubtaskForSharedStates(subtaskKey1);
            fmsm.registerSubtaskForSharedStates(subtaskKey2);
            Assertions.assertThat(fs.exists(sharedDirOfSubtask1)).isTrue();
            Assertions.assertThat(fs.exists(sharedDirOfSubtask2)).isTrue();
            exclusiveDir = new Path(taskOwnedStateDir, fmsm.getId());
            Assertions.assertThat(fs.exists(exclusiveDir)).isTrue();
        }
        Assertions.assertThat(fs.exists(sharedDirOfSubtask1)).isFalse();
        Assertions.assertThat(fs.exists(sharedDirOfSubtask2)).isFalse();
        Assertions.assertThat(fs.exists(exclusiveDir)).isFalse();

        // Scenario 2: checkpoint 1 is started and then aborted for both subtasks.
        emptyCheckpointBaseDir();
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(
                                checkpointBaseDir, 32L, PhysicalFilePool.Type.BLOCKING, Float.MAX_VALUE)) {
            fmsm.registerSubtaskForSharedStates(subtaskKey1);
            fmsm.registerSubtaskForSharedStates(subtaskKey2);
            fmsm.notifyCheckpointStart(subtaskKey1, 1L);
            fmsm.notifyCheckpointStart(subtaskKey2, 1L);
            fmsm.notifyCheckpointAborted(subtaskKey1, 1L);
            fmsm.notifyCheckpointAborted(subtaskKey2, 1L);
            Assertions.assertThat(fs.exists(sharedDirOfSubtask1)).isTrue();
            Assertions.assertThat(fs.exists(sharedDirOfSubtask2)).isTrue();
            exclusiveDir = new Path(taskOwnedStateDir, fmsm.getId());
            Assertions.assertThat(fs.exists(exclusiveDir)).isTrue();
        }
        Assertions.assertThat(fs.exists(sharedDirOfSubtask1)).isFalse();
        Assertions.assertThat(fs.exists(sharedDirOfSubtask2)).isFalse();
        Assertions.assertThat(fs.exists(exclusiveDir)).isFalse();

        // Scenario 3: checkpoint 1 is started and completed for both subtasks.
        emptyCheckpointBaseDir();
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(
                                checkpointBaseDir, 32L, PhysicalFilePool.Type.BLOCKING, Float.MAX_VALUE)) {
            fmsm.registerSubtaskForSharedStates(subtaskKey1);
            fmsm.registerSubtaskForSharedStates(subtaskKey2);
            fmsm.notifyCheckpointStart(subtaskKey1, 1L);
            fmsm.notifyCheckpointStart(subtaskKey2, 1L);
            fmsm.notifyCheckpointComplete(subtaskKey1, 1L);
            fmsm.notifyCheckpointComplete(subtaskKey2, 1L);
            Assertions.assertThat(fs.exists(sharedDirOfSubtask1)).isTrue();
            Assertions.assertThat(fs.exists(sharedDirOfSubtask2)).isTrue();
            exclusiveDir = new Path(taskOwnedStateDir, fmsm.getId());
            Assertions.assertThat(fs.exists(exclusiveDir)).isTrue();
        }
        // A completed checkpoint keeps the managed directories alive past close().
        Assertions.assertThat(fs.exists(sharedDirOfSubtask1)).isTrue();
        Assertions.assertThat(fs.exists(sharedDirOfSubtask2)).isTrue();
        Assertions.assertThat(fs.exists(exclusiveDir)).isTrue();
    }

    /**
     * Recursively deletes every entry directly below the checkpoint base directory, leaving the
     * base directory itself in place. Does nothing when the listing is {@code null}.
     *
     * @throws IOException if listing or deleting fails
     */
    private void emptyCheckpointBaseDir() throws IOException {
        FileSystem baseFs = checkpointBaseDir.getFileSystem();
        FileStatus[] children = baseFs.listStatus(checkpointBaseDir);
        if (children == null) {
            return;
        }
        for (FileStatus child : children) {
            baseFs.delete(child.getPath(), true);
        }
    }

    /**
     * Builds an operator subtask state whose managed keyed, raw keyed, managed operator and raw
     * operator state are each backed by one freshly written segment file handle (four segments in
     * total, written in that order).
     *
     * @param checkpointId checkpoint the segments are written for
     * @param fmsm manager used to create the output streams
     * @param closeableRegistry registry the created streams are registered with
     * @return the assembled subtask state
     */
    private OperatorSubtaskState buildOperatorSubtaskState(long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) throws Exception {
        // The segments are written in the order managed keyed, raw keyed, managed op, raw op.
        SegmentFileStateHandle managedKeyedSegment = buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry);
        IncrementalRemoteKeyedStateHandle managedKeyed =
                new IncrementalRemoteKeyedStateHandle(
                        UUID.randomUUID(),
                        new KeyGroupRange(0, 8),
                        checkpointId,
                        Collections.singletonList(
                                IncrementalKeyedStateHandle.HandleAndLocalPath.of(managedKeyedSegment, "localPath")),
                        Collections.emptyList(),
                        null);

        SegmentFileStateHandle rawKeyedSegment = buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry);
        KeyGroupsStateHandle rawKeyed = new KeyGroupsStateHandle(new KeyGroupRangeOffsets(0, 8), rawKeyedSegment);

        SegmentFileStateHandle managedOperatorSegment = buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry);
        FileMergingOperatorStreamStateHandle managedOperator =
                new FileMergingOperatorStreamStateHandle(null, null, Collections.emptyMap(), managedOperatorSegment);

        SegmentFileStateHandle rawOperatorSegment = buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry);
        FileMergingOperatorStreamStateHandle rawOperator =
                new FileMergingOperatorStreamStateHandle(null, null, Collections.emptyMap(), rawOperatorSegment);

        return OperatorSubtaskState.builder()
                .setManagedKeyedState(managedKeyed)
                .setRawKeyedState(rawKeyed)
                .setManagedOperatorState(managedOperator)
                .setRawOperatorState(rawOperator)
                .build();
    }

    /**
     * Writes one default-sized checkpoint segment via
     * {@code writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry)} and closes the
     * stream to obtain its handle.
     *
     * @return the handle returned by {@code closeAndGetHandle()}
     */
    private SegmentFileStateHandle buildOneSegmentFileHandle(long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) throws Exception {
        return writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry).closeAndGetHandle();
    }

    /**
     * Creates a manager with the defaults used by most tests: a 32 MiB maximum file size, a
     * non-blocking file pool and a space amplification limit of 2.
     *
     * @param checkpointBaseDir base directory handed to the manager
     * @return the initialised manager
     */
    FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) throws IOException {
        final long defaultMaxFileSize = 32L * 1024L * 1024L;
        final float defaultSpaceAmplification = 2.0f;
        return createFileMergingSnapshotManager(
                checkpointBaseDir,
                defaultMaxFileSize,
                PhysicalFilePool.Type.NON_BLOCKING,
                defaultSpaceAmplification);
    }

    /**
     * Builds a manager for this test's job with the given limits and initialises it on the local
     * file system. When {@code checkpointBaseDir} does not exist yet, it is created together with
     * the shared and task-owned state directories held in this class's fields.
     *
     * @param checkpointBaseDir base directory handed to the manager
     * @param maxFileSize maximum physical file size passed to the builder
     * @param filePoolType physical file pool type passed to the builder
     * @param spaceAmplification maximum space amplification passed to the builder
     * @return the initialised manager
     */
    FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir, long maxFileSize, PhysicalFilePool.Type filePoolType, float spaceAmplification) throws IOException {
        LocalFileSystem localFs = LocalFileSystem.getSharedInstance();
        boolean baseDirMissing = !localFs.exists(checkpointBaseDir);
        if (baseDirMissing) {
            localFs.mkdirs(checkpointBaseDir);
            localFs.mkdirs(sharedStateDir);
            localFs.mkdirs(taskOwnedStateDir);
        }

        FileMergingSnapshotManager manager =
                new FileMergingSnapshotManagerBuilder(jobID, new ResourceID("Testing"), getFileMergingType())
                        .setMaxFileSize(maxFileSize)
                        .setFilePoolType(filePoolType)
                        .setMaxSpaceAmplification(spaceAmplification)
                        .setMetricGroup(new UnregisteredMetricGroups.UnregisteredTaskManagerJobMetricGroup())
                        .build();
        manager.initFileSystem(
                LocalFileSystem.getSharedInstance(),
                checkpointBaseDir,
                sharedStateDir,
                taskOwnedStateDir,
                writeBufferSize);
        Assertions.assertThat(manager).isNotNull();
        return manager;
    }

    /**
     * Writes 32 bytes for {@code subtaskKey1} in the exclusive scope and returns the still-open
     * stream.
     *
     * @param checkpointId checkpoint the stream belongs to
     * @param fmsm manager that creates the stream
     * @param closeableRegistry registry the stream is registered with
     * @return the output stream after the bytes have been written
     */
    FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) throws IOException {
        final int defaultNumBytes = 32;
        return writeCheckpointAndGetStream(
                subtaskKey1,
                checkpointId,
                CheckpointedStateScope.EXCLUSIVE,
                fmsm,
                closeableRegistry,
                defaultNumBytes);
    }

    /**
     * Writes 32 bytes for the given subtask and scope and returns the still-open stream.
     *
     * @param subtaskKey subtask the stream is created for
     * @param checkpointId checkpoint the stream belongs to
     * @param scope state scope of the stream
     * @param fmsm manager that creates the stream
     * @param closeableRegistry registry the stream is registered with
     * @return the output stream after the bytes have been written
     */
    FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) throws IOException {
        final int defaultNumBytes = 32;
        return writeCheckpointAndGetStream(
                subtaskKey, checkpointId, scope, fmsm, closeableRegistry, defaultNumBytes);
    }

    /**
     * Creates a checkpoint output stream, registers it with the registry and writes the values
     * {@code 0 .. numBytes - 1} to it, one {@code write(int)} call per value. The stream is
     * returned open so the caller decides when to close it.
     *
     * @param subtaskKey subtask the stream is created for
     * @param checkpointId checkpoint the stream belongs to
     * @param scope state scope of the stream
     * @param fmsm manager that creates the stream
     * @param closeableRegistry registry the stream is registered with
     * @param numBytes number of values to write
     * @return the output stream after the values have been written
     */
    FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry, int numBytes) throws IOException {
        FileMergingCheckpointStateOutputStream out =
                fmsm.createCheckpointStateOutputStream(subtaskKey, checkpointId, scope);
        closeableRegistry.registerCloseable(out);
        int value = 0;
        while (value < numBytes) {
            out.write(value);
            value++;
        }
        return out;
    }

    /**
     * Asserts that the manager is a {@link FileMergingSnapshotManagerBase}, that the handle and
     * its file path are non-null, and that the manager reports itself responsible for that path.
     *
     * @param fmsm manager expected to own the file
     * @param stateHandle handle whose file path is checked
     */
    void assertFileInManagedDir(FileMergingSnapshotManager fmsm, SegmentFileStateHandle stateHandle) {
        boolean isBaseManager = fmsm instanceof FileMergingSnapshotManagerBase;
        Assertions.assertThat(isBaseManager).isTrue();
        Assertions.assertThat(stateHandle).isNotNull();

        Path handlePath = stateHandle.getFilePath();
        Assertions.assertThat(handlePath).isNotNull();

        FileMergingSnapshotManagerBase baseManager = (FileMergingSnapshotManagerBase) fmsm;
        Assertions.assertThat(baseManager.isResponsibleForFile(handlePath)).isTrue();
    }

    /**
     * Reports whether the file behind the given handle exists, after asserting that both the
     * handle and its file path are non-null.
     *
     * @param stateHandle handle whose file is looked up
     * @return {@code true} if the path's file system reports the file as existing
     * @throws IOException if the file system lookup fails
     */
    boolean fileExists(SegmentFileStateHandle stateHandle) throws IOException {
        Assertions.assertThat(stateHandle).isNotNull();
        Path handlePath = stateHandle.getFilePath();
        Assertions.assertThat(handlePath).isNotNull();

        FileSystem owningFs = handlePath.getFileSystem();
        return owningFs.exists(handlePath);
    }
}

