/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AllVerticesIterator;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionGraph {
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
    private final JobID jobID;
    private final String jobName;
    private final Configuration jobConfiguration;
    private final ClassLoader userClassLoader;
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    private final Map<ChannelID, ExecutionEdge> edges = new HashMap<ChannelID, ExecutionEdge>();
    private final ExecutorService executor;
    private final List<BlobKey> requiredJarFiles;
    private final List<JobStatusListener> jobStatusListeners;
    private final List<ExecutionListener> executionListeners;
    private final long[] stateTimestamps;
    private final Object progressLock = new Object();
    private int nextVertexToFinish;
    private int numberOfRetriesLeft;
    private long delayBeforeRetrying;
    private volatile JobStatus state = JobStatus.CREATED;
    private volatile Throwable failureCause;
    private Scheduler scheduler;
    private boolean allowQueuedScheduling = true;

    public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig) {
        this(jobId, jobName, jobConfig, new ArrayList<BlobKey>(), null);
    }

    public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, List<BlobKey> requiredJarFiles, ExecutorService executor) {
        this(jobId, jobName, jobConfig, requiredJarFiles, Thread.currentThread().getContextClassLoader(), null);
    }

    public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, List<BlobKey> requiredJarFiles, ClassLoader userClassLoader, ExecutorService executor) {
        if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
            throw new NullPointerException();
        }
        this.jobID = jobId;
        this.jobName = jobName;
        this.jobConfiguration = jobConfig;
        this.userClassLoader = userClassLoader;
        this.executor = executor;
        this.tasks = new ConcurrentHashMap();
        this.intermediateResults = new ConcurrentHashMap();
        this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
        this.currentExecutions = new ConcurrentHashMap();
        this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
        this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
        this.requiredJarFiles = requiredJarFiles;
    }

    public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
        if (numberOfRetriesLeft < -1) {
            throw new IllegalArgumentException();
        }
        this.numberOfRetriesLeft = numberOfRetriesLeft;
    }

    public int getNumberOfRetriesLeft() {
        return this.numberOfRetriesLeft;
    }

    public void setDelayBeforeRetrying(long delayBeforeRetrying) {
        if (delayBeforeRetrying < 0L) {
            throw new IllegalArgumentException("Delay before retry must be non-negative.");
        }
        this.delayBeforeRetrying = delayBeforeRetrying;
    }

    public long getDelayBeforeRetrying() {
        return this.delayBeforeRetrying;
    }

    public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d vertices and %d intermediate results.", topologiallySorted.size(), this.tasks.size(), this.intermediateResults.size()));
        }
        long createTimestamp = System.currentTimeMillis();
        for (AbstractJobVertex jobVertex : topologiallySorted) {
            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, createTimestamp);
            ejv.connectToPredecessors(this.intermediateResults);
            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), ejv, previousTask));
            }
            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet == null) continue;
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), res, previousDataSet));
            }
            this.verticesInCreationOrder.add(ejv);
        }
    }

    public List<BlobKey> getRequiredJarFiles() {
        return this.requiredJarFiles;
    }

    public JobID getJobID() {
        return this.jobID;
    }

    public String getJobName() {
        return this.jobName;
    }

    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    public ClassLoader getUserClassLoader() {
        return this.userClassLoader;
    }

    public JobStatus getState() {
        return this.state;
    }

    public Throwable getFailureCause() {
        return this.failureCause;
    }

    public ExecutionJobVertex getJobVertex(JobVertexID id) {
        return this.tasks.get(id);
    }

    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(this.tasks);
    }

    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        final int numElements = this.verticesInCreationOrder.size();
        return new Iterable<ExecutionJobVertex>(){

            @Override
            public Iterator<ExecutionJobVertex> iterator() {
                return new Iterator<ExecutionJobVertex>(){
                    private int pos = 0;

                    @Override
                    public boolean hasNext() {
                        return this.pos < numElements;
                    }

                    @Override
                    public ExecutionJobVertex next() {
                        if (this.hasNext()) {
                            return (ExecutionJobVertex)ExecutionGraph.this.verticesInCreationOrder.get(this.pos++);
                        }
                        throw new NoSuchElementException();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(this.intermediateResults);
    }

    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return new Iterable<ExecutionVertex>(){

            @Override
            public Iterator<ExecutionVertex> iterator() {
                return new AllVerticesIterator(ExecutionGraph.this.getVerticesTopologically().iterator());
            }
        };
    }

    public long getStatusTimestamp(JobStatus status) {
        return this.stateTimestamps[status.ordinal()];
    }

    public boolean isQueuedSchedulingAllowed() {
        return this.allowQueuedScheduling;
    }

    public void setQueuedSchedulingAllowed(boolean allowed) {
        this.allowQueuedScheduling = allowed;
    }

    public void scheduleForExecution(Scheduler scheduler) throws JobException {
        if (scheduler == null) {
            throw new IllegalArgumentException("Scheduler must not be null.");
        }
        if (this.scheduler != null && this.scheduler != scheduler) {
            throw new IllegalArgumentException("Cann not use different schedulers for the same job");
        }
        if (this.transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            this.scheduler = scheduler;
            for (ExecutionJobVertex ejv : this.tasks.values()) {
                if (!ejv.getJobVertex().isInputVertex()) continue;
                ejv.scheduleAll(scheduler, this.allowQueuedScheduling);
            }
        } else {
            throw new IllegalStateException("Job may only be scheduled from state " + (Object)((Object)JobStatus.CREATED));
        }
    }

    public void cancel() {
        JobStatus current;
        while ((current = this.state) == JobStatus.RUNNING || current == JobStatus.CREATED) {
            if (!this.transitionState(current, JobStatus.CANCELLING)) continue;
            for (ExecutionJobVertex ejv : this.verticesInCreationOrder) {
                ejv.cancel();
            }
            return;
        }
    }

    public void fail(Throwable t) {
        JobStatus current;
        do {
            if ((current = this.state) != JobStatus.FAILED && current != JobStatus.FAILING) continue;
            return;
        } while (!this.transitionState(current, JobStatus.FAILING, t));
        this.failureCause = t;
        for (ExecutionJobVertex ejv : this.verticesInCreationOrder) {
            ejv.cancel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForJobEnd(long timeout) throws InterruptedException {
        Object object = this.progressLock;
        synchronized (object) {
            long deadline;
            long now = System.currentTimeMillis();
            long l = deadline = timeout == 0L ? Long.MAX_VALUE : now + timeout;
            while (now < deadline && !this.state.isTerminalState()) {
                this.progressLock.wait(deadline - now);
                now = System.currentTimeMillis();
            }
        }
    }

    public void waitForJobEnd() throws InterruptedException {
        this.waitForJobEnd(0L);
    }

    private boolean transitionState(JobStatus current, JobStatus newState) {
        return this.transitionState(current, newState, null);
    }

    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        if (STATE_UPDATER.compareAndSet(this, current, newState)) {
            this.stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
            this.notifyJobStatusChange(newState, error);
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void jobVertexInFinalState(ExecutionJobVertex ev) {
        Object object = this.progressLock;
        synchronized (object) {
            int nextPos = this.nextVertexToFinish;
            if (nextPos >= this.verticesInCreationOrder.size()) {
                return;
            }
            if (this.verticesInCreationOrder.get(nextPos) == ev) {
                while (++nextPos < this.verticesInCreationOrder.size() && this.verticesInCreationOrder.get(nextPos).isInFinalState()) {
                }
                this.nextVertexToFinish = nextPos;
                if (nextPos == this.verticesInCreationOrder.size()) {
                    JobStatus current;
                    while (!((current = this.state) == JobStatus.RUNNING && this.transitionState(current, JobStatus.FINISHED) || current == JobStatus.CANCELLING && this.transitionState(current, JobStatus.CANCELED))) {
                        if (current == JobStatus.FAILING) {
                            if (this.numberOfRetriesLeft > 0 && this.transitionState(current, JobStatus.RESTARTING)) {
                                --this.numberOfRetriesLeft;
                                this.execute(new Runnable(){

                                    @Override
                                    public void run() {
                                        try {
                                            Thread.sleep(ExecutionGraph.this.delayBeforeRetrying);
                                        }
                                        catch (InterruptedException interruptedException) {
                                            // empty catch block
                                        }
                                        ExecutionGraph.this.restart();
                                    }
                                });
                                break;
                            }
                            if (this.numberOfRetriesLeft <= 0 && this.transitionState(current, JobStatus.FAILED, this.failureCause)) break;
                        }
                        if (current != JobStatus.CANCELED && current != JobStatus.CREATED && current != JobStatus.FINISHED) continue;
                        this.fail(new Exception("ExecutionGraph went into final state from state " + (Object)((Object)current)));
                    }
                    this.progressLock.notifyAll();
                }
            }
        }
    }

    public boolean updateState(TaskExecutionState state) {
        Execution attempt = this.currentExecutions.get(state.getID());
        if (attempt != null) {
            switch (state.getExecutionState()) {
                case FINISHED: {
                    attempt.markFinished();
                    return true;
                }
                case CANCELED: {
                    attempt.cancelingComplete();
                    return true;
                }
                case FAILED: {
                    attempt.markFailed(state.getError());
                    return true;
                }
            }
            attempt.fail(new Exception("TaskManager sent illegal state update: " + (Object)((Object)state.getExecutionState())));
            return false;
        }
        return false;
    }

    public ConnectionInfoLookupResponse lookupConnectionInfoAndDeployReceivers(InstanceConnectionInfo caller, ChannelID sourceChannelID) {
        ExecutionEdge edge = this.edges.get(sourceChannelID);
        if (edge == null) {
            LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
            this.fail(new Exception("Channels are not correctly registered"));
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }
        if (sourceChannelID.equals(edge.getInputChannelId())) {
            ExecutionVertex targetVertex = edge.getSource().getProducer();
            ExecutionState executionState = targetVertex.getExecutionState();
            if (executionState == ExecutionState.RUNNING) {
                Instance location = targetVertex.getCurrentAssignedResource().getInstance();
                if (location.getInstanceConnectionInfo().equals(caller)) {
                    return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelId());
                }
                InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
                InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
                int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
                return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
            }
            if (executionState == ExecutionState.FINISHED) {
                LOG.error("Receiver " + targetVertex + " set to FINISHED even though data is pending");
                this.fail(new Exception("Channels are not correctly registered"));
                return ConnectionInfoLookupResponse.createReceiverNotFound();
            }
            if (executionState == ExecutionState.FAILED || executionState == ExecutionState.CANCELED || executionState == ExecutionState.CANCELING) {
                return ConnectionInfoLookupResponse.createJobIsAborting();
            }
            LOG.error("Channel lookup (backwards) - sender " + targetVertex + " found in inconsistent state " + (Object)((Object)executionState));
            this.fail(new Exception("Channels are not correctly registered"));
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }
        ExecutionVertex targetVertex = edge.getTarget();
        ExecutionState executionState = targetVertex.getExecutionState();
        if (executionState == ExecutionState.RUNNING) {
            Instance location = targetVertex.getCurrentAssignedResource().getInstance();
            if (location.getInstanceConnectionInfo().equals(caller)) {
                return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelId());
            }
            InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
            InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
            int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
            return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
        }
        if (executionState == ExecutionState.DEPLOYING || executionState == ExecutionState.SCHEDULED) {
            return ConnectionInfoLookupResponse.createReceiverNotReady();
        }
        if (executionState == ExecutionState.CREATED) {
            try {
                edge.getTarget().scheduleForExecution(this.scheduler, false);
                return ConnectionInfoLookupResponse.createReceiverNotReady();
            }
            catch (JobException e) {
                this.fail(new Exception("Cannot schedule the receivers, not enough resources", e));
                return ConnectionInfoLookupResponse.createJobIsAborting();
            }
        }
        if (executionState == ExecutionState.CANCELED || executionState == ExecutionState.CANCELING || executionState == ExecutionState.FAILED) {
            return ConnectionInfoLookupResponse.createJobIsAborting();
        }
        String message = "Channel lookup (forward) - receiver " + targetVertex + " found in inconsistent state " + (Object)((Object)executionState);
        LOG.error(message);
        this.fail(new Exception(message));
        return ConnectionInfoLookupResponse.createReceiverNotFound();
    }

    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        return Collections.unmodifiableMap(this.currentExecutions);
    }

    void registerExecution(Execution exec) {
        Execution previous = this.currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
        if (previous != null) {
            this.fail(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
        }
    }

    void deregisterExecution(Execution exec) {
        Execution contained = this.currentExecutions.remove(exec.getAttemptId());
        if (contained != null && contained != exec) {
            this.fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
        }
    }

    void registerExecutionEdge(ExecutionEdge edge) {
        ChannelID target = edge.getInputChannelId();
        ChannelID source = edge.getOutputChannelId();
        this.edges.put(source, edge);
        this.edges.put(target, edge);
    }

    public void registerJobStatusListener(JobStatusListener jobStatusListener) {
        this.jobStatusListeners.add(jobStatusListener);
    }

    public void registerExecutionListener(ExecutionListener executionListener) {
        this.executionListeners.add(executionListener);
    }

    private void notifyJobStatusChange(JobStatus newState, Throwable error) {
        if (this.jobStatusListeners.size() > 0) {
            String message = error == null ? null : ExceptionUtils.stringifyException((Throwable)error);
            for (JobStatusListener listener : this.jobStatusListeners) {
                try {
                    listener.jobStatusHasChanged(this, newState, message);
                }
                catch (Throwable t) {
                    LOG.error("Notification of job status change caused an error.", t);
                }
            }
        }
    }

    void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable error) {
        if (this.executionListeners.size() > 0) {
            String message = error == null ? null : ExceptionUtils.stringifyException((Throwable)error);
            for (ExecutionListener listener : this.executionListeners) {
                try {
                    listener.executionStateChanged(this.jobID, vertexId, subtask, executionId, newExecutionState, message);
                }
                catch (Throwable t) {
                    LOG.error("Notification of execution state change caused an error.", t);
                }
            }
        }
        if (newExecutionState == ExecutionState.FAILED) {
            this.fail(error);
        }
    }

    public void execute(Runnable action) {
        if (this.executor != null) {
            this.executor.submit(action);
        } else {
            action.run();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restart() {
        try {
            if (this.state == JobStatus.FAILED) {
                this.transitionState(JobStatus.FAILED, JobStatus.RESTARTING);
            }
            Object object = this.progressLock;
            synchronized (object) {
                if (this.state != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }
                if (this.scheduler == null) {
                    throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null.");
                }
                this.currentExecutions.clear();
                this.edges.clear();
                for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
                    jv.resetForNewExecution();
                }
                for (int i = 0; i < this.stateTimestamps.length; ++i) {
                    this.stateTimestamps[i] = 0L;
                }
                this.nextVertexToFinish = 0;
                this.transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
            }
            this.scheduleForExecution(this.scheduler);
        }
        catch (Throwable t) {
            this.fail(t);
        }
    }
}

