/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.SubtaskAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.types.Either;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

public class ExecutionJobVertex
implements AccessExecutionJobVertex,
Archiveable<ArchivedExecutionJobVertex> {
    private static final Logger LOG = DefaultExecutionGraph.LOG;
    private final Object stateMonitor;
    private final InternalExecutionGraphAccessor graph;
    private final JobVertex jobVertex;
    private final ExecutionVertex[] taskVertices;
    private final IntermediateResult[] producedDataSets;
    private final List<IntermediateResult> inputs;
    private final VertexParallelismInformation parallelismInfo;
    private final SlotSharingGroup slotSharingGroup;
    @Nullable
    private final CoLocationGroup coLocationGroup;
    private final InputSplit[] inputSplits;
    private final ResourceProfile resourceProfile;
    private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey;
    private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
    private InputSplitAssigner splitAssigner;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public ExecutionJobVertex(InternalExecutionGraphAccessor graph, JobVertex jobVertex, int maxPriorAttemptsHistoryLength, Time timeout, long createTimestamp, VertexParallelismInformation parallelismInfo, SubtaskAttemptNumberStore initialAttemptCounts) throws JobException {
        block17: {
            int i;
            this.stateMonitor = new Object();
            this.taskInformationOrBlobKey = null;
            if (graph == null || jobVertex == null) {
                throw new NullPointerException();
            }
            this.graph = graph;
            this.jobVertex = jobVertex;
            this.parallelismInfo = parallelismInfo;
            if (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {
                throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.", jobVertex.getName(), this.parallelismInfo.getParallelism(), this.parallelismInfo.getMaxParallelism()));
            }
            this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
            this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];
            this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());
            this.slotSharingGroup = (SlotSharingGroup)Preconditions.checkNotNull((Object)jobVertex.getSlotSharingGroup());
            this.coLocationGroup = jobVertex.getCoLocationGroup();
            this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
            for (i = 0; i < jobVertex.getProducedDataSets().size(); ++i) {
                IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
                this.producedDataSets[i] = new IntermediateResult(result.getId(), this, this.parallelismInfo.getParallelism(), result.getResultType());
            }
            for (i = 0; i < this.parallelismInfo.getParallelism(); ++i) {
                ExecutionVertex vertex;
                this.taskVertices[i] = vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength, initialAttemptCounts.getAttemptCount(i));
            }
            for (IntermediateResult ir : this.producedDataSets) {
                if (ir.getNumberOfAssignedPartitions() == this.parallelismInfo.getParallelism()) continue;
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
            List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders = this.getJobVertex().getOperatorCoordinators();
            if (coordinatorProviders.isEmpty()) {
                this.operatorCoordinators = Collections.emptyList();
            } else {
                ArrayList<OperatorCoordinatorHolder> coordinators = new ArrayList<OperatorCoordinatorHolder>(coordinatorProviders.size());
                try {
                    for (SerializedValue<OperatorCoordinator.Provider> provider : coordinatorProviders) {
                        coordinators.add(OperatorCoordinatorHolder.create(provider, this, graph.getUserClassLoader()));
                    }
                }
                catch (Exception | LinkageError e) {
                    IOUtils.closeAllQuietly(coordinators);
                    throw new JobException("Cannot instantiate the coordinator for operator " + this.getName(), e);
                }
                this.operatorCoordinators = Collections.unmodifiableList(coordinators);
            }
            try {
                InputSplitSource<?> splitSource = jobVertex.getInputSplitSource();
                if (splitSource != null) {
                    Thread currentThread = Thread.currentThread();
                    ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
                    currentThread.setContextClassLoader(graph.getUserClassLoader());
                    try {
                        this.inputSplits = splitSource.createInputSplits(this.parallelismInfo.getParallelism());
                        if (this.inputSplits != null) {
                            this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
                        }
                        break block17;
                    }
                    finally {
                        currentThread.setContextClassLoader(oldContextClassLoader);
                    }
                }
                this.inputSplits = null;
            }
            catch (Throwable t) {
                throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
            }
        }
    }

    public List<OperatorIDPair> getOperatorIDs() {
        return this.jobVertex.getOperatorIDs();
    }

    public void setMaxParallelism(int maxParallelism) {
        this.parallelismInfo.setMaxParallelism(maxParallelism);
    }

    public InternalExecutionGraphAccessor getGraph() {
        return this.graph;
    }

    public JobVertex getJobVertex() {
        return this.jobVertex;
    }

    @Override
    public String getName() {
        return this.getJobVertex().getName();
    }

    @Override
    public int getParallelism() {
        return this.parallelismInfo.getParallelism();
    }

    @Override
    public int getMaxParallelism() {
        return this.parallelismInfo.getMaxParallelism();
    }

    @Override
    public ResourceProfile getResourceProfile() {
        return this.resourceProfile;
    }

    public boolean canRescaleMaxParallelism(int desiredMaxParallelism) {
        return this.parallelismInfo.canRescaleMaxParallelism(desiredMaxParallelism);
    }

    public JobID getJobId() {
        return this.graph.getJobID();
    }

    @Override
    public JobVertexID getJobVertexId() {
        return this.jobVertex.getID();
    }

    public ExecutionVertex[] getTaskVertices() {
        return this.taskVertices;
    }

    public IntermediateResult[] getProducedDataSets() {
        return this.producedDataSets;
    }

    public InputSplitAssigner getSplitAssigner() {
        return this.splitAssigner;
    }

    public SlotSharingGroup getSlotSharingGroup() {
        return this.slotSharingGroup;
    }

    @Nullable
    public CoLocationGroup getCoLocationGroup() {
        return this.coLocationGroup;
    }

    public List<IntermediateResult> getInputs() {
        return this.inputs;
    }

    public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
        return this.operatorCoordinators;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
        Object object = this.stateMonitor;
        synchronized (object) {
            if (this.taskInformationOrBlobKey == null) {
                BlobWriter blobWriter = this.graph.getBlobWriter();
                TaskInformation taskInformation = new TaskInformation(this.jobVertex.getID(), this.jobVertex.getName(), this.parallelismInfo.getParallelism(), this.parallelismInfo.getMaxParallelism(), this.jobVertex.getInvokableClassName(), this.jobVertex.getConfiguration());
                this.taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload(taskInformation, this.getJobId(), blobWriter);
            }
            return this.taskInformationOrBlobKey;
        }
    }

    @Override
    public ExecutionState getAggregateState() {
        int[] num = new int[ExecutionState.values().length];
        for (ExecutionVertex vertex : this.taskVertices) {
            int n = vertex.getExecutionState().ordinal();
            num[n] = num[n] + 1;
        }
        return ExecutionJobVertex.getAggregateJobVertexState(num, this.parallelismInfo.getParallelism());
    }

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
        List<JobEdge> inputs = this.jobVertex.getInputs();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", this.jobVertex.getID(), this.jobVertex.getName(), inputs.size()));
        }
        for (int num = 0; num < inputs.size(); ++num) {
            IntermediateResult ires;
            JobEdge edge = inputs.get(num);
            if (LOG.isDebugEnabled()) {
                if (edge.getSource() == null) {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", num, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSourceId()));
                } else {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", num, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
                }
            }
            if ((ires = intermediateDataSets.get(edge.getSourceId())) == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + edge.getSourceId());
            }
            this.inputs.add(ires);
            EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());
        }
    }

    public void cancel() {
        for (ExecutionVertex ev : this.getTaskVertices()) {
            ev.cancel();
        }
    }

    public CompletableFuture<Void> cancelWithFuture() {
        return FutureUtils.waitForAll(this.mapExecutionVertices(ExecutionVertex::cancel));
    }

    public CompletableFuture<Void> suspend() {
        return FutureUtils.waitForAll(this.mapExecutionVertices(ExecutionVertex::suspend));
    }

    @Nonnull
    private Collection<CompletableFuture<?>> mapExecutionVertices(Function<ExecutionVertex, CompletableFuture<?>> mapFunction) {
        return Arrays.stream(this.getTaskVertices()).map(mapFunction).collect(Collectors.toList());
    }

    public void fail(Throwable t) {
        for (ExecutionVertex ev : this.getTaskVertices()) {
            ev.fail(t);
        }
    }

    @Override
    public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
        HashMap userAccumulators = new HashMap();
        for (ExecutionVertex vertex : this.taskVertices) {
            Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (next == null) continue;
            AccumulatorHelper.mergeInto(userAccumulators, next);
        }
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
    }

    public ArchivedExecutionJobVertex archive() {
        return new ArchivedExecutionJobVertex(this);
    }

    public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
        if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
            throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
        }
        if (verticesPerState[ExecutionState.FAILED.ordinal()] > 0) {
            return ExecutionState.FAILED;
        }
        if (verticesPerState[ExecutionState.CANCELING.ordinal()] > 0) {
            return ExecutionState.CANCELING;
        }
        if (verticesPerState[ExecutionState.CANCELED.ordinal()] > 0) {
            return ExecutionState.CANCELED;
        }
        if (verticesPerState[ExecutionState.INITIALIZING.ordinal()] > 0) {
            return ExecutionState.INITIALIZING;
        }
        if (verticesPerState[ExecutionState.RUNNING.ordinal()] > 0) {
            return ExecutionState.RUNNING;
        }
        if (verticesPerState[ExecutionState.FINISHED.ordinal()] > 0) {
            return verticesPerState[ExecutionState.FINISHED.ordinal()] == parallelism ? ExecutionState.FINISHED : ExecutionState.RUNNING;
        }
        return ExecutionState.CREATED;
    }
}

