/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.TernaryBoolean;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

public class StateBackendLoadingTest {
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();
    private final ClassLoader cl = this.getClass().getClassLoader();
    private final String backendKey = StateBackendOptions.STATE_BACKEND.key();

    @Test
    public void testNoStateBackendDefined() throws Exception {
        Assert.assertNull((Object)StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)new Configuration(), (ClassLoader)this.cl, null));
    }

    @Test
    public void testInstantiateHashMapStateBackendBackendByDefault() throws Exception {
        StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault(null, (Configuration)new Configuration(), (ClassLoader)this.cl, null);
        Assert.assertTrue((boolean)(backend instanceof HashMapStateBackend));
    }

    @Test
    public void testApplicationDefinedHasPrecedence() throws Exception {
        StateBackend appBackend = (StateBackend)Mockito.mock(StateBackend.class);
        Configuration config = new Configuration();
        config.setString(this.backendKey, "jobmanager");
        StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault((StateBackend)appBackend, (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertEquals((Object)appBackend, (Object)backend);
    }

    @Test
    public void testLoadMemoryStateBackendNoParameters() throws Exception {
        Configuration config1 = new Configuration();
        config1.setString(this.backendKey, "jobmanager");
        Configuration config2 = new Configuration();
        config2.setString(this.backendKey, MemoryStateBackendFactory.class.getName());
        StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)config1, (ClassLoader)this.cl, null);
        StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)config2, (ClassLoader)this.cl, null);
        Assert.assertTrue((boolean)(backend1 instanceof MemoryStateBackend));
        Assert.assertTrue((boolean)(backend2 instanceof MemoryStateBackend));
    }

    @Test
    public void testLoadMemoryStateWithParameters() throws Exception {
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedCheckpointPath = new Path(checkpointDir);
        Path expectedSavepointPath = new Path(savepointDir);
        Configuration config1 = new Configuration();
        config1.setString(this.backendKey, "jobmanager");
        config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        Configuration config2 = new Configuration();
        config2.setString(this.backendKey, MemoryStateBackendFactory.class.getName());
        config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        MemoryStateBackend backend1 = (MemoryStateBackend)StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)config1, (ClassLoader)this.cl, null);
        MemoryStateBackend backend2 = (MemoryStateBackend)StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)config2, (ClassLoader)this.cl, null);
        Assert.assertNotNull((Object)backend1);
        Assert.assertNotNull((Object)backend2);
        Assert.assertEquals((Object)expectedCheckpointPath, (Object)backend1.getCheckpointPath());
        Assert.assertEquals((Object)expectedSavepointPath, (Object)backend1.getSavepointPath());
        Assert.assertEquals((Object)expectedCheckpointPath, (Object)backend2.getCheckpointPath());
        Assert.assertEquals((Object)expectedSavepointPath, (Object)backend2.getSavepointPath());
    }

    @Test
    public void testConfigureMemoryStateBackend() throws Exception {
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedCheckpointPath = new Path(checkpointDir);
        Path expectedSavepointPath = new Path(savepointDir);
        int maxSize = 100;
        MemoryStateBackend backend = new MemoryStateBackend(100);
        Configuration config = new Configuration();
        config.setString(this.backendKey, "filesystem");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault((StateBackend)backend, (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertTrue((boolean)(loadedBackend instanceof MemoryStateBackend));
        MemoryStateBackend memBackend = (MemoryStateBackend)loadedBackend;
        Assert.assertEquals((Object)expectedCheckpointPath, (Object)memBackend.getCheckpointPath());
        Assert.assertEquals((Object)expectedSavepointPath, (Object)memBackend.getSavepointPath());
        Assert.assertEquals((long)100L, (long)memBackend.getMaxStateSize());
    }

    @Test
    public void testConfigureMemoryStateBackendMixed() throws Exception {
        String appCheckpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedCheckpointPath = new Path(appCheckpointDir);
        Path expectedSavepointPath = new Path(savepointDir);
        MemoryStateBackend backend = new MemoryStateBackend(appCheckpointDir, null);
        Configuration config = new Configuration();
        config.setString(this.backendKey, "filesystem");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault((StateBackend)backend, (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertTrue((boolean)(loadedBackend instanceof MemoryStateBackend));
        MemoryStateBackend memBackend = (MemoryStateBackend)loadedBackend;
        Assert.assertEquals((Object)expectedCheckpointPath, (Object)memBackend.getCheckpointPath());
        Assert.assertEquals((Object)expectedSavepointPath, (Object)memBackend.getSavepointPath());
    }

    @Test
    public void testLoadFileSystemStateBackend() throws Exception {
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedCheckpointsPath = new Path(checkpointDir);
        Path expectedSavepointsPath = new Path(savepointDir);
        MemorySize threshold = MemorySize.parse((String)"900kb");
        int minWriteBufferSize = 1024;
        Configuration config1 = new Configuration();
        config1.setString(this.backendKey, "filesystem");
        config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)threshold);
        config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 1024);
        Configuration config2 = new Configuration();
        config2.setString(this.backendKey, FsStateBackendFactory.class.getName());
        config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)threshold);
        config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 1024);
        StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)config1, (ClassLoader)this.cl, null);
        StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)config2, (ClassLoader)this.cl, null);
        Assert.assertTrue((boolean)(backend1 instanceof HashMapStateBackend));
        Assert.assertTrue((boolean)(backend2 instanceof FsStateBackend));
        HashMapStateBackend fs1 = (HashMapStateBackend)backend1;
        FsStateBackend fs2 = (FsStateBackend)backend2;
        Assert.assertEquals((Object)expectedCheckpointsPath, (Object)fs2.getCheckpointPath());
        Assert.assertEquals((Object)expectedSavepointsPath, (Object)fs2.getSavepointPath());
        Assert.assertEquals((long)threshold.getBytes(), (long)fs2.getMinFileSizeThreshold());
        Assert.assertEquals((long)Math.max(threshold.getBytes(), 1024L), (long)fs2.getWriteBufferSize());
    }

    @Test
    public void testLoadFileSystemStateBackendMixed() throws Exception {
        String appCheckpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedCheckpointsPath = new Path(new URI(appCheckpointDir));
        Path expectedSavepointsPath = new Path(savepointDir);
        int threshold = 1000000;
        int writeBufferSize = 4000000;
        FsStateBackend backend = new FsStateBackend(new URI(appCheckpointDir), null, 1000000, 4000000, TernaryBoolean.TRUE);
        Configuration config = new Configuration();
        config.setString(this.backendKey, "jobmanager");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)MemorySize.parse((String)"20"));
        config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000);
        StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault((StateBackend)backend, (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertTrue((boolean)(loadedBackend instanceof FsStateBackend));
        FsStateBackend fs = (FsStateBackend)loadedBackend;
        Assert.assertEquals((Object)expectedCheckpointsPath, (Object)fs.getCheckpointPath());
        Assert.assertEquals((Object)expectedSavepointsPath, (Object)fs.getSavepointPath());
        Assert.assertEquals((long)1000000L, (long)fs.getMinFileSizeThreshold());
        Assert.assertEquals((long)4000000L, (long)fs.getWriteBufferSize());
    }

    @Test
    public void testLoadingFails() throws Exception {
        Configuration config = new Configuration();
        config.setString(this.backendKey, "does.not.exist");
        try {
            StateBackendLoader.fromApplicationOrConfigOrDefault(null, (Configuration)config, (ClassLoader)this.cl, null);
            Assert.fail((String)"should fail with an exception");
        }
        catch (DynamicCodeLoadingException dynamicCodeLoadingException) {
            // empty catch block
        }
        config.setString(this.backendKey, File.class.getName());
        try {
            StateBackendLoader.fromApplicationOrConfigOrDefault(null, (Configuration)config, (ClassLoader)this.cl, null);
            Assert.fail((String)"should fail with an exception");
        }
        catch (DynamicCodeLoadingException dynamicCodeLoadingException) {
            // empty catch block
        }
        config.setString(this.backendKey, FailingFactory.class.getName());
        try {
            StateBackendLoader.fromApplicationOrConfigOrDefault(null, (Configuration)config, (ClassLoader)this.cl, null);
            Assert.fail((String)"should fail with an exception");
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Test
    public void testHighAvailabilityDefault() throws Exception {
        String haPersistenceDir = new Path(this.tmp.newFolder().toURI()).toString();
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, null);
        Path checkpointPath = new Path(this.tmp.newFolder().toURI().toString());
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, checkpointPath);
    }

    @Test
    public void testHighAvailabilityDefaultLocalPaths() throws Exception {
        String haPersistenceDir = new Path(this.tmp.newFolder().getAbsolutePath()).toString();
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, null);
        Path checkpointPath = new Path(this.tmp.newFolder().toURI().toString()).makeQualified(FileSystem.getLocalFileSystem());
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, checkpointPath);
    }

    private void testMemoryBackendHighAvailabilityDefault(String haPersistenceDir, Path checkpointPath) throws Exception {
        Configuration config1 = new Configuration();
        config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        config1.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
        Configuration config2 = new Configuration();
        config2.setString(this.backendKey, "jobmanager");
        config2.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
        if (checkpointPath != null) {
            config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointPath.toUri().toString());
            config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointPath.toUri().toString());
        }
        MemoryStateBackend appBackend = new MemoryStateBackend();
        StateBackend loaded1 = StateBackendLoader.fromApplicationOrConfigOrDefault((StateBackend)appBackend, (Configuration)config1, (ClassLoader)this.cl, null);
        StateBackend loaded2 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, (Configuration)config1, (ClassLoader)this.cl, null);
        StateBackend loaded3 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, (Configuration)config2, (ClassLoader)this.cl, null);
        Assert.assertTrue((boolean)(loaded1 instanceof MemoryStateBackend));
        Assert.assertTrue((boolean)(loaded2 instanceof HashMapStateBackend));
        Assert.assertTrue((boolean)(loaded3 instanceof MemoryStateBackend));
        MemoryStateBackend memBackend1 = (MemoryStateBackend)loaded1;
        MemoryStateBackend memBackend2 = (MemoryStateBackend)loaded3;
        Assert.assertNull((Object)memBackend1.getSavepointPath());
        if (checkpointPath != null) {
            Assert.assertNotNull((Object)memBackend1.getCheckpointPath());
            Assert.assertNotNull((Object)memBackend2.getCheckpointPath());
            Assert.assertEquals((Object)checkpointPath, (Object)memBackend1.getCheckpointPath());
            Assert.assertEquals((Object)checkpointPath, (Object)memBackend2.getCheckpointPath());
        } else {
            Assert.assertNull((Object)memBackend1.getCheckpointPath());
            Assert.assertNull((Object)memBackend2.getCheckpointPath());
        }
    }

    static final class FailingFactory
    implements StateBackendFactory<StateBackend> {
        FailingFactory() {
        }

        public StateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IOException {
            throw new IOException("fail!");
        }
    }
}

