/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.StoppedDispatcherLeaderProcess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DispatcherRunner} that takes part in leader election and manages the life cycle of the
 * {@link DispatcherLeaderProcess} belonging to the current leader session.
 *
 * <p>Each time leadership is granted, the currently tracked process is closed and a new one is
 * created; the new process is only started after the termination futures of all previously
 * closed processes have completed. When leadership is revoked, the current process is closed.
 *
 * <p>Leadership callbacks are executed while holding {@link #lock} and are ignored once the
 * runner has been closed (see {@link #runActionIfRunning(Runnable)}).
 */
public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunner.class);

    /** Guards {@link #running} and serializes the leadership callbacks. */
    private final Object lock = new Object();

    private final LeaderElection leaderElection;
    private final FatalErrorHandler fatalErrorHandler;
    private final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory;

    /** Receives the termination of the leader processes when the runner is closed. */
    private final CompletableFuture<Void> terminationFuture;

    /** Completed with the shut-down result of the current leader process, if still current. */
    private final CompletableFuture<ApplicationStatus> shutDownFuture;

    /** {@code true} until {@link #closeAsync()} has been called for the first time. */
    private boolean running;

    /** Process of the most recent leader session; initially the stopped placeholder instance. */
    private DispatcherLeaderProcess dispatcherLeaderProcess;

    /** Accumulated termination of every leader process that has been closed so far. */
    private CompletableFuture<Void> previousDispatcherLeaderProcessTerminationFuture;

    private DefaultDispatcherRunner(
            LeaderElection leaderElection,
            FatalErrorHandler fatalErrorHandler,
            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) {
        this.leaderElection = leaderElection;
        this.fatalErrorHandler = fatalErrorHandler;
        this.dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactory;
        this.terminationFuture = new CompletableFuture<>();
        this.shutDownFuture = new CompletableFuture<>();
        this.running = true;
        this.dispatcherLeaderProcess = StoppedDispatcherLeaderProcess.INSTANCE;
        this.previousDispatcherLeaderProcessTerminationFuture =
                CompletableFuture.completedFuture(null);
    }

    /**
     * Registers this runner as the contender of the leader election.
     *
     * @throws Exception if starting the leader election fails
     */
    void start() throws Exception {
        leaderElection.startLeaderElection(this);
    }

    /**
     * Returns the future that is completed with the shut-down result of the leader process that
     * is current at the time it shuts down.
     */
    @Override
    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return shutDownFuture;
    }

    /**
     * Stops this runner.
     *
     * <p>The first call marks the runner as stopped, closes the leader election, closes the
     * current leader process and forwards the accumulated process termination to
     * {@link #terminationFuture}. Any later call returns {@link #terminationFuture} directly.
     *
     * <p>NOTE(review): this presumably implements a {@code closeAsync()} declared by a
     * super-interface of {@link DispatcherRunner}; confirm and add {@code @Override}.
     *
     * @return on the first call, a future combining the process termination and the result of
     *     closing the leader election; on later calls, the termination future
     */
    public CompletableFuture<Void> closeAsync() {
        synchronized (lock) {
            if (!running) {
                return terminationFuture;
            }
            running = false;
        }

        // From here on no leadership callback can run any more (running == false), so the
        // process state is only touched by this thread.
        final CompletableFuture<Void> stopLeaderElectionServiceFuture = stopLeaderElectionService();
        stopDispatcherLeaderProcess();
        FutureUtils.forward(previousDispatcherLeaderProcessTerminationFuture, terminationFuture);

        return FutureUtils.completeAll(
                Arrays.asList(terminationFuture, stopLeaderElectionServiceFuture));
    }

    /**
     * Leader election callback: replaces the current leader process with a new one for the given
     * session. Ignored if the runner has already been stopped.
     *
     * @param leaderSessionID id of the newly granted leader session
     */
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }

    /**
     * Closes the current leader process, installs a freshly created one and schedules its start
     * for when all previously closed processes have terminated.
     */
    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        stopDispatcherLeaderProcess();

        final DispatcherLeaderProcess newProcess =
                createNewDispatcherLeaderProcess(leaderSessionID);
        dispatcherLeaderProcess = newProcess;

        // Start the new process only once the previous ones are gone.
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(newProcess::start));
    }

    /**
     * Closes the current leader process and folds its termination into
     * {@link #previousDispatcherLeaderProcessTerminationFuture}.
     */
    private void stopDispatcherLeaderProcess() {
        final CompletableFuture<Void> processTerminationFuture =
                dispatcherLeaderProcess.closeAsync();
        previousDispatcherLeaderProcessTerminationFuture =
                FutureUtils.completeAll(
                        Arrays.asList(
                                previousDispatcherLeaderProcessTerminationFuture,
                                processTerminationFuture));
    }

    /**
     * Creates a leader process for the given session and wires its shut-down and leader-address
     * futures to this runner. The returned process has not been started.
     */
    private DispatcherLeaderProcess createNewDispatcherLeaderProcess(UUID leaderSessionID) {
        final DispatcherLeaderProcess newProcess =
                dispatcherLeaderProcessFactory.create(leaderSessionID);
        forwardShutDownFuture(newProcess);
        forwardConfirmLeaderSessionFuture(leaderSessionID, newProcess);
        return newProcess;
    }

    /**
     * Propagates the outcome of the given process' shut-down future to {@link #shutDownFuture},
     * but only while the runner is running and the process is still the current one.
     */
    private void forwardShutDownFuture(DispatcherLeaderProcess newDispatcherLeaderProcess) {
        newDispatcherLeaderProcess
                .getShutDownFuture()
                .whenComplete(
                        (applicationStatus, throwable) -> {
                            synchronized (lock) {
                                // Outcomes of superseded processes, or ones arriving after
                                // close, are dropped.
                                final boolean isCurrentProcess =
                                        running
                                                && dispatcherLeaderProcess
                                                        == newDispatcherLeaderProcess;
                                if (!isCurrentProcess) {
                                    return;
                                }
                                if (throwable != null) {
                                    shutDownFuture.completeExceptionally(throwable);
                                } else {
                                    shutDownFuture.complete(applicationStatus);
                                }
                            }
                        });
    }

    /**
     * Confirms leadership for the given session with the leader election once the process has
     * published its leader address.
     */
    private void forwardConfirmLeaderSessionFuture(
            UUID leaderSessionID, DispatcherLeaderProcess newDispatcherLeaderProcess) {
        FutureUtils.assertNoException(
                newDispatcherLeaderProcess
                        .getLeaderAddressFuture()
                        .thenCompose(
                                leaderAddress ->
                                        leaderElection.confirmLeadershipAsync(
                                                leaderSessionID, leaderAddress)));
    }

    /**
     * Leader election callback: closes the current leader process. Ignored if the runner has
     * already been stopped.
     */
    @Override
    public void revokeLeadership() {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was revoked the leadership with leader id {}. Stopping the {}.",
                            getClass().getSimpleName(),
                            dispatcherLeaderProcess.getLeaderSessionId(),
                            DispatcherLeaderProcess.class.getSimpleName());
                    stopDispatcherLeaderProcess();
                });
    }

    /**
     * Closes the leader election.
     *
     * @return a completed future on success, or an exceptionally completed future carrying the
     *     exception thrown by {@code close()}
     */
    private CompletableFuture<Void> stopLeaderElectionService() {
        try {
            leaderElection.close();
            return FutureUtils.completedVoidFuture();
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Runs the given action under {@link #lock} if the runner is still running; otherwise only
     * logs that the action was ignored.
     */
    private void runActionIfRunning(Runnable runnable) {
        synchronized (lock) {
            if (running) {
                runnable.run();
            } else {
                LOG.debug(
                        "Ignoring action because {} has already been stopped.",
                        getClass().getSimpleName());
            }
        }
    }

    /**
     * Leader election callback: reports the given exception, wrapped in a
     * {@link FlinkException}, to the fatal error handler.
     */
    @Override
    public void handleError(Exception exception) {
        fatalErrorHandler.onFatalError(
                new FlinkException(
                        String.format(
                                "Exception during leader election of %s occurred.",
                                getClass().getSimpleName()),
                        exception));
    }

    /**
     * Creates a {@link DefaultDispatcherRunner} and immediately starts its leader election.
     *
     * @param leaderElection leader election this runner registers with
     * @param fatalErrorHandler handler notified about leader election errors
     * @param dispatcherLeaderProcessFactory factory for the per-session leader processes
     * @return the started runner
     * @throws Exception if starting the leader election fails
     */
    public static DispatcherRunner create(
            LeaderElection leaderElection,
            FatalErrorHandler fatalErrorHandler,
            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
            throws Exception {
        final DefaultDispatcherRunner dispatcherRunner =
                new DefaultDispatcherRunner(
                        leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
        dispatcherRunner.start();
        return dispatcherRunner;
    }
}

