/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.common.TezLocalResource;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerImpl;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;

/**
 * RPC endpoint hosted by the application master that running containers call into.
 *
 * <p>The class implements {@link TezTaskUmbilicalProtocol} (task hand-out via
 * {@link #getTask}, commit arbitration via {@link #canCommit}, and {@link #heartbeat}) and
 * {@link TaskAttemptListener} (bookkeeping of which containers are registered and which task
 * attempt is currently assigned to each of them).
 *
 * <p>Two maps carry the bookkeeping: {@code registeredContainers} (container id to per-container
 * state) and {@code attemptToInfoMap} (task attempt id to the container it runs in). Updates that
 * touch both are made while holding the monitor of the affected {@link ContainerInfo}.
 */
public class TaskAttemptListenerImpTezDag
extends AbstractService
implements TezTaskUmbilicalProtocol,
TaskAttemptListener {
    /**
     * Shared reply returned by {@link #getTask} whenever the caller must not receive work
     * (unknown container, unregistered container, or no more tasks).
     * NOTE(review): the {@code true} argument is presumably the "should die" flag - confirm
     * against the ContainerTask constructor.
     */
    private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(null, true, null, null, false);
    private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpTezDag.class);
    private final AppContext context;
    protected final TaskHeartbeatHandler taskHeartbeatHandler;
    protected final ContainerHeartbeatHandler containerHeartbeatHandler;
    private final JobTokenSecretManager jobTokenSecretManager;
    /** Address the RPC server is reachable on; assigned in {@link #startRpcServer()}. */
    private InetSocketAddress address;
    /** The RPC server; {@code null} until {@link #startRpcServer()} has built it. */
    private Server server;
    /** Task attempt id to the id of the container the attempt is assigned to. */
    private final ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToInfoMap = new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
    /** Per-container state for every container currently registered with this listener. */
    private final ConcurrentHashMap<ContainerId, ContainerInfo> registeredContainers = new ConcurrentHashMap<ContainerId, ContainerInfo>();

    /**
     * Creates the listener; the RPC server is not started until the service is started.
     *
     * @param context application context used to look up containers, the current DAG and the
     *        event handler
     * @param thh handler notified of task-attempt liveness
     * @param chh handler notified of container liveness
     * @param jobTokenSecretManager secret manager installed on the RPC server
     */
    public TaskAttemptListenerImpTezDag(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, JobTokenSecretManager jobTokenSecretManager) {
        super(TaskAttemptListenerImpTezDag.class.getName());
        this.context = context;
        this.jobTokenSecretManager = jobTokenSecretManager;
        this.taskHeartbeatHandler = thh;
        this.containerHeartbeatHandler = chh;
    }

    /** Service start hook: brings up the RPC server. */
    public void serviceStart() {
        this.startRpcServer();
    }

    /**
     * Builds and starts the umbilical RPC server on an ephemeral port of the wildcard address,
     * then records the address clients should connect to.
     *
     * @throws TezUncheckedException wrapping any {@link IOException} raised while building or
     *         starting the server
     */
    protected void startRpcServer() {
        Configuration conf = this.getConfig();
        try {
            this.server = new RPC.Builder(conf)
                    .setProtocol(TezTaskUmbilicalProtocol.class)
                    .setBindAddress("0.0.0.0")
                    .setPort(0)
                    .setInstance(this)
                    .setNumHandlers(conf.getInt("tez.am.task.listener.thread-count", 30))
                    .setSecretManager(this.jobTokenSecretManager)
                    .build();
            // Service-level ACLs are only installed when Hadoop authorization is switched on.
            if (conf.getBoolean("hadoop.security.authorization", false)) {
                this.refreshServiceAcls(conf, new TezAMPolicyProvider());
            }
            this.server.start();
            this.address = NetUtils.getConnectAddress(this.server);
        }
        catch (IOException e) {
            throw new TezUncheckedException(e);
        }
    }

    /**
     * Reloads the service ACLs of the RPC server from the given configuration and policy.
     *
     * @param configuration configuration to read ACLs from
     * @param policyProvider policy describing the protected services
     */
    void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) {
        this.server.refreshServiceAcl(configuration, policyProvider);
    }

    /** Service stop hook: shuts down the RPC server. */
    public void serviceStop() {
        this.stopRpcServer();
    }

    /** Stops the RPC server if it was ever created; otherwise does nothing. */
    protected void stopRpcServer() {
        if (this.server != null) {
            this.server.stop();
        }
    }

    /**
     * @return the address recorded by {@link #startRpcServer()}, or {@code null} if the server has
     *         not been started
     */
    @Override
    public InetSocketAddress getAddress() {
        return this.address;
    }

    /**
     * @return the protocol version served by this endpoint.
     *         NOTE(review): {@code 19L} is presumably an inlined protocol version constant -
     *         confirm against TezTaskUmbilicalProtocol.
     */
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return 19L;
    }

    /** Delegates to {@link ProtocolSignature#getProtocolSignature} for this server instance. */
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
    }

    /**
     * Hands the next task to the calling container.
     *
     * @param containerContext identifies the calling container
     * @return {@link #TASK_FOR_INVALID_JVM} when the request is malformed, the container is not
     *         registered, or the container has no more work; {@code null} when the container is
     *         valid but nothing is assigned right now; otherwise a new {@link ContainerTask}
     *         describing the assigned attempt
     * @throws IOException if a local resource of the task cannot be converted
     */
    public ContainerTask getTask(ContainerContext containerContext) throws IOException {
        ContainerTask task = null;
        if (containerContext == null || containerContext.getContainerIdentifier() == null) {
            LOG.info("Invalid task request with an empty containerContext or containerId");
            task = TASK_FOR_INVALID_JVM;
        } else {
            ContainerId containerId = ConverterUtils.toContainerId(containerContext.getContainerIdentifier());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Container with id: " + containerId + " asked for a task");
            }
            if (!this.registeredContainers.containsKey(containerId)) {
                // Distinguish "never heard of it" from "known but already unregistered" in the log only;
                // both cases get the same reply.
                if (this.context.getAllContainers().get(containerId) == null) {
                    LOG.info("Container with id: " + containerId + " is invalid and will be killed");
                } else {
                    LOG.info("Container with id: " + containerId + " is valid, but no longer registered, and will be killed");
                }
                task = TASK_FOR_INVALID_JVM;
            } else {
                this.pingContainerHeartbeatHandler(containerId);
                AMContainerTask taskContext = this.pullTaskAttemptContext(containerId);
                if (taskContext.shouldDie()) {
                    LOG.info("No more tasks for container with id : " + containerId + ". Asking it to die");
                    task = TASK_FOR_INVALID_JVM;
                } else if (taskContext.getTask() == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("No task currently assigned to Container with id: " + containerId);
                    }
                } else {
                    TezTaskAttemptID assignedAttemptId = taskContext.getTask().getTaskAttemptID();
                    // Register first: if the bookkeeping rejects the assignment nothing is handed out.
                    this.registerTaskAttempt(assignedAttemptId, containerId);
                    task = new ContainerTask(taskContext.getTask(), false, this.convertLocalResourceMap(taskContext.getAdditionalResources()), taskContext.getCredentials(), taskContext.haveCredentialsChanged());
                    this.context.getEventHandler().handle((Event)new TaskAttemptEventStartedRemotely(taskContext.getTask().getTaskAttemptID(), containerId, this.context.getApplicationACLs()));
                    LOG.info("Container with id: " + containerId + " given task: " + taskContext.getTask().getTaskAttemptID());
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("getTask returning task: " + task);
        }
        return task;
    }

    /**
     * Answers a commit go/no-go request by delegating to the owning {@link Task} of the current
     * DAG. The request also counts as a sign of life for the attempt and its container.
     *
     * @param taskAttemptId attempt asking for permission to commit
     * @return the decision of {@link Task#canCommit}
     */
    public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
        LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
        this.taskHeartbeatHandler.progressing(taskAttemptId);
        this.pingContainerHeartbeatHandler(taskAttemptId);
        DAG job = this.context.getCurrentDAG();
        Task task = job.getVertex(taskAttemptId.getTaskID().getVertexID()).getTask(taskAttemptId.getTaskID());
        return task.canCommit(taskAttemptId);
    }

    /**
     * Removes the association between a task attempt and its container.
     *
     * <p>Unknown attempts are logged and ignored. If the container has already been unregistered
     * the attempt's mapping is still removed, so that no entry pointing at a dead container is
     * left behind.
     *
     * @param attemptId attempt to unregister
     */
    @Override
    public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
        ContainerId containerId = this.attemptToInfoMap.get(attemptId);
        if (containerId == null) {
            LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
            return;
        }
        ContainerInfo containerInfo = this.registeredContainers.get(containerId);
        if (containerInfo == null) {
            LOG.warn("Unregister task attempt: " + attemptId + " from non-registered container: " + containerId);
            // The container is gone, but the attempt entry would otherwise stay in the map forever.
            this.attemptToInfoMap.remove(attemptId, containerId);
            return;
        }
        synchronized (containerInfo) {
            containerInfo.currentAttemptId = null;
            this.attemptToInfoMap.remove(attemptId);
        }
    }

    /**
     * Pulls the next task context from the container object held by the application context.
     *
     * @param containerId container to pull work for; must be present in the context's container
     *        map
     * @return whatever {@code AMContainerImpl.pullTaskContext()} returns for that container
     */
    public AMContainerTask pullTaskAttemptContext(ContainerId containerId) {
        AMContainerImpl container = (AMContainerImpl)this.context.getAllContainers().get(containerId);
        return container.pullTaskContext();
    }

    /**
     * Starts tracking a container.
     *
     * @param containerId container to register
     * @throws TezUncheckedException if the container is already registered; the existing
     *         registration and its state are left untouched in that case
     */
    @Override
    public void registerRunningContainer(ContainerId containerId) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
        }
        // putIfAbsent (rather than put) so a rejected duplicate cannot wipe the live container state.
        ContainerInfo existing = this.registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId));
        if (existing != null) {
            throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
        }
    }

    /**
     * Records that a task attempt has been assigned to a registered container.
     *
     * @param attemptId attempt being assigned
     * @param containerId container receiving the attempt
     * @throws TezUncheckedException if the container is not registered, already has an attempt
     *         assigned, or the attempt is already mapped to a container; no bookkeeping is
     *         modified when the registration is rejected
     */
    @Override
    public void registerTaskAttempt(TezTaskAttemptID attemptId, ContainerId containerId) {
        ContainerInfo containerInfo = this.registeredContainers.get(containerId);
        if (containerInfo == null) {
            throw new TezUncheckedException("Registering task attempt: " + attemptId + " to unknown container: " + containerId);
        }
        synchronized (containerInfo) {
            if (containerInfo.currentAttemptId != null) {
                throw new TezUncheckedException("Registering task attempt: " + attemptId + " to container: " + containerId + " with existing assignment to: " + containerInfo.currentAttemptId);
            }
            // Claim the attempt atomically; only mark the container busy once the claim succeeded.
            ContainerId containerIdFromMap = this.attemptToInfoMap.putIfAbsent(attemptId, containerId);
            if (containerIdFromMap != null) {
                throw new TezUncheckedException("Registering task attempt: " + attemptId + " to container: " + containerId + " when already assigned to: " + containerIdFromMap);
            }
            containerInfo.currentAttemptId = attemptId;
        }
    }

    /**
     * Stops tracking a container. If an attempt is still assigned to it, the attempt's mapping to
     * this container is removed as well. Unknown containers are ignored.
     *
     * @param containerId container to unregister
     */
    @Override
    public void unregisterRunningContainer(ContainerId containerId) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
        }
        ContainerInfo removed = this.registeredContainers.remove(containerId);
        if (removed == null) {
            return;
        }
        synchronized (removed) {
            if (removed.currentAttemptId != null) {
                // Two-argument remove: only drop the entry if it still points at this container.
                this.attemptToInfoMap.remove(removed.currentAttemptId, containerId);
            }
        }
    }

    /** Reports a sign of life for the given container. */
    private void pingContainerHeartbeatHandler(ContainerId containerId) {
        this.containerHeartbeatHandler.pinged(containerId);
    }

    /**
     * Reports a sign of life for the container running the given attempt, or logs a warning when
     * the attempt is not mapped to any container.
     */
    private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
        ContainerId containerId = this.attemptToInfoMap.get(taskAttemptId);
        if (containerId != null) {
            this.containerHeartbeatHandler.pinged(containerId);
        } else {
            LOG.warn("Handling communication from attempt: " + taskAttemptId + ", ContainerId not known for this attempt");
        }
    }

    /**
     * Processes a heartbeat from a container.
     *
     * <p>Unregistered containers are told to die. A request carrying the same id as the previous
     * one is treated as a retransmission and answered with the cached response. Otherwise, when
     * the request names a current attempt, the attempt must be mapped to the calling container and
     * the request id must be exactly one greater than the last one; incoming events are routed to
     * the attempt's vertex and pending outgoing events are placed in the response.
     *
     * @param request heartbeat sent by the container
     * @return the response for this request
     * @throws TezException if the attempt is not assigned to the calling container or the request
     *         id is out of sequence
     */
    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException {
        ContainerId containerId = ConverterUtils.toContainerId(request.getContainerIdentifier());
        long requestId = request.getRequestId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received heartbeat from container, request=" + request);
        }
        ContainerInfo containerInfo = this.registeredContainers.get(containerId);
        if (containerInfo == null) {
            TezHeartbeatResponse response = new TezHeartbeatResponse();
            response.setLastRequestId(requestId);
            response.setShouldDie();
            return response;
        }
        // Heartbeats of one container are serialized so sequence checks and the cached response stay consistent.
        synchronized (containerInfo) {
            this.pingContainerHeartbeatHandler(containerId);
            if (containerInfo.lastRequestId == requestId) {
                LOG.warn("Old sequenceId received: " + requestId + ", Re-sending last response to client");
                return containerInfo.lastReponse;
            }
            TezHeartbeatResponse response = new TezHeartbeatResponse();
            response.setLastRequestId(requestId);
            TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
            if (taskAttemptID != null) {
                ContainerId containerIdFromMap = this.attemptToInfoMap.get(taskAttemptID);
                if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
                    throw new TezException("Attempt " + taskAttemptID + " is not recognized for heartbeat");
                }
                // Computed as a number first: concatenating "+ 1" onto the message would append the digit.
                long expectedRequestId = containerInfo.lastRequestId + 1L;
                if (expectedRequestId != requestId) {
                    throw new TezException("Container " + containerId + " has invalid request id. Expected: " + expectedRequestId + " and actual: " + requestId);
                }
                List<TezEvent> inEvents = request.getEvents();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ping from " + taskAttemptID.toString() + " events: " + (inEvents != null ? inEvents.size() : -1));
                }
                if (inEvents != null && !inEvents.isEmpty()) {
                    TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
                    this.context.getEventHandler().handle((Event)new VertexEventRouteEvent(vertexId, inEvents));
                }
                this.taskHeartbeatHandler.pinged(taskAttemptID);
                List<TezEvent> outEvents = this.context.getCurrentDAG().getVertex(taskAttemptID.getTaskID().getVertexID()).getTask(taskAttemptID.getTaskID()).getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getMaxEvents());
                response.setEvents(outEvents);
            }
            // Remember the answer so a retransmitted request can be served from the cache.
            containerInfo.lastRequestId = requestId;
            containerInfo.lastReponse = response;
            return response;
        }
    }

    /**
     * Converts a map of YARN local resources into the equivalent map of Tez local resources.
     *
     * @param ylrs resources keyed by name; may be {@code null}
     * @return a new map with the same keys; empty when {@code ylrs} is {@code null}
     * @throws IOException wrapping the {@link URISyntaxException} of a resource that cannot be
     *         converted
     */
    private Map<String, TezLocalResource> convertLocalResourceMap(Map<String, LocalResource> ylrs) throws IOException {
        Map<String, TezLocalResource> tlrs = Maps.newHashMap();
        if (ylrs != null) {
            for (Map.Entry<String, LocalResource> ylrEntry : ylrs.entrySet()) {
                TezLocalResource tlr;
                try {
                    tlr = TezConverterUtils.convertYarnLocalResourceToTez(ylrEntry.getValue());
                }
                catch (URISyntaxException e) {
                    throw new IOException(e);
                }
                tlrs.put(ylrEntry.getKey(), tlr);
            }
        }
        return tlrs;
    }

    /**
     * Mutable per-container state. Instances double as the lock guarding their own fields and the
     * matching entry in the attempt-to-container map.
     */
    static class ContainerInfo {
        ContainerId containerId;
        /** Id of the last heartbeat request that was answered; starts at 0. */
        long lastRequestId;
        /** Response sent for {@link #lastRequestId}; {@code null} until the first heartbeat is answered. */
        TezHeartbeatResponse lastReponse;
        /** Attempt currently assigned to the container, or {@code null} when it is idle. */
        TezTaskAttemptID currentAttemptId;

        ContainerInfo(ContainerId containerId) {
            this.containerId = containerId;
            this.lastReponse = null;
            this.lastRequestId = 0L;
            this.currentAttemptId = null;
        }
    }
}

