/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobDispatcherITCase
extends TestLogger {
    private static final Duration TIMEOUT = Duration.ofMinutes(10L);
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private Supplier<DispatcherResourceManagerComponentFactory> createJobModeDispatcherResourceManagerComponentFactorySupplier(Configuration configuration) {
        return () -> {
            try {
                return new DefaultDispatcherResourceManagerComponentFactory((DispatcherRunnerFactory)new DefaultDispatcherRunnerFactory((DispatcherLeaderProcessFactoryFactory)JobDispatcherLeaderProcessFactoryFactory.create((JobGraphRetriever)FileJobGraphRetriever.createFrom((Configuration)configuration, null))), (ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance(), (RestEndpointFactory)JobRestEndpointFactory.INSTANCE);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Test
    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception {
        Deadline deadline = Deadline.fromNow((Duration)TIMEOUT);
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, (Object)HighAvailabilityMode.ZOOKEEPER.name());
        TestingMiniClusterConfiguration clusterConfiguration = new TestingMiniClusterConfiguration.Builder().setConfiguration(configuration).build();
        EmbeddedHaServicesWithLeadershipControl haServices = new EmbeddedHaServicesWithLeadershipControl((Executor)TestingUtils.defaultExecutor());
        Configuration newConfiguration = new Configuration((Configuration)clusterConfiguration.getConfiguration());
        long checkpointInterval = 100L;
        JobID jobID = this.generateAndPersistJobGraph(newConfiguration, 100L, TEMPORARY_FOLDER.newFolder().toPath());
        AtLeastOneCheckpointInvokable.reset();
        try (TestingMiniCluster cluster = new TestingMiniCluster(clusterConfiguration, () -> haServices, this.createJobModeDispatcherResourceManagerComponentFactorySupplier(newConfiguration));){
            cluster.start();
            AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await();
            CompletableFuture firstJobResult = cluster.requestJobResult(jobID);
            haServices.revokeDispatcherLeadership();
            Assert.assertEquals((Object)ApplicationStatus.UNKNOWN, (Object)((JobResult)firstJobResult.get()).getApplicationStatus());
            haServices.grantDispatcherLeadership();
            JobDispatcherITCase.awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
            Assert.assertNotNull((Object)((AccessExecutionGraph)cluster.getExecutionGraph(jobID).get()).getCheckpointStatsSnapshot().getLatestRestoredCheckpoint());
        }
    }

    private JobID generateAndPersistJobGraph(Configuration configuration, long checkpointInterval, Path tmpPath) throws Exception {
        JobVertex jobVertex = new JobVertex("jobVertex");
        jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class);
        jobVertex.setParallelism(1);
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(checkpointInterval).build();
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
        JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).setJobCheckpointingSettings(checkpointingSettings).build();
        Path jobGraphPath = tmpPath.resolve((String)FileJobGraphRetriever.JOB_GRAPH_FILE_PATH.defaultValue());
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(jobGraphPath, StandardOpenOption.CREATE));){
            objectOutputStream.writeObject(jobGraph);
        }
        configuration.setString(FileJobGraphRetriever.JOB_GRAPH_FILE_PATH.key(), jobGraphPath.toString());
        return jobGraph.getJobID();
    }

    private static void awaitJobStatus(MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline) throws Exception {
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> {
            try {
                return cluster.getJobStatus(jobId).get() == status;
            }
            catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent()) {
                    return false;
                }
                throw e;
            }
        }), deadline);
    }

    public static class AtLeastOneCheckpointInvokable
    extends AbstractInvokable {
        private static final long CANCEL_SIGNAL = -2L;
        private final BlockingQueue<Long> checkpointsToConfirm = new ArrayBlockingQueue<Long>(1);
        private static volatile CountDownLatch atLeastOneCheckpointCompleted;

        private static void reset() {
            atLeastOneCheckpointCompleted = new CountDownLatch(1);
        }

        public AtLeastOneCheckpointInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            long signal = this.checkpointsToConfirm.take();
            while (signal != -2L) {
                this.getEnvironment().acknowledgeCheckpoint(signal, new CheckpointMetrics());
                signal = this.checkpointsToConfirm.take();
            }
        }

        public Future<Void> cancel() throws Exception {
            this.checkpointsToConfirm.add(-2L);
            return FutureUtils.completedVoidFuture();
        }

        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.checkpointsToConfirm.add(checkpointMetaData.getCheckpointId());
            return CompletableFuture.completedFuture(true);
        }

        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            atLeastOneCheckpointCompleted.countDown();
            return CompletableFuture.completedFuture(null);
        }
    }
}

