/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.BiFunctionWithException;

/**
 * Fluent builder that assembles a {@link SubtaskCheckpointCoordinatorImpl} for tests.
 *
 * <p>Every collaborator has a default, so {@code new MockSubtaskCheckpointCoordinatorBuilder().build()}
 * works without any setter being called. Collaborators that are still unset when {@link #build()}
 * runs are created on demand and remembered in this builder, so later {@code build()} calls on the
 * same builder reuse them.
 */
public class MockSubtaskCheckpointCoordinatorBuilder {
    /** Task name handed to the coordinator; there is no setter for it in this builder. */
    private String taskName = "mock-task";
    /** Checkpoint storage; created in {@link #build()} when still {@code null}. */
    private CheckpointStorageWorkerView checkpointStorage;
    /** Task environment; created in {@link #build()} when still {@code null}. */
    private Environment environment;
    /** Receiver for asynchronous failures; a do-nothing handler is used when still {@code null}. */
    private AsyncExceptionHandler asyncExceptionHandler;
    private StreamTaskActionExecutor actionExecutor = StreamTaskActionExecutor.IMMEDIATE;
    private ExecutorService executorService = Executors.newDirectExecutorService();
    /** By default ignores both arguments and returns {@code FutureUtils.completedVoidFuture()}. */
    private BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot =
            (ignoredWriter, ignoredCheckpointId) -> FutureUtils.completedVoidFuture();
    /** Off unless switched on through {@link #setUnalignedCheckpointEnabled(boolean)}. */
    private boolean unalignedCheckpointEnabled;
    private int maxRecordAbortedCheckpoints = 10;
    private boolean enableCheckpointAfterTasksFinished = true;

    /**
     * Sets the environment passed to the coordinator.
     *
     * @param environment environment to use; if left {@code null}, {@link #build()} creates a
     *     {@link MockEnvironment}
     * @return this builder
     */
    public MockSubtaskCheckpointCoordinatorBuilder setEnvironment(Environment environment) {
        this.environment = environment;
        return this;
    }

    /**
     * Replaces the function passed to the coordinator as its input-snapshot preparation step.
     *
     * @param prepareInputSnapshot function taking a channel state writer and a {@code Long}
     *     (presumably the checkpoint id -- confirm against the coordinator)
     * @return this builder
     */
    public MockSubtaskCheckpointCoordinatorBuilder setPrepareInputSnapshot(BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot) {
        this.prepareInputSnapshot = prepareInputSnapshot;
        return this;
    }

    /**
     * Replaces the executor service passed to the coordinator (a direct executor by default).
     *
     * @param executor executor service to use
     * @return this builder
     */
    public MockSubtaskCheckpointCoordinatorBuilder setExecutor(ExecutorService executor) {
        executorService = executor;
        return this;
    }

    /**
     * Overrides the {@code maxRecordAbortedCheckpoints} value passed to the coordinator
     * (10 by default).
     *
     * @param maxRecordAbortedCheckpoints value to pass through
     * @return this builder
     */
    public MockSubtaskCheckpointCoordinatorBuilder setMaxRecordAbortedCheckpoints(int maxRecordAbortedCheckpoints) {
        this.maxRecordAbortedCheckpoints = maxRecordAbortedCheckpoints;
        return this;
    }

    /**
     * Sets the unaligned-checkpoint flag passed to the coordinator.
     *
     * @param unalignedCheckpointEnabled flag value to pass through
     * @return this builder
     */
    public MockSubtaskCheckpointCoordinatorBuilder setUnalignedCheckpointEnabled(boolean unalignedCheckpointEnabled) {
        this.unalignedCheckpointEnabled = unalignedCheckpointEnabled;
        return this;
    }

    /**
     * Sets the checkpoint-after-tasks-finished flag passed to the coordinator
     * ({@code true} by default).
     *
     * @param enableCheckpointAfterTasksFinished flag value to pass through
     * @return this builder
     */
    public MockSubtaskCheckpointCoordinatorBuilder setEnableCheckpointAfterTasksFinished(boolean enableCheckpointAfterTasksFinished) {
        this.enableCheckpointAfterTasksFinished = enableCheckpointAfterTasksFinished;
        return this;
    }

    /**
     * Creates the coordinator from the configured values, filling in defaults for anything that
     * was never set.
     *
     * @return a new {@link SubtaskCheckpointCoordinatorImpl}
     * @throws IOException declared because creating the default collaborators or the coordinator
     *     may throw it
     */
    SubtaskCheckpointCoordinator build() throws IOException {
        fillInMissingCollaborators();
        return new SubtaskCheckpointCoordinatorImpl(
                checkpointStorage,
                taskName,
                actionExecutor,
                executorService,
                environment,
                asyncExceptionHandler,
                unalignedCheckpointEnabled,
                enableCheckpointAfterTasksFinished,
                prepareInputSnapshot,
                maxRecordAbortedCheckpoints,
                // Ignores both arguments and hands back a handle whose only method does nothing.
                // NOTE(review): presumably stands in for a timer registration -- confirm.
                (ignoredCallable, ignoredDelay) -> () -> {});
    }

    /**
     * Populates the environment, checkpoint storage and async exception handler when they are
     * still {@code null}. The environment is resolved first because the default storage is
     * created from its job id.
     */
    private void fillInMissingCollaborators() throws IOException {
        if (environment == null) {
            environment = MockEnvironment.builder().build();
        }
        if (checkpointStorage == null) {
            // NOTE(review): 1024 is presumably a size limit for the in-memory storage -- confirm.
            checkpointStorage = new MemoryBackendCheckpointStorageAccess(environment.getJobID(), null, null, 1024);
        }
        if (asyncExceptionHandler == null) {
            asyncExceptionHandler = new NonHandleAsyncException();
        }
    }

    /** {@link AsyncExceptionHandler} that discards every reported failure. */
    private static class NonHandleAsyncException implements AsyncExceptionHandler {
        /** Does nothing: both the message and the exception are dropped. */
        public void handleAsyncException(String message, Throwable exception) {
        }
    }
}

