/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.net.InetAddress;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link TaskExecutorLocalStateStoresManager}: how its root directories and
 * local-recovery flag are derived from a {@link Configuration}, and how per-subtask
 * directories are created below the root directories and removed again.
 */
public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
    /** Supplies scratch directories for all tests in this class. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Currently not referenced by any test in this class. */
    private static final int TOTAL_FLINK_MEMORY_MB = 1024;

    /**
     * Name of the sub-directory the tests expect beneath every configured root directory.
     *
     * <p>NOTE(review): the production code presumably exposes this name as a constant; if it
     * does, reference that constant here instead of duplicating the literal - confirm.
     */
    private static final String LOCAL_STATE_SUB_DIRECTORY = "localState";

    /**
     * Verifies that explicitly configured root directories and an enabled local-recovery flag
     * are reflected by the state stores manager obtained from {@link TaskManagerServices}.
     *
     * @throws Exception if creating the temporary folder or the task manager services fails
     */
    @Test
    public void testCreationFromConfig() throws Exception {
        Configuration config = new Configuration();
        File newFolder = temporaryFolder.newFolder();
        String tmpDir = newFolder.getAbsolutePath() + File.separator;
        // Literal replacement on purpose: replaceAll() would interpret '\' and '$' inside the
        // path as regex replacement syntax and corrupt (or reject) e.g. Windows paths.
        String rootDirString =
                "__localStateRoot1,__localStateRoot2,__localStateRoot3".replace("__", tmpDir);
        config.setString(
                CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);
        config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);

        TaskManagerServices taskManagerServices =
                this.createTaskManagerServices(this.createTaskManagerServiceConfiguration(config));
        try {
            TaskExecutorLocalStateStoresManager taskStateManager =
                    taskManagerServices.getTaskManagerStateStore();

            String[] expectedRoots = rootDirString.split(",");
            File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();

            // Compare lengths first so a mismatch fails as an assertion rather than as an
            // ArrayIndexOutOfBoundsException (too few) or going unnoticed (too many).
            Assert.assertEquals(
                    "unexpected number of local state root directories",
                    expectedRoots.length,
                    rootDirectories.length);
            for (int i = 0; i < expectedRoots.length; ++i) {
                Assert.assertEquals(
                        new File(expectedRoots[i], LOCAL_STATE_SUB_DIRECTORY), rootDirectories[i]);
            }

            Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());

            // Remove whatever was created below the configured roots.
            for (File rootDirectory : rootDirectories) {
                FileUtils.deleteFileOrDirectory(rootDirectory);
            }
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * Verifies that with an empty configuration the root directories are derived from the
     * configured temp directory paths and that local recovery is disabled.
     *
     * @throws Exception if creating the task manager services fails
     */
    @Test
    public void testCreationFromConfigDefault() throws Exception {
        Configuration config = new Configuration();
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
                this.createTaskManagerServiceConfiguration(config);
        TaskManagerServices taskManagerServices =
                this.createTaskManagerServices(taskManagerServicesConfiguration);
        try {
            TaskExecutorLocalStateStoresManager taskStateManager =
                    taskManagerServices.getTaskManagerStateStore();

            String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
            File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();

            // Same length guard as in testCreationFromConfig.
            Assert.assertEquals(
                    "unexpected number of local state root directories",
                    tmpDirPaths.length,
                    localStateRootDirectories.length);
            for (int i = 0; i < tmpDirPaths.length; ++i) {
                Assert.assertEquals(
                        new File(tmpDirPaths[i], LOCAL_STATE_SUB_DIRECTORY),
                        localStateRootDirectories[i]);
            }

            Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * Verifies the directory layout handed out for a subtask, and that the root directories
     * are empty again both after releasing an allocation and after shutting the manager down.
     *
     * @throws Exception if creating temporary folders or test files fails
     */
    @Test
    public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
        JobID jobID = new JobID();
        JobVertexID jobVertexID = new JobVertexID();
        AllocationID allocationID = new AllocationID();
        int subtaskIdx = 23;
        File[] rootDirs = new File[] {
            temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()
        };
        TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(true, rootDirs, Executors.directExecutor());

        // Shut the manager down even when an assertion fails, mirroring the try/finally
        // handling of the other tests in this class.
        try {
            TaskLocalStateStore taskLocalStateStore =
                    storesManager.localStateStoreForSubtask(
                            jobID, allocationID, jobVertexID, subtaskIdx);
            LocalRecoveryDirectoryProvider directoryProvider =
                    taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();

            // The expected base directory for checkpoint i is the root directory at index
            // (i modulo number of roots), extended by the allocation sub-directory.
            for (int i = 0; i < 10; ++i) {
                File expectedBaseDir =
                        new File(
                                rootDirs[(i & Integer.MAX_VALUE) % rootDirs.length],
                                storesManager.allocationSubDirString(allocationID));
                Assert.assertEquals(
                        expectedBaseDir, directoryProvider.allocationBaseDirectory(i));
            }

            long chkId = 42L;
            File allocBaseDirChk42 = directoryProvider.allocationBaseDirectory(chkId);
            File subtaskSpecificCheckpointDirectory =
                    directoryProvider.subtaskSpecificCheckpointDirectory(chkId);
            Assert.assertEquals(
                    new File(
                            allocBaseDirChk42,
                            "jid_" + jobID + File.separator
                                    + "vtx_" + jobVertexID + "_sti_" + subtaskIdx + File.separator
                                    + "chk_" + chkId),
                    subtaskSpecificCheckpointDirectory);

            Assert.assertTrue(subtaskSpecificCheckpointDirectory.mkdirs());
            File testFile = new File(subtaskSpecificCheckpointDirectory, "test");
            Assert.assertTrue(testFile.createNewFile());

            Assert.assertEquals(
                    storesManager.isLocalRecoveryEnabled(),
                    taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
            Assert.assertTrue(testFile.exists());

            // Releasing the allocation must leave the root directories empty.
            storesManager.releaseLocalStateForAllocationId(allocationID);
            this.checkRootDirsClean(rootDirs);

            // Create state for a second allocation; it is expected to be removed by shutdown.
            AllocationID otherAllocationID = new AllocationID();
            taskLocalStateStore =
                    storesManager.localStateStoreForSubtask(
                            jobID, otherAllocationID, jobVertexID, subtaskIdx);
            directoryProvider =
                    taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();
            File chkDir = directoryProvider.subtaskSpecificCheckpointDirectory(23L);
            Assert.assertTrue(chkDir.mkdirs());
            testFile = new File(chkDir, "test");
            Assert.assertTrue(testFile.createNewFile());
        } finally {
            storesManager.shutdown();
        }

        this.checkRootDirsClean(rootDirs);
    }

    /**
     * Asserts that every listable directory in {@code rootDirs} contains no entries.
     * Directories for which {@link File#listFiles()} returns {@code null} are skipped.
     *
     * @param rootDirs the root directories to inspect
     */
    private void checkRootDirsClean(File[] rootDirs) {
        for (File rootDir : rootDirs) {
            File[] files = rootDir.listFiles();
            if (files != null) {
                Assert.assertArrayEquals(new File[0], files);
            }
        }
    }

    /**
     * Builds a {@link TaskManagerServicesConfiguration} from {@code config}, using a generated
     * resource id, the local host name and the resource spec for local execution.
     *
     * @param config the configuration to derive the services configuration from
     * @return the derived services configuration
     * @throws Exception if the local host cannot be resolved or the configuration is rejected
     */
    private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(Configuration config) throws Exception {
        return TaskManagerServicesConfiguration.fromConfiguration(
                config,
                ResourceID.generate(),
                InetAddress.getLocalHost().getHostName(),
                true,
                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config));
    }

    /**
     * Creates {@link TaskManagerServices} for {@code config} with a void blob service, an
     * unregistered metric group, a direct executor service and a no-op error callback.
     *
     * @param config the services configuration to instantiate
     * @return the created task manager services; callers are responsible for shutting them down
     * @throws Exception if the services cannot be created
     */
    private TaskManagerServices createTaskManagerServices(TaskManagerServicesConfiguration config) throws Exception {
        return TaskManagerServices.fromConfiguration(
                config,
                VoidPermanentBlobService.INSTANCE,
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                Executors.newDirectExecutorService(),
                throwable -> {});
    }
}

