/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.Canceling;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.ExecutingTest;
import org.apache.flink.runtime.scheduler.adaptive.Failing;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.MockStateWithExecutionGraphContext;
import org.apache.flink.runtime.scheduler.adaptive.Restarting;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.StateValidator;
import org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint;
import org.apache.flink.runtime.scheduler.adaptive.TestingOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={TestLoggerExtension.class})
class StopWithSavepointTest {
    /** Logger passed to the {@link ExecutionGraphHandler} and {@link StopWithSavepoint} instances built by the factory helpers. */
    private static final Logger LOG = LoggerFactory.getLogger(StopWithSavepointTest.class);
    /** Savepoint location used by the tests that complete the savepoint future successfully. */
    private static final String SAVEPOINT_PATH = "test://savepoint/path";

    /** Package-private no-argument constructor; it performs no setup of its own. */
    StopWithSavepointTest() {}

    /**
     * Reports a globally terminal {@code FINISHED} status first and completes the savepoint
     * afterwards; the operation future must then yield the savepoint path.
     */
    @Test
    void testFinishedOnSuccessfulStopWithSavepoint() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, graph, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);

            // The terminal state is reported while the savepoint is still outstanding.
            stopWithSavepoint.onGloballyTerminalState(JobStatus.FINISHED);
            context.triggerExecutors();

            // The Finished expectation is registered only now, right before the savepoint completes.
            context.setExpectFinished(WaitingForResourcesTest.assertNonNull());
            pendingSavepoint.complete(SAVEPOINT_PATH);
            context.triggerExecutors();

            final String reportedPath = stopWithSavepoint.getOperationFuture().get();
            AssertionsForClassTypes.assertThat(reportedPath).isEqualTo(SAVEPOINT_PATH);
        }
    }

    /**
     * Completes the graph's termination future with {@code FAILED} and then lets the savepoint
     * succeed; expects the Failing transition to carry a {@link
     * StopWithSavepointStoppingException} and the operation future to complete exceptionally.
     */
    @Test
    void testJobFailedAndSavepointOperationSucceeds() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, graph, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(FailureResult::canNotRestart);

            // Job fails before the savepoint outcome is known.
            graph.completeTerminationFuture(JobStatus.FAILED);
            context.triggerExecutors();

            context.setExpectFailing(
                    args -> {
                        AssertionsForClassTypes.assertThat(args.getExecutionGraph().getState())
                                .isEqualTo(JobStatus.FAILED);
                        AssertionsForClassTypes.assertThat(args.getFailureCause())
                                .satisfies(
                                        FlinkAssertions.anyCauseMatches(
                                                StopWithSavepointStoppingException.class));
                    });

            pendingSavepoint.complete(SAVEPOINT_PATH);
            context.triggerExecutors();

            AssertionsForClassTypes.assertThat(stopWithSavepoint.getOperationFuture())
                    .isCompletedExceptionally();
        }
    }

    /**
     * Fails both the job (termination future completes with {@code FAILED}) and the savepoint
     * future before running the executors; expects a Failing transition whose cause chain
     * contains a {@link FlinkException}, and an exceptionally completed operation future.
     */
    @Test
    void testJobFailedAndSavepointOperationFails() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, graph, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(FailureResult::canNotRestart);
            context.setExpectFailing(
                    args -> {
                        AssertionsForClassTypes.assertThat(args.getExecutionGraph().getState())
                                .isEqualTo(JobStatus.FAILED);
                        AssertionsForClassTypes.assertThat(args.getFailureCause())
                                .satisfies(FlinkAssertions.anyCauseMatches(FlinkException.class));
                    });

            // Both outcomes are delivered before any queued work is executed.
            graph.completeTerminationFuture(JobStatus.FAILED);
            pendingSavepoint.completeExceptionally(new RuntimeException());
            context.triggerExecutors();

            AssertionsForClassTypes.assertThat(stopWithSavepoint.getOperationFuture())
                    .isCompletedExceptionally();
        }
    }

    /**
     * Completes the termination future with {@code FINISHED} before the savepoint future
     * completes; the operation future must still yield the savepoint path.
     */
    @Test
    void testJobFinishedBeforeSavepointFuture() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, graph, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setExpectFinished(WaitingForResourcesTest.assertNonNull());

            graph.completeTerminationFuture(JobStatus.FINISHED);
            pendingSavepoint.complete(SAVEPOINT_PATH);
            context.triggerExecutors();

            final String reportedPath = stopWithSavepoint.getOperationFuture().get();
            AssertionsForClassTypes.assertThat(reportedPath).isEqualTo(SAVEPOINT_PATH);
        }
    }

    /** Calling {@code cancel()} must hand over to the Canceling state. */
    @Test
    void testTransitionToCancellingOnCancel() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(context);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setExpectCancelling(WaitingForResourcesTest.assertNonNull());

            stopWithSavepoint.cancel();
        }
    }

    /**
     * Calling {@code suspend(...)} must lead to a Finished transition whose archived graph
     * reports {@code SUSPENDED}.
     */
    @Test
    void testTransitionToFinishedOnSuspend() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(context);
            context.setExpectFinished(
                    archivedGraph ->
                            AssertionsForClassTypes.assertThat(archivedGraph.getState())
                                    .isEqualTo(JobStatus.SUSPENDED));

            stopWithSavepoint.suspend(new RuntimeException());
        }
    }

    /**
     * With a failure handler that allows restarting, a global failure must result in a
     * Restarting transition.
     */
    @Test
    void testRestartOnGlobalFailureIfRestartConfigured() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            // The savepoint is already complete when the state is created.
            final CompletableFuture<String> completedSavepoint =
                    CompletableFuture.completedFuture("");
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, completedSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(
                    cause -> FailureResult.canRestart(cause, Duration.ZERO));
            context.setExpectRestarting(WaitingForResourcesTest.assertNonNull());

            stopWithSavepoint.handleGlobalFailure(new RuntimeException());
        }
    }

    /**
     * With a failure handler that forbids restarting, a global failure must result in a Failing
     * transition whose cause chain contains the {@link RuntimeException}.
     */
    @Test
    void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            // The savepoint is already complete when the state is created.
            final CompletableFuture<String> completedSavepoint =
                    CompletableFuture.completedFuture("");
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, completedSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(FailureResult::canNotRestart);
            context.setExpectFailing(
                    args ->
                            AssertionsForClassTypes.assertThat(args.getFailureCause())
                                    .satisfies(
                                            FlinkAssertions.anyCauseMatches(
                                                    RuntimeException.class)));

            stopWithSavepoint.handleGlobalFailure(new RuntimeException());
        }
    }

    /**
     * Delivers a failing task-state update while restarts are not permitted; expects a Failing
     * transition and {@code updateTaskExecutionState} to return {@code true}.
     */
    @Test
    void testFailingOnUpdateTaskExecutionStateWithNoRestart() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final CompletableFuture<String> completedSavepoint =
                    CompletableFuture.completedFuture("");
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, graph, completedSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(FailureResult::canNotRestart);
            context.setExpectFailing(
                    args ->
                            AssertionsForClassTypes.assertThat(args.getFailureCause())
                                    .satisfies(
                                            FlinkAssertions.anyCauseMatches(
                                                    RuntimeException.class)));

            // Register a FAILED execution with the graph and report its failure to the state.
            final RuntimeException taskError = new RuntimeException();
            final ErrorInfo errorInfo = new ErrorInfo(taskError, System.currentTimeMillis());
            final TestingAccessExecution failedExecution =
                    TestingAccessExecution.newBuilder()
                            .withExecutionState(ExecutionState.FAILED)
                            .withErrorInfo(errorInfo)
                            .build();
            graph.registerExecution(failedExecution);
            final TaskExecutionStateTransition failingTransition =
                    ExecutingTest.createFailingStateTransition(
                            failedExecution.getAttemptId(), taskError);

            final boolean accepted = stopWithSavepoint.updateTaskExecutionState(failingTransition);
            AssertionsForClassTypes.assertThat(accepted).isTrue();
        }
    }

    /**
     * Delivers a failing task-state update while restarts are permitted; expects a Restarting
     * transition and {@code updateTaskExecutionState} to return {@code true}.
     */
    @Test
    void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final CompletableFuture<String> completedSavepoint =
                    CompletableFuture.completedFuture("");
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, graph, completedSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(
                    cause -> FailureResult.canRestart(cause, Duration.ZERO));
            context.setExpectRestarting(WaitingForResourcesTest.assertNonNull());

            // Register a FAILED execution with the graph and report its failure to the state.
            final RuntimeException taskError = new RuntimeException();
            final ErrorInfo errorInfo = new ErrorInfo(taskError, System.currentTimeMillis());
            final TestingAccessExecution failedExecution =
                    TestingAccessExecution.newBuilder()
                            .withExecutionState(ExecutionState.FAILED)
                            .withErrorInfo(errorInfo)
                            .build();
            graph.registerExecution(failedExecution);
            final TaskExecutionStateTransition failingTransition =
                    ExecutingTest.createFailingStateTransition(
                            failedExecution.getAttemptId(), taskError);

            final boolean accepted = stopWithSavepoint.updateTaskExecutionState(failingTransition);
            AssertionsForClassTypes.assertThat(accepted).isTrue();
        }
    }

    /**
     * Leaving the state (here towards {@link Canceling}) while the savepoint future is still
     * pending must complete the operation future exceptionally.
     */
    @Test
    void testExceptionalOperationFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion() throws Exception {
        StopWithSavepoint stopWithSavepoint;
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            stopWithSavepoint = createStopWithSavepoint(context);
            context.setStopWithSavepoint(stopWithSavepoint);

            stopWithSavepoint.onLeave(Canceling.class);
        }
        // Checked only after the context has been closed.
        AssertionsForClassTypes.assertThat(stopWithSavepoint.getOperationFuture())
                .isCompletedExceptionally();
    }

    /**
     * A savepoint future that fails must, once the context has been closed, leave the operation
     * future exceptionally completed; a transition to Executing is expected along the way.
     */
    @Test
    void testExceptionalSavepointCompletionLeadsToExceptionalOperationFutureCompletion() throws Exception {
        StopWithSavepoint stopWithSavepoint;
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            stopWithSavepoint =
                    createStopWithSavepoint(context, checkpointScheduling, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setExpectExecuting(WaitingForResourcesTest.assertNonNull());

            pendingSavepoint.completeExceptionally(new RuntimeException("Test error"));
        }
        // Checked only after the context has been closed.
        AssertionsForClassTypes.assertThat(stopWithSavepoint.getOperationFuture())
                .isCompletedExceptionally();
    }

    /**
     * A failing savepoint future must send the job back to Executing with the execution graph
     * still reporting {@code RUNNING}, and the operation future must complete exceptionally.
     */
    @Test
    void testErrorCreatingSavepointLeadsToTransitionToExecutingState() throws Exception {
        StopWithSavepoint sws;
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            MockCheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
            sws = createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
            ctx.setStopWithSavepoint(sws);
            // The assertion result is not needed; the call alone performs the check.
            ctx.setExpectExecuting(
                    executingArguments ->
                            AssertionsForClassTypes.assertThat(
                                            executingArguments.getExecutionGraph().getState())
                                    .isEqualTo(JobStatus.RUNNING));

            savepointFuture.completeExceptionally(new RuntimeException("Test error"));
        }
        // Checked only after the context has been closed.
        AssertionsForClassTypes.assertThat(sws.getOperationFuture()).isCompletedExceptionally();
    }

    /**
     * Completes the savepoint successfully and then reports a task failure; with restarts
     * permitted, a Restarting transition is expected.
     */
    @Test
    void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, checkpointScheduling, graph, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(
                    cause -> FailureResult.canRestart(cause, Duration.ZERO));
            context.setExpectRestarting(WaitingForResourcesTest.assertNonNull());

            // Savepoint succeeds first ...
            pendingSavepoint.complete(SAVEPOINT_PATH);
            context.triggerExecutors();

            // ... and only afterwards a task failure is reported.
            final RuntimeException taskError = new RuntimeException();
            final ErrorInfo errorInfo = new ErrorInfo(taskError, System.currentTimeMillis());
            final TestingAccessExecution failedExecution =
                    TestingAccessExecution.newBuilder()
                            .withExecutionState(ExecutionState.FAILED)
                            .withErrorInfo(errorInfo)
                            .build();
            graph.registerExecution(failedExecution);
            final TaskExecutionStateTransition failingTransition =
                    ExecutingTest.createFailingStateTransition(
                            failedExecution.getAttemptId(), taskError);

            final boolean accepted = stopWithSavepoint.updateTaskExecutionState(failingTransition);
            AssertionsForClassTypes.assertThat(accepted).isTrue();
        }
    }

    /**
     * Reports a failure while the savepoint is still pending and registers the Restarting
     * expectation only afterwards, just before the savepoint future is completed successfully.
     */
    @Test
    void testOnFailureWaitsForSavepointCompletion() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, checkpointScheduling, graph, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(
                    cause -> FailureResult.canRestart(cause, Duration.ZERO));

            // No transition expectation is registered yet at this point.
            stopWithSavepoint.onFailure(new Exception("task failure"));
            context.triggerExecutors();

            context.setExpectRestarting(WaitingForResourcesTest.assertNonNull());
            pendingSavepoint.complete(SAVEPOINT_PATH);
            context.triggerExecutors();
        }
    }

    /**
     * Reports a failure while the savepoint is still pending and then fails the savepoint future
     * as well; with restarts permitted, a Restarting transition is expected.
     */
    @Test
    void testConcurrentSavepointFailureAndGloballyTerminalStateCauseRestart() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, checkpointScheduling, graph, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setHowToHandleFailure(
                    cause -> FailureResult.canRestart(cause, Duration.ZERO));

            // No transition expectation is registered yet at this point.
            stopWithSavepoint.onFailure(new Exception("task failure"));
            context.triggerExecutors();

            context.setExpectRestarting(WaitingForResourcesTest.assertNonNull());
            pendingSavepoint.completeExceptionally(new Exception("savepoint failure"));
            context.triggerExecutors();
        }
    }

    /**
     * After the savepoint future fails, the mock checkpoint scheduling must report that the
     * checkpoint scheduler has been started (it reports "not started" beforehand).
     */
    @Test
    void testEnsureCheckpointSchedulerIsStartedAgain() throws Exception {
        try (MockStopWithSavepointContext context = new MockStopWithSavepointContext()) {
            final MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
            final boolean startedBefore = checkpointScheduling.isCheckpointSchedulerStarted();
            AssertionsForClassTypes.assertThat(startedBefore).isFalse();

            final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
            final StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(context, checkpointScheduling, pendingSavepoint);
            context.setStopWithSavepoint(stopWithSavepoint);
            context.setExpectExecuting(WaitingForResourcesTest.assertNonNull());

            pendingSavepoint.completeExceptionally(new RuntimeException("Test error"));
            context.triggerExecutors();

            final boolean startedAfter = checkpointScheduling.isCheckpointSchedulerStarted();
            AssertionsForClassTypes.assertThat(startedAfter).isTrue();
        }
    }

    /**
     * Builds a {@link StopWithSavepoint} with a fresh mock checkpoint scheduling, a fresh mock
     * execution graph and a savepoint future that is not yet completed.
     */
    private static StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx) {
        final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
        final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
        return createStopWithSavepoint(ctx, new MockCheckpointScheduling(), graph, pendingSavepoint);
    }

    /**
     * Builds a {@link StopWithSavepoint} around the given savepoint future, using a fresh mock
     * checkpoint scheduling and a fresh mock execution graph.
     */
    private static StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, CompletableFuture<String> savepointFuture) {
        final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
        return createStopWithSavepoint(ctx, new MockCheckpointScheduling(), graph, savepointFuture);
    }

    /**
     * Builds a {@link StopWithSavepoint} for the given execution graph and savepoint future,
     * using a fresh mock checkpoint scheduling.
     */
    private static StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, ExecutionGraph executionGraph, CompletableFuture<String> savepointFuture) {
        final MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
        return createStopWithSavepoint(ctx, checkpointScheduling, executionGraph, savepointFuture);
    }

    /**
     * Builds a {@link StopWithSavepoint} for the given execution graph with a savepoint future
     * that is not yet completed.
     */
    private static StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, ExecutionGraph executionGraph) {
        final CompletableFuture<String> pendingSavepoint = new CompletableFuture<>();
        return createStopWithSavepoint(ctx, executionGraph, pendingSavepoint);
    }

    /**
     * Builds a {@link StopWithSavepoint} with the given checkpoint scheduling and savepoint
     * future, using a fresh mock execution graph.
     */
    private static StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, CheckpointScheduling checkpointScheduling, CompletableFuture<String> savepointFuture) {
        final StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
        return createStopWithSavepoint(ctx, checkpointScheduling, graph, savepointFuture);
    }

    /**
     * Central factory used by all other {@code createStopWithSavepoint} overloads.
     *
     * <p>Creates an {@link ExecutionGraphHandler} that uses the context's main-thread executor
     * for both executor arguments, moves the execution graph to running, and constructs the
     * {@link StopWithSavepoint} with a testing coordinator handler, the system class loader and
     * an empty, mutable failure collection.
     *
     * @param ctx mock context the state reports its transitions to
     * @param checkpointScheduling checkpoint scheduling passed to the state
     * @param executionGraph execution graph the state operates on
     * @param savepointFuture future representing the savepoint operation
     * @return the newly constructed state
     */
    private static StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, CheckpointScheduling checkpointScheduling, ExecutionGraph executionGraph, CompletableFuture<String> savepointFuture) {
        ExecutionGraphHandler executionGraphHandler =
                new ExecutionGraphHandler(
                        executionGraph,
                        LOG,
                        (Executor) ctx.getMainThreadExecutor(),
                        ctx.getMainThreadExecutor());
        OperatorCoordinatorHandler operatorCoordinatorHandler =
                new TestingOperatorCoordinatorHandler();
        // The graph is switched to running before the state is constructed.
        executionGraph.transitionToRunning();
        return new StopWithSavepoint(
                ctx,
                executionGraph,
                executionGraphHandler,
                operatorCoordinatorHandler,
                checkpointScheduling,
                LOG,
                ClassLoader.getSystemClassLoader(),
                savepointFuture,
                // Diamond instead of a raw ArrayList so the element type is inferred and checked.
                new ArrayList<>());
    }

    /**
     * Minimal {@link CheckpointScheduling} stand-in that only remembers whether the checkpoint
     * scheduler was most recently started or stopped.
     */
    private static class MockCheckpointScheduling implements CheckpointScheduling {
        /** {@code true} after a start call; {@code false} initially and after a stop call. */
        private boolean schedulerRunning;

        private MockCheckpointScheduling() {}

        /** Records that the checkpoint scheduler is running. */
        public void startCheckpointScheduler() {
            schedulerRunning = true;
        }

        /** Records that the checkpoint scheduler is not running. */
        public void stopCheckpointScheduler() {
            schedulerRunning = false;
        }

        /** Returns whether the most recent call was a start (as opposed to a stop or no call). */
        boolean isCheckpointSchedulerStarted() {
            return schedulerRunning;
        }
    }

    /**
     * Test implementation of {@link StopWithSavepoint.Context}.
     *
     * <p>Tests register an expectation per target state through the {@code setExpect...}
     * methods; each {@code goTo...} method then validates its arguments against the matching
     * {@link StateValidator}. At most one {@code goTo...} transition is accepted per context, and
     * every such transition first calls {@code onLeave} on the registered {@link
     * StopWithSavepoint}.
     */
    private static class MockStopWithSavepointContext
            extends MockStateWithExecutionGraphContext
            implements StopWithSavepoint.Context {
        /** Strategy answering {@link #howToHandleFailure(Throwable)}; must be set before a failure is handled. */
        private Function<Throwable, FailureResult> howToHandleFailure;
        private final StateValidator<ExecutingTest.FailingArguments> failingStateValidator =
                new StateValidator<>("failing");
        private final StateValidator<ExecutingTest.RestartingArguments> restartingStateValidator =
                new StateValidator<>("restarting");
        private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
                new StateValidator<>("cancelling");
        // Transitions to Executing are validated with the CancellingArguments shape.
        private final StateValidator<ExecutingTest.CancellingArguments> executingStateValidator =
                new StateValidator<>("executing");
        /** State under test whose {@code onLeave} is called when a transition is simulated. */
        private StopWithSavepoint state;

        private MockStopWithSavepointContext() {
        }

        /** Registers the state under test; required before any {@code goTo...} call. */
        public void setStopWithSavepoint(StopWithSavepoint sws) {
            this.state = sws;
        }

        /** Registers the assertion to run against the arguments of a transition to Failing. */
        public void setExpectFailing(Consumer<ExecutingTest.FailingArguments> asserter) {
            this.failingStateValidator.expectInput(asserter);
        }

        /** Registers the assertion to run against the arguments of a transition to Restarting. */
        public void setExpectRestarting(Consumer<ExecutingTest.RestartingArguments> asserter) {
            this.restartingStateValidator.expectInput(asserter);
        }

        /** Registers the assertion to run against the arguments of a transition to Canceling. */
        public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
            this.cancellingStateValidator.expectInput(asserter);
        }

        /** Registers the assertion to run against the arguments of a transition to Executing. */
        public void setExpectExecuting(Consumer<ExecutingTest.CancellingArguments> asserter) {
            this.executingStateValidator.expectInput(asserter);
        }

        /** Sets the strategy used to answer {@link #howToHandleFailure(Throwable)}. */
        public void setHowToHandleFailure(Function<Throwable, FailureResult> function) {
            this.howToHandleFailure = function;
        }

        /**
         * Delegates to the configured failure-handling function.
         *
         * @throws NullPointerException if no function was configured via {@link
         *     #setHowToHandleFailure(Function)}
         */
        @Override
        public FailureResult howToHandleFailure(Throwable failure) {
            Preconditions.checkNotNull(
                    this.howToHandleFailure,
                    "howToHandleFailure must be configured via setHowToHandleFailure() before a failure is handled");
            return this.howToHandleFailure.apply(failure);
        }

        /** Calls {@code onLeave(target)} on the registered state, failing if none was registered. */
        private void simulateTransitionToState(Class<? extends State> target) {
            Preconditions.checkNotNull(
                    this.state,
                    "StopWithSavepoint state must be set via setStopWithSavepoint() to call onLeave() on leaving the state");
            this.state.onLeave(target);
        }

        /**
         * Shared prologue of every {@code goTo...} method: rejects a second transition and
         * notifies the registered state that it is being left for {@code target}.
         */
        private void beginTransitionTo(Class<? extends State> target) {
            if (this.hadStateTransition) {
                throw new IllegalStateException("Only one state transition is allowed.");
            }
            this.simulateTransitionToState(target);
        }

        @Override
        public void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) {
            this.beginTransitionTo(Canceling.class);
            this.cancellingStateValidator.validateInput(
                    new ExecutingTest.CancellingArguments(
                            executionGraph, executionGraphHandler, operatorCoordinatorHandler));
            // Marked only after validation succeeded.
            this.hadStateTransition = true;
        }

        @Override
        public void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, List<ExceptionHistoryEntry> failureCollection) {
            this.beginTransitionTo(Restarting.class);
            this.restartingStateValidator.validateInput(
                    new ExecutingTest.RestartingArguments(
                            executionGraph,
                            executionGraphHandler,
                            operatorCoordinatorHandler,
                            backoffTime));
            // Marked only after validation succeeded.
            this.hadStateTransition = true;
        }

        @Override
        public void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause, List<ExceptionHistoryEntry> failureCollection) {
            this.beginTransitionTo(Failing.class);
            this.failingStateValidator.validateInput(
                    new ExecutingTest.FailingArguments(
                            executionGraph,
                            executionGraphHandler,
                            operatorCoordinatorHandler,
                            failureCause));
            // Marked only after validation succeeded.
            this.hadStateTransition = true;
        }

        @Override
        public void goToExecuting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) {
            this.beginTransitionTo(Executing.class);
            this.executingStateValidator.validateInput(
                    new ExecutingTest.CancellingArguments(
                            executionGraph, executionGraphHandler, operatorCoordinatorHandler));
            // Marked only after validation succeeded.
            this.hadStateTransition = true;
        }

        /**
         * Reports {@code true} until a state transition has been recorded; the {@code
         * expectedState} argument itself is not inspected.
         */
        @Override
        public boolean isState(State expectedState) {
            return !this.hadStateTransition;
        }

        /**
         * Schedules {@code runnable} on the main-thread executor; it is executed only if {@link
         * #isState(State)} still holds when the task runs.
         *
         * @throws UnsupportedOperationException if {@code delay} is not zero
         */
        @Override
        public ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration delay) {
            if (!delay.isZero()) {
                throw new UnsupportedOperationException("Currently only immediate execution is supported");
            }
            return this.getMainThreadExecutor().schedule(() -> {
                if (this.isState(state)) {
                    runnable.run();
                }
            }, delay.toMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Closes the parent context and then each of the four state validators.
         *
         * <p>NOTE(review): presumably {@code StateValidator#close()} fails when a registered
         * expectation was never consumed — confirm against {@code StateValidator}.
         */
        @Override
        public void close() throws Exception {
            super.close();
            this.failingStateValidator.close();
            this.restartingStateValidator.close();
            this.cancellingStateValidator.close();
            this.executingStateValidator.close();
        }
    }
}

