/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobCancelResult;
import org.apache.flink.runtime.client.JobProgressResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.executiongraph.GraphConversionException;
import org.apache.flink.runtime.executiongraph.InternalJobStatus;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.DummyInstance;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.DeploymentManager;
import org.apache.flink.runtime.jobmanager.EventCollector;
import org.apache.flink.runtime.jobmanager.JobManagerUtils;
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulingException;
import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager;
import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.profiling.JobManagerProfiler;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.taskmanager.AbstractTaskResult;
import org.apache.flink.runtime.taskmanager.ExecutorThreadFactory;
import org.apache.flink.runtime.taskmanager.TaskCancelResult;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskKillResult;
import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.topology.NetworkTopology;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.util.StringUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class JobManager
implements DeploymentManager,
ExtendedManagementProtocol,
InputSplitProviderProtocol,
JobManagerProtocol,
ChannelLookupProtocol,
JobStatusListener,
AccumulatorProtocol {
    /** Logger shared by all instances of this class. */
    private static final Log LOG = LogFactory.getLog(JobManager.class);
    /** RPC server created and started in the constructor and stopped in {@link #shutdown()}. */
    private final Server jobManagerServer;
    /** Profiler; {@code null} unless "jobmanager.profiling.enable" is set to true in the global configuration. */
    private final JobManagerProfiler profiler;
    /** Collects job events; created in the constructor with the recommended client polling interval. */
    private final EventCollector eventCollector;
    /** Archive for finished jobs; {@code null} when "jobmanager.web.history" is not positive. */
    private final ArchiveListener archive;
    /** Input split manager; jobs are registered with it on submission and unregistered on cleanup. */
    private final InputSplitManager inputSplitManager;
    /** Scheduler loaded in the constructor for the selected execution mode. */
    private final DefaultScheduler scheduler;
    /** Accumulator manager created in the constructor. */
    private AccumulatorManager accumulatorManager;
    /** Instance manager loaded in the constructor for the selected execution mode. */
    private InstanceManager instanceManager;
    /** Value of "jobclient.polling.interval" (default 2), handed out by getRecommendedPollingInterval(). */
    private final int recommendedClientPollingInterval;
    /** Thread pool used to run heartbeat, registration and kill requests asynchronously. */
    private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
    // NOTE(review): not referenced in the visible code; the System.exit calls pass the literal 1
    // (presumably the constant was inlined by the compiler) - confirm before removing.
    private static final int FAILURE_RETURN_CODE = 1;
    /** Flipped by the first call to {@link #shutdown()} so that later calls return immediately. */
    private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
    /** Set to {@code true} at the end of {@link #shutdown()}; read by {@link #isShutDown()}. */
    private volatile boolean isShutDown;
    // NOTE(review): never assigned in the visible part of the file; presumably set by
    // startInfoServer(), which main() calls - confirm.
    private WebInfoServer server;

    /**
     * Creates the job manager and starts its RPC server.
     *
     * <p>Reads the RPC address/port, the client polling interval and the archive size from the
     * {@link GlobalConfiguration}, creates the event collector, the optional archive, the
     * accumulator manager and the input split manager, starts the RPC server, and finally loads
     * the instance manager, the scheduler and (if enabled) the profiler.
     *
     * @param executionMode the execution mode used to select the instance manager and scheduler classes
     * @throws Exception if the configured RPC address cannot be resolved, the RPC server cannot be
     *         started, or the instance manager, scheduler or profiler cannot be loaded
     */
    public JobManager(ExecutionMode executionMode) throws Exception {
        // Resolve the optional bind address; without a configured address ipcAddress stays null.
        final String ipcAddressString = GlobalConfiguration.getString("jobmanager.rpc.address", null);
        InetAddress ipcAddress = null;
        if (ipcAddressString != null) {
            try {
                ipcAddress = InetAddress.getByName(ipcAddressString);
            }
            catch (UnknownHostException e) {
                throw new Exception("Cannot convert " + ipcAddressString + " to an IP address: " + e.getMessage(), e);
            }
        }
        final int ipcPort = GlobalConfiguration.getInteger("jobmanager.rpc.port", 6123);

        this.recommendedClientPollingInterval = GlobalConfiguration.getInteger("jobclient.polling.interval", 2);
        this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);

        // The archive is only created when a positive history size is configured.
        final int archivedItems = GlobalConfiguration.getInteger("jobmanager.web.history", 5);
        if (archivedItems > 0) {
            this.archive = new MemoryArchivist(archivedItems);
            this.eventCollector.registerArchivist(this.archive);
        } else {
            this.archive = null;
        }

        // Fix: this used Math.min(1, archivedItems), which can never exceed 1 and yields 0 or a
        // negative value when the history is disabled. Math.max keeps the value at least 1 and
        // otherwise follows the configured history size.
        this.accumulatorManager = new AccumulatorManager(Math.max(1, archivedItems));
        this.inputSplitManager = new InputSplitManager();

        // Start the RPC server that exposes this object's protocol implementations.
        final InetSocketAddress rpcServerAddress = new InetSocketAddress(ipcAddress, ipcPort);
        try {
            final int handlerCount = GlobalConfiguration.getInteger("jobmanager.rpc.numhandler", 8);
            this.jobManagerServer = RPC.getServer(this, rpcServerAddress.getHostName(), rpcServerAddress.getPort(), handlerCount);
            this.jobManagerServer.start();
        }
        catch (IOException e) {
            throw new Exception("Cannot start RPC server: " + e.getMessage(), e);
        }

        LOG.info("Starting job manager in " + executionMode + " mode");

        // Load the instance manager that matches the execution mode.
        final String instanceManagerClassName = JobManagerUtils.getInstanceManagerClassName(executionMode);
        LOG.info("Trying to load " + instanceManagerClassName + " as instance manager");
        this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerClassName);
        if (this.instanceManager == null) {
            throw new Exception("Unable to load instance manager " + instanceManagerClassName);
        }

        // Load the scheduler that matches the execution mode.
        final String schedulerClassName = JobManagerUtils.getSchedulerClassName(executionMode);
        LOG.info("Trying to load " + schedulerClassName + " as scheduler");
        this.scheduler = JobManagerUtils.loadScheduler(schedulerClassName, this, this.instanceManager);
        if (this.scheduler == null) {
            throw new Exception("Unable to load scheduler " + schedulerClassName);
        }

        // The profiler is optional and only loaded when explicitly enabled.
        if (GlobalConfiguration.getBoolean("jobmanager.profiling.enable", false)) {
            final String profilerClassName = GlobalConfiguration.getString("jobmanager.profiling.classname", "org.apache.flink.runtime.profiling.impl.JobManagerProfilerImpl");
            this.profiler = ProfilingUtils.loadJobManagerProfiler(profilerClassName, ipcAddress);
            if (this.profiler == null) {
                throw new Exception("Cannot load profiler");
            }
        } else {
            this.profiler = null;
            LOG.debug("Profiler disabled");
        }
    }

    /**
     * Shuts down the job manager and its components.
     *
     * <p>Only the first call has an effect; subsequent calls return immediately. The components
     * are stopped in this order: instance manager, profiler, RPC server, executor service
     * (waiting up to five seconds for termination), event collector, scheduler.
     *
     * <p>If the calling thread is interrupted while waiting for the executor service, the
     * remaining components are still shut down and the thread's interrupt status is restored
     * before this method returns.
     */
    public void shutdown() {
        if (!this.isShutdownInProgress.compareAndSet(false, true)) {
            return;
        }
        boolean interrupted = false;
        if (this.instanceManager != null) {
            this.instanceManager.shutdown();
        }
        if (this.profiler != null) {
            this.profiler.shutdown();
        }
        if (this.jobManagerServer != null) {
            this.jobManagerServer.stop();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOG.debug(e);
                // Remember the interrupt instead of dropping it; it is re-asserted at the very
                // end so that the remaining shutdown steps are not affected by the flag.
                interrupted = true;
            }
        }
        if (this.eventCollector != null) {
            this.eventCollector.shutdown();
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        this.isShutDown = true;
        LOG.debug("Shutdown of job manager completed");
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Entry point: configures default logging if needed, initializes the job manager, starts the
     * info server and then blocks the main thread.
     *
     * <p>If no {@code log4j.configuration} system property is set, the root logger is reset to a
     * single console appender on {@code System.err} at level INFO. Any exception during
     * initialization is logged as fatal and terminates the JVM with exit code 1.
     *
     * @param args command line arguments, passed on to {@link #initialize(String[])}
     */
    public static void main(String[] args) {
        if (System.getProperty("log4j.configuration") == null) {
            Logger root = Logger.getRootLogger();
            root.removeAllAppenders();
            PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
            ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
            root.addAppender(appender);
            root.setLevel(Level.INFO);
        }
        try {
            JobManager jobManager = JobManager.initialize(args);
            jobManager.startInfoServer();
        }
        catch (Exception e) {
            LOG.fatal(e.getMessage(), e);
            System.exit(1);
        }

        // Block the main thread. The monitor is local and never notified, so the wait can only
        // end through an interrupt or a spurious wakeup; the loop guards against the latter so
        // that main() does not return unexpectedly.
        final Object monitor = new Object();
        synchronized (monitor) {
            try {
                while (true) {
                    monitor.wait();
                }
            }
            catch (InterruptedException e) {
                // Interruption is the only way out of the loop; keep the interrupt status set.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Parses the command line, loads the global configuration and creates the job manager.
     *
     * <p>Recognized options are {@code configDir} (configuration directory) and
     * {@code executionMode} ({@code local}, the default, or {@code cluster}). A command line
     * that cannot be parsed or an unknown execution mode terminates the JVM with exit code 1.
     * If the configuration directory exists, its parent is stored under
     * {@code flink.base.dir.path} in the global configuration.
     *
     * @param args the command line arguments
     * @return the newly created job manager
     * @throws Exception if the job manager cannot be constructed
     */
    @SuppressWarnings("static-access")
    public static JobManager initialize(String[] args) throws Exception {
        // OptionBuilder keeps its state in static fields; the chained calls below are the same
        // static calls as before, merely written fluently.
        final Option configDirOpt = OptionBuilder.withArgName("config directory")
                .hasArg()
                .withDescription("Specify configuration directory.")
                .create("configDir");
        final Option executionModeOpt = OptionBuilder.withArgName("execution mode")
                .hasArg()
                .withDescription("Specify execution mode.")
                .create("executionMode");

        final Options cliOptions = new Options();
        cliOptions.addOption(configDirOpt);
        cliOptions.addOption(executionModeOpt);

        final GnuParser cliParser = new GnuParser();
        CommandLine commandLine = null;
        try {
            commandLine = cliParser.parse(cliOptions, args);
        }
        catch (ParseException e) {
            LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
            System.exit(1);
        }

        final String configDir = commandLine.getOptionValue(configDirOpt.getOpt(), null);
        final String modeName = commandLine.getOptionValue(executionModeOpt.getOpt(), "local");

        // Map the textual mode to the enum; anything else aborts the process.
        ExecutionMode executionMode = null;
        if ("cluster".equals(modeName)) {
            executionMode = ExecutionMode.CLUSTER;
        } else if ("local".equals(modeName)) {
            executionMode = ExecutionMode.LOCAL;
        } else {
            System.err.println("Unrecognized execution mode: " + modeName);
            System.exit(1);
        }

        EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
        GlobalConfiguration.loadConfiguration(configDir);

        final JobManager jobManager = new JobManager(executionMode);

        // Publish the base directory (parent of the config directory) for the info server.
        final Configuration infoserverConfig = GlobalConfiguration.getConfiguration();
        final boolean configDirExists = configDir != null && new File(configDir).isDirectory();
        if (configDirExists) {
            infoserverConfig.setString("flink.base.dir.path", configDir + "/..");
        }
        GlobalConfiguration.includeConfiguration(infoserverConfig);

        return jobManager;
    }

    /**
     * Validates a submitted job graph, converts it into an execution graph, registers it with
     * the job manager's components and hands it to the scheduler.
     *
     * <p>Every failure, including unexpected throwables, is reported through the returned
     * result with return code ERROR rather than thrown.
     *
     * @param job the job graph to submit; {@code null} yields an error result
     * @return a result with return code SUCCESS if the job was scheduled, otherwise ERROR with a description
     */
    @Override
    public JobSubmissionResult submitJob(JobGraph job) throws IOException {
        try {
            if (job == null) {
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Submitted job " + job.getName() + " is not null");
            }

            // --- structural checks on the job graph ---
            final AbstractJobVertex vertexWithNullEdge = job.findVertexWithNullEdges();
            if (vertexWithNullEdge != null) {
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Vertex " + vertexWithNullEdge.getName() + " has at least one null edge");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Submitted job " + job.getName() + " has no null edges");
            }

            if (!job.isWeaklyConnected()) {
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Job graph is not weakly connected");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("The graph of job " + job.getName() + " is weakly connected");
            }

            if (!job.isAcyclic()) {
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Job graph is not a DAG");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("The graph of job " + job.getName() + " is acyclic");
            }

            final AbstractJobVertex vertexWithBadDegree = job.areVertexDegreesCorrect();
            if (vertexWithBadDegree != null) {
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Degree of vertex " + vertexWithBadDegree.getName() + " is incorrect");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("All vertices of job " + job.getName() + " have the correct degree");
            }

            if (!job.isInstanceDependencyChainAcyclic()) {
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "The dependency chain for instance sharing contains a cycle");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("The dependency chain for instance sharing is acyclic");
            }

            // Profiling needs both a loaded profiler and the per-job switch (which defaults to true).
            final boolean profilingRequested = this.profiler != null
                    && job.getJobConfiguration().getBoolean("job.profiling.enable", true);

            // --- conversion into an execution graph ---
            LOG.info("Creating initial execution graph from job graph " + job.getName());
            final ExecutionGraph graph;
            try {
                graph = new ExecutionGraph(job, 1);
            }
            catch (GraphConversionException e) {
                final Throwable cause = e.getCause();
                if (cause == null) {
                    return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
                }
                // A missing file is reported with its plain message, anything else with a full trace.
                if (cause instanceof FileNotFoundException) {
                    return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, cause.getMessage());
                }
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(cause));
            }

            // --- registration with the job manager's components ---
            if (this.eventCollector != null) {
                this.eventCollector.registerJob(graph, profilingRequested, System.currentTimeMillis());
            }
            if (profilingRequested) {
                this.profiler.registerProfilingJob(graph);
                if (this.eventCollector != null) {
                    this.profiler.registerForProfilingData(graph.getJobID(), this.eventCollector);
                }
            }
            this.inputSplitManager.registerJob(graph);
            graph.registerJobStatusListener(this);

            // --- scheduling ---
            if (LOG.isInfoEnabled()) {
                LOG.info("Scheduling job " + job.getName());
            }
            try {
                this.scheduler.scheduleJob(graph);
            }
            catch (SchedulingException e) {
                // Undo the registrations made above before reporting the failure.
                this.unregisterJob(graph);
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
            }
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null);
        }
        catch (Throwable t) {
            LOG.error("Job submission failed.", t);
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
        }
    }

    /**
     * Returns the instance manager used by this job manager.
     *
     * @return the instance manager loaded in the constructor
     */
    public InstanceManager getInstanceManager() {
        return instanceManager;
    }

    /**
     * Removes a job from the profiler (if the job ran with profiling), from the input split
     * manager and from the library cache manager.
     *
     * <p>An {@link IOException} from the library cache manager is logged as a warning and
     * otherwise ignored.
     *
     * @param executionGraph the execution graph of the job to unregister
     */
    private void unregisterJob(ExecutionGraph executionGraph) {
        final boolean profiled = this.profiler != null
                && executionGraph.getJobConfiguration().getBoolean("job.profiling.enable", true);
        if (profiled) {
            this.profiler.unregisterProfilingJob(executionGraph);
            if (this.eventCollector != null) {
                this.profiler.unregisterFromProfilingData(executionGraph.getJobID(), this.eventCollector);
            }
        }

        if (this.inputSplitManager != null) {
            this.inputSplitManager.unregisterJob(executionGraph);
        }

        try {
            LibraryCacheManager.unregister(executionGraph.getJobID());
        }
        catch (IOException ioe) {
            if (LOG.isWarnEnabled()) {
                LOG.warn(ioe);
            }
        }
    }

    /**
     * Passes a heartbeat on to the instance manager, asynchronously on the executor service.
     * Does nothing when no instance manager is set.
     *
     * @param instanceConnectionInfo the connection info reported with the heartbeat
     */
    @Override
    public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo) {
        if (this.instanceManager == null) {
            return;
        }
        this.executorService.execute(new Runnable() {

            @Override
            public void run() {
                JobManager.this.instanceManager.reportHeartBeat(instanceConnectionInfo);
            }
        });
    }

    /**
     * Queues the registration of a task manager with the instance manager.
     *
     * <p>The registration itself runs asynchronously on the executor service; SUCCESS is
     * returned as soon as it has been queued.
     *
     * @param instanceConnectionInfo the connection info of the task manager
     * @param hardwareDescription the hardware description of the task manager
     * @param numberOfSlots the number of slots offered by the task manager
     * @return SUCCESS if the registration was queued, FAILURE if no instance manager is set
     */
    @Override
    public RegisterTaskManagerResult registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo, final HardwareDescription hardwareDescription, final IntegerRecord numberOfSlots) {
        if (this.instanceManager == null) {
            return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.FAILURE);
        }
        this.executorService.execute(new Runnable() {

            @Override
            public void run() {
                JobManager.this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots.getValue());
            }
        });
        return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
    }

    /**
     * Applies a reported task state change to the matching execution vertex.
     *
     * <p>The request is logged and dropped when the state object is {@code null}, or when the
     * job or the vertex it refers to cannot be found. A reported FAILED state additionally has
     * its description logged as an error.
     *
     * @param executionState the state change reported for a task
     */
    @Override
    public void updateTaskExecutionState(TaskExecutionState executionState) throws IOException {
        if (executionState == null) {
            LOG.error("Received call to updateTaskExecutionState with executionState == null");
            return;
        }

        final ExecutionState reportedState = executionState.getExecutionState();
        if (reportedState == ExecutionState.FAILED) {
            LOG.error(executionState.getDescription());
        }

        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(executionState.getJobID());
        if (graph == null) {
            LOG.error("Cannot find execution graph for ID " + executionState.getJobID() + " to change state to " + executionState.getExecutionState());
            return;
        }

        final ExecutionVertex target = graph.getVertexByID(executionState.getID());
        if (target == null) {
            LOG.error("Cannot find vertex with ID " + executionState.getID() + " of job " + graph.getJobID() + " to change state to " + executionState.getExecutionState());
            return;
        }

        target.updateExecutionStateAsynchronously(executionState.getExecutionState(), executionState.getDescription());
    }

    /**
     * Triggers the cancellation of a job.
     *
     * <p>The cancellation is submitted as a command to the job's execution graph: the job status
     * is set to CANCELING and the tasks are cancelled; a cancel error is only logged. SUCCESS
     * therefore means the cancellation was triggered, not that it completed.
     *
     * @param jobID the ID of the job to cancel
     * @return SUCCESS if the cancellation was triggered, ERROR if no job with that ID is known to the scheduler
     */
    @Override
    public JobCancelResult cancelJob(JobID jobID) throws IOException {
        LOG.info("Trying to cancel job with ID " + jobID);

        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
        if (graph == null) {
            return new JobCancelResult(AbstractJobResult.ReturnCode.ERROR, "Cannot find job with ID " + jobID);
        }

        graph.executeCommand(new Runnable() {

            @Override
            public void run() {
                graph.updateJobStatus(InternalJobStatus.CANCELING, "Job canceled by user");
                final TaskCancelResult failure = JobManager.this.cancelJob(graph);
                if (failure != null) {
                    LOG.error(failure.getDescription());
                }
            }
        });

        LOG.info("Cancel of job " + jobID + " successfully triggered");
        return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
    }

    /**
     * Calls {@code cancelTask()} on every vertex delivered by an {@link ExecutionGraphIterator}
     * that starts at the graph's current execution stage.
     *
     * @param eg the execution graph whose tasks are to be cancelled
     * @return the last cancel result whose return code was not SUCCESS, or {@code null} if all cancel calls succeeded
     */
    private TaskCancelResult cancelJob(ExecutionGraph eg) {
        final ExecutionGraphIterator vertices = new ExecutionGraphIterator(eg, eg.getIndexOfCurrentExecutionStage(), false, true);
        TaskCancelResult lastFailure = null;
        while (vertices.hasNext()) {
            final ExecutionVertex current = (ExecutionVertex)vertices.next();
            final TaskCancelResult outcome = current.cancelTask();
            if (outcome.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
                lastFailure = outcome;
            }
        }
        return lastFailure;
    }

    /**
     * Returns the events the event collector holds for a job.
     *
     * @param jobID the ID of the job
     * @return a SUCCESS result carrying the events, or an ERROR result if there is no event collector
     */
    @Override
    public JobProgressResult getJobProgress(JobID jobID) throws IOException {
        if (this.eventCollector != null) {
            final SerializableArrayList<AbstractEvent> collected = new SerializableArrayList<AbstractEvent>();
            this.eventCollector.getEventsForJob(jobID, collected, false);
            return new JobProgressResult(AbstractJobResult.ReturnCode.SUCCESS, null, collected);
        }
        return new JobProgressResult(AbstractJobResult.ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null);
    }

    /**
     * Resolves the receiver for a channel of a running job.
     *
     * <p>The edge belonging to {@code sourceChannelID} is looked up in the job's execution
     * graph. Depending on whether the ID is the edge's input or output channel, the vertex on
     * the opposite side of the edge is inspected. If that vertex is ready and runs on the
     * caller's own instance, the opposite channel ID is returned; otherwise a remote receiver
     * built from the instance's address and data port is returned.
     *
     * @param caller the connection info of the instance issuing the lookup
     * @param jobID the ID of the job the channel belongs to
     * @param sourceChannelID the ID of the channel whose counterpart is requested
     * @return "receiver not found" if the job or edge is unknown or the sending vertex already
     *         finished, "job is aborting" if the job is failing or being cancelled,
     *         "receiver not ready" if the peer vertex is not yet in a suitable state or has no
     *         instance, otherwise "receiver found and ready"
     */
    @Override
    public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
        final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
        if (eg == null) {
            LOG.error("Cannot find execution graph to job ID " + jobID);
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }

        final InternalJobStatus jobStatus = eg.getJobStatus();
        if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) {
            return ConnectionInfoLookupResponse.createJobIsAborting();
        }

        final ExecutionEdge edge = eg.getEdgeByID(sourceChannelID);
        if (edge == null) {
            LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }

        if (sourceChannelID.equals(edge.getInputChannelID())) {
            // The request comes from the edge's input channel, so the peer is the vertex that
            // owns the edge's output gate.
            final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
            final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
            if (assignedInstance == null) {
                // Fix: this message used to print the output gate index under the label
                // "channel ID"; it now prints the output channel ID, matching the message in
                // the opposite-direction branch below.
                LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputChannelID() + " but no instance assigned");
                return ConnectionInfoLookupResponse.createReceiverNotReady();
            }

            final ExecutionState executionState = connectedVertex.getExecutionState();
            if (executionState == ExecutionState.FINISHED) {
                return ConnectionInfoLookupResponse.createReceiverNotFound();
            }
            if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
                return ConnectionInfoLookupResponse.createReceiverNotReady();
            }

            // Peer on the caller's own instance: answer with the local channel ID.
            if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
                return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
            }
            final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
            final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
            return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
        }

        // Otherwise the peer is the vertex that owns the edge's input gate.
        final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
        final ExecutionState executionState = targetVertex.getExecutionState();
        if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
            if (executionState == ExecutionState.ASSIGNED) {
                // The target is assigned but not ready yet: ask the scheduler to deploy it.
                Runnable command = new Runnable(){

                    @Override
                    public void run() {
                        JobManager.this.scheduler.deployAssignedVertices(targetVertex);
                    }
                };
                eg.executeCommand(command);
            }
            return ConnectionInfoLookupResponse.createReceiverNotReady();
        }

        final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
        if (assignedInstance == null) {
            LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
            return ConnectionInfoLookupResponse.createReceiverNotReady();
        }

        // Peer on the caller's own instance: answer with the local channel ID.
        if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
            return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
        }
        final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
        final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
        return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
    }

    /**
     * Returns the management graph of a job, asking the event collector first and the archive
     * (if one exists) second.
     *
     * @param jobID the ID of the job
     * @return the management graph of the job
     * @throws IOException if neither the event collector nor the archive knows the job
     */
    @Override
    public ManagementGraph getManagementGraph(JobID jobID) throws IOException {
        final ManagementGraph current = this.eventCollector.getManagementGraph(jobID);
        if (current != null) {
            return current;
        }

        final ManagementGraph archived = this.archive == null ? null : this.archive.getManagementGraph(jobID);
        if (archived == null) {
            throw new IOException("Cannot find job with ID " + jobID);
        }
        return archived;
    }

    /**
     * Returns the network topology the instance manager reports for a job.
     *
     * @param jobID the ID of the job
     * @return the network topology, or {@code null} if no instance manager is set
     */
    @Override
    public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
        if (this.instanceManager == null) {
            return null;
        }
        return this.instanceManager.getNetworkTopology(jobID);
    }

    /**
     * Returns the polling interval recommended to clients, as read from
     * "jobclient.polling.interval" in the constructor.
     *
     * @return the recommended polling interval wrapped in an {@link IntegerRecord}
     */
    @Override
    public IntegerRecord getRecommendedPollingInterval() throws IOException {
        final IntegerRecord interval = new IntegerRecord(this.recommendedClientPollingInterval);
        return interval;
    }

    /**
     * Returns the recent job events known to the event collector.
     *
     * @return a list filled by the event collector
     * @throws IOException if there is no event collector
     */
    @Override
    public List<RecentJobEvent> getRecentJobs() throws IOException {
        if (this.eventCollector == null) {
            throw new IOException("No instance of the event collector found");
        }
        final SerializableArrayList<RecentJobEvent> recentJobs = new SerializableArrayList<RecentJobEvent>();
        this.eventCollector.getRecentJobs(recentJobs);
        return recentJobs;
    }

    /**
     * Returns the events the event collector holds for a job.
     *
     * @param jobID the ID of the job
     * @return a list filled by the event collector
     * @throws IOException if there is no event collector
     */
    @Override
    public List<AbstractEvent> getEvents(JobID jobID) throws IOException {
        if (this.eventCollector == null) {
            throw new IOException("No instance of the event collector found");
        }
        final SerializableArrayList<AbstractEvent> jobEvents = new SerializableArrayList<AbstractEvent>();
        this.eventCollector.getEventsForJob(jobID, jobEvents, true);
        return jobEvents;
    }

    /**
     * Kills a single task of a job.
     *
     * <p>The kill is submitted as a command to the job's execution graph; a failed kill is only
     * logged. An unknown job or vertex is logged as an error and the request is dropped.
     *
     * @param jobID the ID of the job the task belongs to
     * @param id the management vertex ID identifying the task
     */
    @Override
    public void killTask(JobID jobID, ManagementVertexID id) throws IOException {
        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
        if (graph == null) {
            LOG.error("Cannot find execution graph for job " + jobID);
            return;
        }

        final ExecutionVertex victim = graph.getVertexByID(ExecutionVertexID.fromManagementVertexID(id));
        if (victim == null) {
            LOG.error("Cannot find execution vertex with ID " + id);
            return;
        }

        LOG.info("Killing task " + victim + " of job " + jobID);
        graph.executeCommand(new Runnable() {

            @Override
            public void run() {
                final TaskKillResult outcome = victim.killTask();
                if (outcome.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
                    LOG.error(outcome.getDescription());
                }
            }
        });
    }

    /**
     * Kills the task manager on the instance with the given name.
     *
     * <p>The kill runs asynchronously on the executor service; an {@link IOException} raised by
     * it is logged. An unknown instance name is logged as an error and the request is dropped.
     *
     * @param instanceName the name of the instance whose task manager is to be killed
     */
    @Override
    public void killInstance(StringRecord instanceName) throws IOException {
        final Instance target = this.instanceManager.getInstanceByName(instanceName.toString());
        if (target == null) {
            LOG.error("Cannot find instance with name " + instanceName + " to kill it");
            return;
        }

        LOG.info("Killing task manager on instance " + target);
        this.executorService.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    target.killTaskManager();
                }
                catch (IOException ioe) {
                    LOG.error(ioe);
                }
            }
        });
    }

    /**
     * Reports whether {@link #shutdown()} has run to completion.
     *
     * @return {@code true} once shutdown has finished, {@code false} otherwise
     */
    public boolean isShutDown() {
        return isShutDown;
    }

    /**
     * Reacts to a status change of a job.
     *
     * <p>The new status is logged. A job that entered {@code FAILING} is cancelled via
     * {@code cancelJob}; a job that entered {@code CANCELED}, {@code FAILED} or
     * {@code FINISHED} is unregistered via {@code unregisterJob}.
     *
     * @param executionGraph the execution graph of the job whose status changed
     * @param newJobStatus the status the job switched to
     * @param optionalMessage an optional message accompanying the change; not used by this method
     */
    @Override
    public void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus newJobStatus, String optionalMessage) {
        final String statusMessage = "Status of job " + executionGraph.getJobName() + "(" + executionGraph.getJobID() + ")" + " changed to " + newJobStatus;
        LOG.info(statusMessage);

        if (newJobStatus == InternalJobStatus.FAILING) {
            this.cancelJob(executionGraph);
        }

        final boolean shouldUnregister = newJobStatus == InternalJobStatus.CANCELED
            || newJobStatus == InternalJobStatus.FAILED
            || newJobStatus == InternalJobStatus.FINISHED;
        if (shouldUnregister) {
            this.unregisterJob(executionGraph);
        }
    }

    /**
     * Asks every instance that currently runs a task of the given job to log its
     * buffer utilization.
     *
     * <p>If the scheduler knows no execution graph for the job, the method returns
     * silently. Otherwise the graph is traversed and the instances of all vertices in
     * state {@code RUNNING} or {@code FINISHING} are collected into a set; vertices
     * whose instance is a {@code DummyInstance} are reported as an error and skipped.
     * The actual requests are issued from a {@link Runnable} submitted to this
     * object's executor service.
     *
     * @param jobID the ID of the job whose instances shall log their buffer utilization
     * @throws IOException declared by the signature; not raised explicitly by the code in this method
     */
    @Override
    public void logBufferUtilization(JobID jobID) throws IOException {
        final ExecutionGraph executionGraph = this.scheduler.getExecutionGraphByID(jobID);
        if (executionGraph == null) {
            return;
        }

        final HashSet<Instance> instancesInUse = new HashSet<Instance>();
        final ExecutionGraphIterator vertices = new ExecutionGraphIterator(executionGraph, true);
        while (vertices.hasNext()) {
            final ExecutionVertex vertex = (ExecutionVertex) vertices.next();
            final ExecutionState state = vertex.getExecutionState();
            if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHING) {
                final Instance instance = vertex.getAllocatedResource().getInstance();
                if (instance instanceof DummyInstance) {
                    LOG.error("Found instance of type DummyInstance for vertex " + vertex.getName() + " (state " + state + ")");
                } else {
                    instancesInUse.add(instance);
                }
            }
        }

        final Runnable requestRunnable = new Runnable() {

            @Override
            public void run() {
                // A single try around the whole loop: the first IOException ends the iteration.
                try {
                    for (Instance instance : instancesInUse) {
                        instance.logBufferUtilization();
                    }
                } catch (IOException requestFailure) {
                    LOG.error(requestFailure);
                }
            }
        };
        this.executorService.execute(requestRunnable);
    }

    /**
     * Returns the number of slots reported by the instance manager.
     *
     * @return the result of {@code getNumberOfSlots()} on the instance manager
     */
    @Override
    public int getAvailableSlots() {
        return getInstanceManager().getNumberOfSlots();
    }

    /**
     * Deploys the given vertices on the given instance.
     *
     * <p>An empty vertex list is reported as an error and ignored. Every vertex is
     * switched to {@code STARTING} on the calling thread (a vertex that is not in
     * state {@code READY} is reported as an error but still switched). The remaining
     * work runs in a {@link Runnable} submitted to this object's executor service:
     * <ol>
     *   <li>library availability is checked on the instance (a failure is only logged),</li>
     *   <li>a deployment descriptor is built for every vertex and the list is submitted
     *       to the instance,</li>
     *   <li>every vertex whose submission result is not {@code SUCCESS} is marked
     *       {@code FAILED} with the result's description.</li>
     * </ol>
     *
     * <p>If the submission call itself fails with an {@link IOException}, all vertices
     * are marked {@code FAILED} and the runnable ends; there are no results to process
     * in that case.
     *
     * @param jobID the ID of the job the vertices belong to
     * @param instance the instance the vertices shall be deployed on
     * @param verticesToBeDeployed the vertices to deploy
     */
    @Override
    public void deploy(final JobID jobID, final Instance instance, final List<ExecutionVertex> verticesToBeDeployed) {
        if (verticesToBeDeployed.isEmpty()) {
            LOG.error("Method 'deploy' called but list of vertices to be deployed is empty");
            return;
        }
        for (ExecutionVertex vertex : verticesToBeDeployed) {
            if (vertex.getExecutionState() != ExecutionState.READY) {
                LOG.error("Expected vertex " + vertex + " to be in state READY but it is in state " + vertex.getExecutionState());
            }
            vertex.updateExecutionState(ExecutionState.STARTING, null);
        }
        Runnable deploymentRunnable = new Runnable() {

            @Override
            public void run() {
                try {
                    instance.checkLibraryAvailability(jobID);
                } catch (IOException ioe) {
                    LOG.error("Cannot check library availability: " + StringUtils.stringifyException((Throwable) ioe));
                }

                SerializableArrayList<TaskDeploymentDescriptor> submissionList = new SerializableArrayList<TaskDeploymentDescriptor>();
                for (ExecutionVertex vertex : verticesToBeDeployed) {
                    submissionList.add(vertex.constructDeploymentDescriptor());
                    LOG.info("Starting task " + vertex + " on " + vertex.getAllocatedResource().getInstance());
                }

                List<TaskSubmissionResult> submissionResultList;
                try {
                    submissionResultList = instance.submitTasks(submissionList);
                } catch (IOException ioe) {
                    String errorMsg = StringUtils.stringifyException((Throwable) ioe);
                    for (ExecutionVertex vertex : verticesToBeDeployed) {
                        vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, errorMsg);
                    }
                    // No result list exists after a failed submission; continuing would
                    // dereference null below.
                    return;
                }

                if (verticesToBeDeployed.size() != submissionResultList.size()) {
                    LOG.error("size of submission result list does not match size of list with vertices to be deployed");
                }

                int count = 0;
                for (TaskSubmissionResult tsr : submissionResultList) {
                    // The result list may be longer than the vertex list (mismatch logged above);
                    // in that case there is no positional candidate and the lookup by ID decides.
                    ExecutionVertex vertex = count < verticesToBeDeployed.size() ? verticesToBeDeployed.get(count) : null;
                    ++count;
                    if (vertex == null || !vertex.getID().equals(tsr.getVertexID())) {
                        LOG.error("Expected different order of objects in task result list");
                        vertex = null;
                        for (ExecutionVertex candVertex : verticesToBeDeployed) {
                            if (tsr.getVertexID().equals(candVertex.getID())) {
                                vertex = candVertex;
                                break;
                            }
                        }
                        if (vertex == null) {
                            LOG.error("Cannot find execution vertex for vertex ID " + tsr.getVertexID());
                            continue;
                        }
                    }
                    if (tsr.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
                        vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, tsr.getDescription());
                    }
                }
            }
        };
        this.executorService.execute(deploymentRunnable);
    }

    /**
     * Hands out the next input split for a vertex of a job.
     *
     * <p>The execution graph is resolved through the scheduler and the vertex through
     * the graph. If either is missing, an error is logged and {@code null} is
     * returned. Otherwise the input split manager is asked for the next split of the
     * vertex at the given sequence number, and the answer is wrapped together with
     * the job ID.
     *
     * @param jobID the ID of the job the vertex belongs to
     * @param vertexID the ID of the vertex requesting a split
     * @param sequenceNumber the sequence number passed on to the input split manager
     * @return a wrapper around the split returned by the input split manager, or
     *         {@code null} if the job or the vertex could not be found
     * @throws IOException declared by the signature; not raised explicitly by the code in this method
     */
    @Override
    public InputSplitWrapper requestNextInputSplit(JobID jobID, ExecutionVertexID vertexID, IntegerRecord sequenceNumber) throws IOException {
        final ExecutionGraph executionGraph = this.scheduler.getExecutionGraphByID(jobID);
        if (executionGraph == null) {
            LOG.error("Cannot find execution graph to job ID " + jobID);
            return null;
        }

        final ExecutionVertex requester = executionGraph.getVertexByID(vertexID);
        if (requester == null) {
            LOG.error("Cannot find execution vertex for vertex ID " + vertexID);
            return null;
        }

        return new InputSplitWrapper(
            jobID,
            this.inputSplitManager.getNextInputSplit(requester, sequenceNumber.getValue()));
    }

    /**
     * Creates and starts the web info server.
     *
     * <p>The port is read from the global configuration under the key
     * {@code jobmanager.web.port}, falling back to 8081. Failures are logged and not
     * propagated: a {@link FileNotFoundException} is logged with its own message, any
     * other exception with a "Cannot instantiate info server" prefix.
     */
    public void startInfoServer() {
        final Configuration globalConfig = GlobalConfiguration.getConfiguration();
        try {
            final int webPort = globalConfig.getInteger("jobmanager.web.port", 8081);
            this.server = new WebInfoServer(globalConfig, webPort, this);
            this.server.start();
        } catch (FileNotFoundException missingFile) {
            LOG.error(missingFile.getMessage(), missingFile);
        } catch (Exception startupFailure) {
            LOG.error("Cannot instantiate info server: " + startupFailure.getMessage(), startupFailure);
        }
    }

    /**
     * Returns the jobs held by the archive.
     *
     * @return the result of {@code getJobs()} on the archive
     * @throws IOException if no archive is set on this object
     */
    public List<RecentJobEvent> getOldJobs() throws IOException {
        if (this.archive == null) {
            // The missing component here is the archive, so the message names it
            // instead of the event collector.
            throw new IOException("No instance of the archive found");
        }
        return this.archive.getJobs();
    }

    /**
     * Returns the archive listener held by this object.
     *
     * @return the value of the {@code archive} field, which may be {@code null}
     *         (see the null check in {@code getOldJobs})
     */
    public ArchiveListener getArchive() {
        return archive;
    }

    /**
     * Returns the number of task trackers reported by the instance manager.
     *
     * @return the result of {@code getNumberOfTaskTrackers()} on the instance manager
     */
    public int getNumberOfTaskTrackers() {
        return instanceManager.getNumberOfTaskTrackers();
    }

    /**
     * Returns the instances known to the instance manager, keyed by their connection info.
     *
     * @return the map returned by {@code getInstances()} on the instance manager
     */
    public Map<InstanceConnectionInfo, Instance> getInstances() {
        return instanceManager.getInstances();
    }

    /**
     * Forwards reported accumulators to the accumulator manager.
     *
     * <p>The accumulators are extracted from the event using the class loader that
     * {@code LibraryCacheManager} returns for the event's job ID, and are then passed
     * to the accumulator manager together with that job ID.
     *
     * @param accumulatorEvent the event carrying the job ID and the accumulators
     * @throws IOException declared by the signature; not raised explicitly by the code in this method
     */
    @Override
    public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) throws IOException {
        final JobID jobID = accumulatorEvent.getJobID();
        final ClassLoader jobClassLoader = LibraryCacheManager.getClassLoader(jobID);
        this.accumulatorManager.processIncomingAccumulators(jobID, accumulatorEvent.getAccumulators(jobClassLoader));
    }

    /**
     * Returns the accumulators of a job wrapped in an {@code AccumulatorEvent}.
     *
     * @param jobID the ID of the job whose accumulators are requested
     * @return a new event holding the job ID and the accumulators the accumulator
     *         manager returns for it
     * @throws IOException declared by the signature; not raised explicitly by the code in this method
     */
    @Override
    public AccumulatorEvent getAccumulatorResults(JobID jobID) throws IOException {
        final Map<String, Accumulator<?, ?>> jobAccumulators = this.accumulatorManager.getJobAccumulators(jobID);
        return new AccumulatorEvent(jobID, jobAccumulators);
    }

    /**
     * Returns the accumulators the accumulator manager holds for a job.
     *
     * @param jobID the ID of the job whose accumulators are requested
     * @return the map returned by {@code getJobAccumulators(jobID)} on the accumulator manager
     */
    public Map<String, Accumulator<?, ?>> getAccumulators(JobID jobID) {
        return accumulatorManager.getJobAccumulators(jobID);
    }
}

