/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.task;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.runtime.task.ErrorReporter;

public class TaskReporter {
    private static final Logger LOG = Logger.getLogger(TaskReporter.class);
    private final TezTaskUmbilicalProtocol umbilical;
    private final long pollInterval;
    private final long sendCounterInterval;
    private final int maxEventsToGet;
    private final AtomicLong requestCounter;
    private final String containerIdStr;
    private final ListeningExecutorService heartbeatExecutor;
    @VisibleForTesting
    HeartbeatCallable currentCallable;

    public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
        this.umbilical = umbilical;
        this.pollInterval = amPollInterval;
        this.sendCounterInterval = sendCounterInterval;
        this.maxEventsToGet = maxEventsToGet;
        this.requestCounter = requestCounter;
        this.containerIdStr = containerIdStr;
        ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
        this.heartbeatExecutor = MoreExecutors.listeningDecorator((ExecutorService)executor);
    }

    public synchronized void registerTask(LogicalIOProcessorRuntimeTask task, ErrorReporter errorReporter) {
        this.currentCallable = new HeartbeatCallable(task, this.umbilical, this.pollInterval, this.sendCounterInterval, this.maxEventsToGet, this.requestCounter, this.containerIdStr);
        ListenableFuture future = this.heartbeatExecutor.submit((Callable)this.currentCallable);
        Futures.addCallback((ListenableFuture)future, (FutureCallback)new HeartbeatCallback(errorReporter));
    }

    public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
        this.currentCallable.markComplete();
        this.currentCallable = null;
    }

    public void shutdown() {
        this.heartbeatExecutor.shutdownNow();
    }

    public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
        return this.currentCallable.taskSucceeded(taskAttemptID);
    }

    public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException {
        return this.currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
    }

    public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
        this.currentCallable.addEvents(taskAttemptID, events);
    }

    public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
        return this.umbilical.canCommit(taskAttemptID);
    }

    private static class HeartbeatCallback
    implements FutureCallback<Boolean> {
        private final ErrorReporter errorReporter;

        HeartbeatCallback(ErrorReporter errorReporter) {
            this.errorReporter = errorReporter;
        }

        public void onSuccess(Boolean result) {
            if (!result.booleanValue()) {
                this.errorReporter.shutdownRequested();
            }
        }

        public void onFailure(Throwable t) {
            this.errorReporter.reportError(t);
        }
    }

    private static class HeartbeatCallable
    implements Callable<Boolean> {
        private static final int LOG_COUNTER_START_INTERVAL = 5000;
        private static final float LOG_COUNTER_BACKOFF = 1.3f;
        private final LogicalIOProcessorRuntimeTask task;
        private EventMetaData updateEventMetadata;
        private final TezTaskUmbilicalProtocol umbilical;
        private final long pollInterval;
        private final long sendCounterInterval;
        private final int maxEventsToGet;
        private final String containerIdStr;
        private final AtomicLong requestCounter;
        private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue();
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = this.lock.newCondition();
        private int nonOobHeartbeatCounter = 0;
        private int nextHeartbeatNumToLog = 0;
        private int prevCounterSendHeartbeatNum = 0;

        public HeartbeatCallable(LogicalIOProcessorRuntimeTask task, TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
            this.pollInterval = amPollInterval;
            this.sendCounterInterval = sendCounterInterval;
            this.maxEventsToGet = maxEventsToGet;
            this.requestCounter = requestCounter;
            this.containerIdStr = containerIdStr;
            this.task = task;
            this.umbilical = umbilical;
            this.updateEventMetadata = new EventMetaData(EventMetaData.EventProducerConsumerType.SYSTEM, task.getVertexName(), "", task.getTaskAttemptID());
            this.nextHeartbeatNumToLog = Math.max(1, (int)(5000.0f / (amPollInterval == 0L ? 1.0E-6f : (float)amPollInterval)));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Boolean call() throws Exception {
            while (!this.task.isTaskDone() && !this.task.hadFatalError()) {
                boolean result = this.heartbeat(null);
                if (!result) {
                    LOG.info((Object)"Asked to die via task heartbeat");
                    return false;
                }
                this.lock.lock();
                try {
                    boolean interrupted = this.condition.await(this.pollInterval, TimeUnit.MILLISECONDS);
                    if (interrupted) continue;
                    ++this.nonOobHeartbeatCounter;
                }
                finally {
                    this.lock.unlock();
                }
            }
            int pendingEventCount = this.eventsToSend.size();
            if (pendingEventCount > 0) {
                LOG.warn((Object)("Exiting TaskReporter therad with pending queue size=" + pendingEventCount));
            }
            return true;
        }

        private synchronized boolean heartbeat(Collection<TezEvent> eventsArg) throws IOException, TezException {
            if (eventsArg != null) {
                this.eventsToSend.addAll(eventsArg);
            }
            TezEvent updateEvent = null;
            ArrayList<TezEvent> events = new ArrayList<TezEvent>();
            this.eventsToSend.drainTo(events);
            if (!this.task.isTaskDone() && !this.task.hadFatalError()) {
                TezCounters counters = null;
                if ((long)(this.nonOobHeartbeatCounter - this.prevCounterSendHeartbeatNum) * this.pollInterval >= this.sendCounterInterval) {
                    counters = this.task.getCounters();
                    this.prevCounterSendHeartbeatNum = this.nonOobHeartbeatCounter;
                }
                updateEvent = new TezEvent((Event)new TaskStatusUpdateEvent(counters, this.task.getProgress()), this.updateEventMetadata);
                events.add(updateEvent);
            }
            long requestId = this.requestCounter.incrementAndGet();
            TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, this.containerIdStr, this.task.getTaskAttemptID(), this.task.getEventCounter(), this.maxEventsToGet);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Sending heartbeat to AM, request=" + request));
            }
            this.maybeLogCounters();
            TezHeartbeatResponse response = this.umbilical.heartbeat(request);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Received heartbeat response from AM, response=" + response));
            }
            if (response.shouldDie()) {
                LOG.info((Object)"Received should die response from AM");
                return false;
            }
            if (response.getLastRequestId() != requestId) {
                throw new TezException("AM and Task out of sync, responseReqId=" + response.getLastRequestId() + ", expectedReqId=" + requestId);
            }
            if (this.task.isTaskDone() || this.task.hadFatalError()) {
                if (response.getEvents() != null && !response.getEvents().isEmpty()) {
                    LOG.warn((Object)("Current task already complete, Ignoring all event in heartbeat response, eventCount=" + response.getEvents().size()));
                }
            } else if (response.getEvents() != null && !response.getEvents().isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Routing events from heartbeat response to task, currentTaskAttemptId=" + this.task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()));
                }
                this.task.handleEvents((Collection)response.getEvents());
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void markComplete() {
            this.lock.lock();
            try {
                this.condition.signal();
            }
            finally {
                this.lock.unlock();
            }
        }

        private void maybeLogCounters() {
            if (LOG.isDebugEnabled() && this.nonOobHeartbeatCounter == this.nextHeartbeatNumToLog) {
                LOG.debug((Object)("Counters: " + this.task.getCounters().toShortString()));
                this.nextHeartbeatNumToLog = (int)((float)this.nextHeartbeatNumToLog * 1.3f);
            }
        }

        private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
            TezEvent statusUpdateEvent = new TezEvent((Event)new TaskStatusUpdateEvent(this.task.getCounters(), this.task.getProgress()), this.updateEventMetadata);
            TezEvent taskCompletedEvent = new TezEvent((Event)new TaskAttemptCompletedEvent(), this.updateEventMetadata);
            return this.heartbeat(Lists.newArrayList((Object[])new TezEvent[]{statusUpdateEvent, taskCompletedEvent}));
        }

        private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException {
            TezEvent statusUpdateEvent = new TezEvent((Event)new TaskStatusUpdateEvent(this.task.getCounters(), this.task.getProgress()), this.updateEventMetadata);
            diagnostics = diagnostics == null ? ExceptionUtils.getStackTrace((Throwable)t) : diagnostics + ":" + ExceptionUtils.getStackTrace((Throwable)t);
            TezEvent taskAttemptFailedEvent = new TezEvent((Event)new TaskAttemptFailedEvent(diagnostics), srcMeta == null ? this.updateEventMetadata : srcMeta);
            return this.heartbeat(Lists.newArrayList((Object[])new TezEvent[]{statusUpdateEvent, taskAttemptFailedEvent}));
        }

        private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
            if (events != null && !events.isEmpty()) {
                this.eventsToSend.addAll(events);
            }
        }
    }
}

