/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class AsyncCheckpointRunnable
implements Runnable,
Closeable {
    public static final Logger LOG = LoggerFactory.getLogger(AsyncCheckpointRunnable.class);
    private final String taskName;
    private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
    private final boolean isTaskDeployedAsFinished;
    private final boolean isTaskFinished;
    private final Supplier<Boolean> isTaskRunning;
    private final Environment taskEnvironment;
    private final CompletableFuture<Void> finishedFuture = new CompletableFuture();
    private final AsyncExceptionHandler asyncExceptionHandler;
    private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
    private final CheckpointMetaData checkpointMetaData;
    private final CheckpointMetricsBuilder checkpointMetrics;
    private final long asyncConstructionNanos;
    private final AtomicReference<AsyncCheckpointState> asyncCheckpointState = new AtomicReference<AsyncCheckpointState>(AsyncCheckpointState.RUNNING);

    public boolean isRunning() {
        return this.asyncCheckpointState.get() == AsyncCheckpointState.RUNNING;
    }

    AsyncCheckpointRunnable(Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetricsBuilder checkpointMetrics, long asyncConstructionNanos, String taskName, Consumer<AsyncCheckpointRunnable> unregister, Environment taskEnvironment, AsyncExceptionHandler asyncExceptionHandler, boolean isTaskDeployedAsFinished, boolean isTaskFinished, Supplier<Boolean> isTaskRunning) {
        this.operatorSnapshotsInProgress = (Map)Preconditions.checkNotNull(operatorSnapshotsInProgress);
        this.checkpointMetaData = (CheckpointMetaData)Preconditions.checkNotNull((Object)checkpointMetaData);
        this.checkpointMetrics = (CheckpointMetricsBuilder)Preconditions.checkNotNull((Object)checkpointMetrics);
        this.asyncConstructionNanos = asyncConstructionNanos;
        this.taskName = (String)Preconditions.checkNotNull((Object)taskName);
        this.unregisterConsumer = unregister;
        this.taskEnvironment = (Environment)Preconditions.checkNotNull((Object)taskEnvironment);
        this.asyncExceptionHandler = (AsyncExceptionHandler)Preconditions.checkNotNull((Object)asyncExceptionHandler);
        this.isTaskDeployedAsFinished = isTaskDeployedAsFinished;
        this.isTaskFinished = isTaskFinished;
        this.isTaskRunning = isTaskRunning;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        long asyncStartNanos = System.nanoTime();
        long asyncStartDelayMillis = (asyncStartNanos - this.asyncConstructionNanos) / 1000000L;
        LOG.debug("{} - started executing asynchronous part of checkpoint {}. Asynchronous start delay: {} ms", new Object[]{this.taskName, this.checkpointMetaData.getCheckpointId(), asyncStartDelayMillis});
        FileSystemSafetyNet.initializeSafetyNetForThread();
        try {
            SnapshotsFinalizeResult snapshotsFinalizeResult = this.isTaskDeployedAsFinished ? this.finalizedFinishedSnapshots() : this.finalizeNonFinishedSnapshots();
            long asyncEndNanos = System.nanoTime();
            long asyncDurationMillis = (asyncEndNanos - this.asyncConstructionNanos) / 1000000L;
            this.checkpointMetrics.setBytesPersistedDuringAlignment(snapshotsFinalizeResult.bytesPersistedDuringAlignment);
            this.checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
            if (this.asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
                this.reportCompletedSnapshotStates(snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates, snapshotsFinalizeResult.localTaskOperatorSubtaskStates, asyncDurationMillis);
            } else {
                LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", (Object)this.taskName, (Object)this.checkpointMetaData.getCheckpointId());
            }
            this.finishedFuture.complete(null);
        }
        catch (Exception e) {
            LOG.info("{} - asynchronous part of checkpoint {} could not be completed.", new Object[]{this.taskName, this.checkpointMetaData.getCheckpointId(), e});
            this.handleExecutionException(e);
            this.finishedFuture.completeExceptionally(e);
        }
        finally {
            this.unregisterConsumer.accept(this);
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
        }
    }

    private SnapshotsFinalizeResult finalizedFinishedSnapshots() throws Exception {
        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : this.operatorSnapshotsInProgress.entrySet()) {
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();
            snapshotInProgress.getInputChannelStateFuture().get();
            snapshotInProgress.getResultSubpartitionStateFuture().get();
        }
        return new SnapshotsFinalizeResult(TaskStateSnapshot.FINISHED_ON_RESTORE, TaskStateSnapshot.FINISHED_ON_RESTORE, 0L);
    }

    private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws Exception {
        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = new TaskStateSnapshot(this.operatorSnapshotsInProgress.size(), this.isTaskFinished);
        TaskStateSnapshot localTaskOperatorSubtaskStates = new TaskStateSnapshot(this.operatorSnapshotsInProgress.size(), this.isTaskFinished);
        long bytesPersistedDuringAlignment = 0L;
        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : this.operatorSnapshotsInProgress.entrySet()) {
            OperatorID operatorID = entry.getKey();
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();
            OperatorSnapshotFinalizer finalizedSnapshots = OperatorSnapshotFinalizer.create(snapshotInProgress);
            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, finalizedSnapshots.getJobManagerOwnedState());
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, finalizedSnapshots.getTaskLocalState());
            bytesPersistedDuringAlignment += finalizedSnapshots.getJobManagerOwnedState().getResultSubpartitionState().getStateSize();
            bytesPersistedDuringAlignment += finalizedSnapshots.getJobManagerOwnedState().getInputChannelState().getStateSize();
        }
        return new SnapshotsFinalizeResult(jobManagerTaskOperatorSubtaskStates, localTaskOperatorSubtaskStates, bytesPersistedDuringAlignment);
    }

    private void reportCompletedSnapshotStates(TaskStateSnapshot acknowledgedTaskStateSnapshot, TaskStateSnapshot localTaskStateSnapshot, long asyncDurationMillis) {
        boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
        boolean hasLocalState = localTaskStateSnapshot.hasState();
        Preconditions.checkState((hasAckState || !hasLocalState ? 1 : 0) != 0, (Object)"Found cached state but no corresponding primary state is reported to the job manager. This indicates a problem.");
        this.taskEnvironment.getTaskStateManager().reportTaskStateSnapshots(this.checkpointMetaData, this.checkpointMetrics.setBytesPersistedOfThisCheckpoint(acknowledgedTaskStateSnapshot.getCheckpointedSize()).setTotalBytesPersisted(acknowledgedTaskStateSnapshot.getStateSize()).build(), hasAckState ? acknowledgedTaskStateSnapshot : null, hasLocalState ? localTaskStateSnapshot : null);
        LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", new Object[]{this.taskName, this.checkpointMetaData.getCheckpointId(), asyncDurationMillis});
        LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", new Object[]{this.taskName, this.checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot});
    }

    private void reportAbortedSnapshotStats(long stateSize, long checkpointedSize) {
        CheckpointMetrics metrics = this.checkpointMetrics.setTotalBytesPersisted(stateSize).setBytesPersistedOfThisCheckpoint(checkpointedSize).buildIncomplete();
        LOG.trace("{} - report failed checkpoint stats: {} {}", new Object[]{this.taskName, this.checkpointMetaData.getCheckpointId(), metrics});
        this.taskEnvironment.getTaskStateManager().reportIncompleteTaskStateSnapshots(this.checkpointMetaData, metrics);
    }

    private void handleExecutionException(Exception e) {
        boolean didCleanup = false;
        AsyncCheckpointState currentState = this.asyncCheckpointState.get();
        while (AsyncCheckpointState.DISCARDED != currentState) {
            if (this.asyncCheckpointState.compareAndSet(currentState, AsyncCheckpointState.DISCARDED)) {
                didCleanup = true;
                try {
                    this.cleanup();
                }
                catch (Exception cleanupException) {
                    e.addSuppressed(cleanupException);
                }
                Exception checkpointException = new Exception("Could not materialize checkpoint " + this.checkpointMetaData.getCheckpointId() + " for operator " + this.taskName + ".", e);
                if (this.isTaskRunning.get().booleanValue()) {
                    try {
                        Optional underlyingCheckpointException = ExceptionUtils.findThrowable((Throwable)checkpointException, CheckpointException.class);
                        CheckpointFailureReason reportedFailureReason = underlyingCheckpointException.map(exception -> exception.getCheckpointFailureReason()).orElse(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
                        this.taskEnvironment.declineCheckpoint(this.checkpointMetaData.getCheckpointId(), new CheckpointException(reportedFailureReason, (Throwable)checkpointException));
                    }
                    catch (Exception unhandled) {
                        AsynchronousException asyncException = new AsynchronousException(unhandled);
                        this.asyncExceptionHandler.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
                    }
                } else {
                    LOG.info("Ignore decline of checkpoint {} as task is not running anymore.", (Object)this.checkpointMetaData.getCheckpointId());
                }
                currentState = AsyncCheckpointState.DISCARDED;
                continue;
            }
            currentState = this.asyncCheckpointState.get();
        }
        if (!didCleanup) {
            LOG.trace("Caught followup exception from a failed checkpoint thread. This can be ignored.", (Throwable)e);
        }
    }

    @Override
    public void close() {
        if (this.asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.DISCARDED)) {
            try {
                Tuple2<Long, Long> tuple = this.cleanup();
                this.reportAbortedSnapshotStats((Long)tuple.f0, (Long)tuple.f1);
            }
            catch (Exception cleanupException) {
                LOG.warn("Could not properly clean up the async checkpoint runnable.", (Throwable)cleanupException);
            }
        } else {
            this.logFailedCleanupAttempt();
        }
    }

    long getCheckpointId() {
        return this.checkpointMetaData.getCheckpointId();
    }

    public CompletableFuture<Void> getFinishedFuture() {
        return this.finishedFuture;
    }

    private Tuple2<Long, Long> cleanup() throws Exception {
        LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", (Object)this.checkpointMetaData.getCheckpointId(), (Object)this.taskName);
        Exception exception = null;
        long stateSize = 0L;
        long checkpointedSize = 0L;
        for (OperatorSnapshotFutures operatorSnapshotResult : this.operatorSnapshotsInProgress.values()) {
            if (operatorSnapshotResult == null) continue;
            try {
                Tuple2<Long, Long> tuple2 = operatorSnapshotResult.cancel();
                stateSize += ((Long)tuple2.f0).longValue();
                checkpointedSize += ((Long)tuple2.f1).longValue();
            }
            catch (Exception cancelException) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)cancelException, exception);
            }
        }
        if (null != exception) {
            throw exception;
        }
        return Tuple2.of((Object)stateSize, (Object)checkpointedSize);
    }

    private void logFailedCleanupAttempt() {
        LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.", (Object)this.taskName, (Object)this.checkpointMetaData.getCheckpointId());
    }

    private static class SnapshotsFinalizeResult {
        final TaskStateSnapshot jobManagerTaskOperatorSubtaskStates;
        final TaskStateSnapshot localTaskOperatorSubtaskStates;
        final long bytesPersistedDuringAlignment;

        public SnapshotsFinalizeResult(TaskStateSnapshot jobManagerTaskOperatorSubtaskStates, TaskStateSnapshot localTaskOperatorSubtaskStates, long bytesPersistedDuringAlignment) {
            this.jobManagerTaskOperatorSubtaskStates = jobManagerTaskOperatorSubtaskStates;
            this.localTaskOperatorSubtaskStates = localTaskOperatorSubtaskStates;
            this.bytesPersistedDuringAlignment = bytesPersistedDuringAlignment;
        }
    }

    static enum AsyncCheckpointState {
        RUNNING,
        DISCARDED,
        COMPLETED;

    }
}

