/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.profiling.impl;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.profiling.JobManagerProfiler;
import org.apache.flink.runtime.profiling.ProfilingException;
import org.apache.flink.runtime.profiling.ProfilingListener;
import org.apache.flink.runtime.profiling.impl.JobProfilingData;
import org.apache.flink.runtime.profiling.impl.ProfilerImplProtocol;
import org.apache.flink.runtime.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
import org.apache.flink.runtime.profiling.impl.types.InternalInputGateProfilingData;
import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
import org.apache.flink.runtime.profiling.impl.types.InternalOutputGateProfilingData;
import org.apache.flink.runtime.profiling.impl.types.InternalProfilingData;
import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer;
import org.apache.flink.runtime.profiling.types.InputGateProfilingEvent;
import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
import org.apache.flink.runtime.profiling.types.OutputGateProfilingEvent;
import org.apache.flink.runtime.profiling.types.SingleInstanceProfilingEvent;
import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent;
import org.apache.flink.util.StringUtils;

public class JobManagerProfilerImpl
implements JobManagerProfiler,
ProfilerImplProtocol {
    private static final Log LOG = LogFactory.getLog(JobManagerProfilerImpl.class);
    private static final String RPC_NUM_HANDLER_KEY = "jobmanager.profiling.rpc.numhandler";
    private static final int DEFAULT_NUM_HANLDER = 3;
    private final Server profilingServer;
    private final Map<JobID, List<ProfilingListener>> registeredListeners = new HashMap<JobID, List<ProfilingListener>>();
    private final Map<JobID, JobProfilingData> registeredJobs = new HashMap<JobID, JobProfilingData>();

    public JobManagerProfilerImpl(InetAddress jobManagerbindAddress) throws ProfilingException {
        int handlerCount = GlobalConfiguration.getInteger((String)RPC_NUM_HANDLER_KEY, (int)3);
        int rpcPort = GlobalConfiguration.getInteger((String)"jobmanager.profiling.rpc.port", (int)6124);
        InetSocketAddress rpcServerAddress = new InetSocketAddress(jobManagerbindAddress, rpcPort);
        RPC.Server profilingServerTmp = null;
        try {
            profilingServerTmp = RPC.getServer(this, rpcServerAddress.getHostName(), rpcServerAddress.getPort(), handlerCount);
            profilingServerTmp.start();
        }
        catch (IOException ioe) {
            throw new ProfilingException("Cannot start profiling RPC server: " + StringUtils.stringifyException((Throwable)ioe));
        }
        this.profilingServer = profilingServerTmp;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerProfilingJob(ExecutionGraph executionGraph) {
        Map<JobID, JobProfilingData> map = this.registeredJobs;
        synchronized (map) {
            this.registeredJobs.put(executionGraph.getJobID(), new JobProfilingData(executionGraph));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unregisterProfilingJob(ExecutionGraph executionGraph) {
        Map<JobID, Object> map = this.registeredListeners;
        synchronized (map) {
            this.registeredListeners.remove(executionGraph.getJobID());
        }
        map = this.registeredJobs;
        synchronized (map) {
            this.registeredJobs.remove(executionGraph.getJobID());
        }
    }

    @Override
    public void shutdown() {
        if (this.profilingServer != null) {
            LOG.debug((Object)"Stopping profiling RPC server");
            this.profilingServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispatchThreadData(long timestamp, InternalExecutionVertexThreadProfilingData profilingData) {
        long profilingStart = this.getProfilingStart(profilingData.getJobID());
        if (profilingStart < 0L && LOG.isDebugEnabled()) {
            LOG.debug((Object)("Received profiling data for unregistered job " + profilingData.getJobID()));
            return;
        }
        Map<JobID, List<ProfilingListener>> map = this.registeredListeners;
        synchronized (map) {
            List<ProfilingListener> jobListeners = this.registeredListeners.get(profilingData.getJobID());
            if (jobListeners == null) {
                return;
            }
            ThreadProfilingEvent threadProfilingEvent = new ThreadProfilingEvent(profilingData.getUserTime(), profilingData.getSystemTime(), profilingData.getBlockedTime(), profilingData.getWaitedTime(), profilingData.getExecutionVertexID().toManagementVertexID(), profilingData.getProfilingInterval(), profilingData.getJobID(), timestamp, timestamp - profilingStart);
            Iterator<ProfilingListener> it = jobListeners.iterator();
            while (it.hasNext()) {
                it.next().processProfilingEvents(threadProfilingEvent);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispatchInstanceData(long timestamp, InternalInstanceProfilingData profilingData) {
        Map<JobID, JobProfilingData> map = this.registeredJobs;
        synchronized (map) {
            for (JobID jobID : this.registeredJobs.keySet()) {
                JobProfilingData jobProfilingData = this.registeredJobs.get(jobID);
                if (!jobProfilingData.instanceAllocatedByJob(profilingData)) continue;
                SingleInstanceProfilingEvent singleInstanceProfilingEvent = new SingleInstanceProfilingEvent(profilingData.getProfilingInterval(), profilingData.getIOWaitCPU(), profilingData.getIdleCPU(), profilingData.getUserCPU(), profilingData.getSystemCPU(), profilingData.getHardIrqCPU(), profilingData.getSoftIrqCPU(), profilingData.getTotalMemory(), profilingData.getFreeMemory(), profilingData.getBufferedMemory(), profilingData.getCachedMemory(), profilingData.getCachedSwapMemory(), profilingData.getReceivedBytes(), profilingData.getTransmittedBytes(), jobID, timestamp, timestamp - jobProfilingData.getProfilingStart(), profilingData.getInstanceConnectionInfo().toString());
                Map<JobID, List<ProfilingListener>> map2 = this.registeredListeners;
                synchronized (map2) {
                    List<ProfilingListener> jobListeners = this.registeredListeners.get(jobID);
                    if (jobListeners == null) {
                        continue;
                    }
                    InstanceSummaryProfilingEvent instanceSummary = jobProfilingData.getInstanceSummaryProfilingData(timestamp);
                    for (ProfilingListener profilingListener : jobListeners) {
                        profilingListener.processProfilingEvents(singleInstanceProfilingEvent);
                        if (instanceSummary == null) continue;
                        profilingListener.processProfilingEvents(instanceSummary);
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispatchInputGateData(long timestamp, InternalInputGateProfilingData profilingData) {
        long profilingStart = this.getProfilingStart(profilingData.getJobID());
        if (profilingStart < 0L) {
            LOG.error((Object)("Received profiling data for unregistered job " + profilingData.getJobID()));
            return;
        }
        Map<JobID, List<ProfilingListener>> map = this.registeredListeners;
        synchronized (map) {
            List<ProfilingListener> jobListeners = this.registeredListeners.get(profilingData.getJobID());
            if (jobListeners == null) {
                return;
            }
            InputGateProfilingEvent inputGateProfilingEvent = new InputGateProfilingEvent(profilingData.getGateIndex(), profilingData.getNoRecordsAvailableCounter(), profilingData.getExecutionVertexID().toManagementVertexID(), profilingData.getProfilingInterval(), profilingData.getJobID(), timestamp, timestamp - profilingStart);
            Iterator<ProfilingListener> it = jobListeners.iterator();
            while (it.hasNext()) {
                it.next().processProfilingEvents(inputGateProfilingEvent);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispatchOutputGateData(long timestamp, InternalOutputGateProfilingData profilingData) {
        long profilingStart = this.getProfilingStart(profilingData.getJobID());
        if (profilingStart < 0L) {
            LOG.error((Object)("Received profiling data for unregistered job " + profilingData.getJobID()));
            return;
        }
        Map<JobID, List<ProfilingListener>> map = this.registeredListeners;
        synchronized (map) {
            List<ProfilingListener> jobListeners = this.registeredListeners.get(profilingData.getJobID());
            if (jobListeners == null) {
                return;
            }
            OutputGateProfilingEvent outputGateProfilingEvent = new OutputGateProfilingEvent(profilingData.getGateIndex(), profilingData.getChannelCapacityExhaustedCounter(), profilingData.getExecutionVertexID().toManagementVertexID(), profilingData.getProfilingInterval(), profilingData.getJobID(), timestamp, timestamp - profilingStart);
            Iterator<ProfilingListener> it = jobListeners.iterator();
            while (it.hasNext()) {
                it.next().processProfilingEvents(outputGateProfilingEvent);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long getProfilingStart(JobID jobID) {
        Map<JobID, JobProfilingData> map = this.registeredJobs;
        synchronized (map) {
            JobProfilingData profilingData = this.registeredJobs.get(jobID);
            if (profilingData == null) {
                return -1L;
            }
            return profilingData.getProfilingStart();
        }
    }

    @Override
    public void reportProfilingData(ProfilingDataContainer profilingDataContainer) {
        long timestamp = System.currentTimeMillis();
        Iterator<InternalProfilingData> dataIterator = profilingDataContainer.getIterator();
        while (dataIterator.hasNext()) {
            InternalProfilingData internalProfilingData = dataIterator.next();
            if (internalProfilingData instanceof InternalExecutionVertexThreadProfilingData) {
                this.dispatchThreadData(timestamp, (InternalExecutionVertexThreadProfilingData)internalProfilingData);
                continue;
            }
            if (internalProfilingData instanceof InternalInstanceProfilingData) {
                this.dispatchInstanceData(timestamp, (InternalInstanceProfilingData)internalProfilingData);
                continue;
            }
            if (internalProfilingData instanceof InternalInputGateProfilingData) {
                this.dispatchInputGateData(timestamp, (InternalInputGateProfilingData)internalProfilingData);
                continue;
            }
            if (internalProfilingData instanceof InternalOutputGateProfilingData) {
                this.dispatchOutputGateData(timestamp, (InternalOutputGateProfilingData)internalProfilingData);
                continue;
            }
            LOG.error((Object)("Received unknown profiling data: " + internalProfilingData.getClass().getName()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerForProfilingData(JobID jobID, ProfilingListener profilingListener) {
        Map<JobID, List<ProfilingListener>> map = this.registeredListeners;
        synchronized (map) {
            List<ProfilingListener> jobListeners = this.registeredListeners.get(jobID);
            if (jobListeners == null) {
                jobListeners = new ArrayList<ProfilingListener>();
                this.registeredListeners.put(jobID, jobListeners);
            }
            jobListeners.add(profilingListener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unregisterFromProfilingData(JobID jobID, ProfilingListener profilingListener) {
        Map<JobID, List<ProfilingListener>> map = this.registeredListeners;
        synchronized (map) {
            List<ProfilingListener> jobListeners = this.registeredListeners.get(jobID);
            if (jobListeners == null) {
                return;
            }
            jobListeners.remove(profilingListener);
            if (jobListeners.isEmpty()) {
                this.registeredListeners.remove(jobID);
            }
        }
    }
}

