/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.speculation.legacy;

import com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.ProgressHelper;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacyTaskRuntimeEstimator;
import org.apache.tez.dag.app.dag.speculation.legacy.TaskRuntimeEstimator;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LegacySpeculator
extends AbstractService {
    private static final long ON_SCHEDULE = Long.MIN_VALUE;
    private static final long ALREADY_SPECULATING = -9223372036854775807L;
    private static final long TOO_NEW = -9223372036854775806L;
    private static final long PROGRESS_IS_GOOD = -9223372036854775805L;
    private static final long NOT_RUNNING = -9223372036854775804L;
    private static final long TOO_LATE_TO_SPECULATE = -9223372036854775803L;
    private final long soonestRetryAfterNoSpeculate;
    private final long soonestRetryAfterSpeculate;
    private final double proportionRunningTasksSpeculatable;
    private final double proportionTotalTasksSpeculatable;
    private final int minimumAllowedSpeculativeTasks;
    private static final int VERTEX_SIZE_THRESHOLD_FOR_TIMEOUT_SPECULATION = 1;
    private static final Logger LOG = LoggerFactory.getLogger(LegacySpeculator.class);
    private final ConcurrentMap<TezTaskID, Boolean> runningTasks = new ConcurrentHashMap<TezTaskID, Boolean>();
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentMap<TezTaskAttemptID, TaskAttemptHistoryStatistics> runningTaskAttemptStatistics = new ConcurrentHashMap<TezTaskAttemptID, TaskAttemptHistoryStatistics>();
    private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9000L;
    private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
    private Vertex vertex;
    private TaskRuntimeEstimator estimator;
    private final long taskTimeout;
    private final Clock clock;
    private Thread speculationBackgroundThread = null;
    private volatile boolean stopped = false;

    @VisibleForTesting
    public int getMinimumAllowedSpeculativeTasks() {
        return this.minimumAllowedSpeculativeTasks;
    }

    @VisibleForTesting
    public double getProportionTotalTasksSpeculatable() {
        return this.proportionTotalTasksSpeculatable;
    }

    @VisibleForTesting
    public double getProportionRunningTasksSpeculatable() {
        return this.proportionRunningTasksSpeculatable;
    }

    @VisibleForTesting
    public long getSoonestRetryAfterNoSpeculate() {
        return this.soonestRetryAfterNoSpeculate;
    }

    @VisibleForTesting
    public long getSoonestRetryAfterSpeculate() {
        return this.soonestRetryAfterSpeculate;
    }

    public LegacySpeculator(Configuration conf, AppContext context, Vertex vertex) {
        this(conf, context.getClock(), vertex);
    }

    public LegacySpeculator(Configuration conf, Clock clock, Vertex vertex) {
        this(conf, LegacySpeculator.getEstimator(conf, vertex), clock, vertex);
    }

    private static TaskRuntimeEstimator getEstimator(Configuration conf, Vertex vertex) {
        TaskRuntimeEstimator estimator;
        Class estimatorClass = conf.getClass("tez.am.task.estimator.class", LegacyTaskRuntimeEstimator.class, TaskRuntimeEstimator.class);
        try {
            Constructor estimatorConstructor = estimatorClass.getConstructor(new Class[0]);
            estimator = (TaskRuntimeEstimator)estimatorConstructor.newInstance(new Object[0]);
            estimator.contextualize(conf, vertex);
        }
        catch (NoSuchMethodException e) {
            LOG.error("Can't make a speculation runtime estimator", (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (IllegalAccessException e) {
            LOG.error("Can't make a speculation runtime estimator", (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (InstantiationException e) {
            LOG.error("Can't make a speculation runtime estimator", (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (InvocationTargetException e) {
            LOG.error("Can't make a speculation runtime estimator", (Throwable)e);
            throw new RuntimeException(e);
        }
        return estimator;
    }

    protected void serviceStart() throws Exception {
        this.lock.writeLock().lock();
        try {
            assert (this.speculationBackgroundThread == null);
            if (this.speculationBackgroundThread == null) {
                this.speculationBackgroundThread = new Thread(this.createThread(), "DefaultSpeculator background processing");
                this.speculationBackgroundThread.start();
            }
            super.serviceStart();
        }
        catch (Exception e) {
            LOG.warn("Speculator thread could not launch", (Throwable)e);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public boolean isStarted() {
        boolean result = false;
        this.lock.readLock().lock();
        try {
            if (this.speculationBackgroundThread != null) {
                result = this.getServiceState().equals((Object)Service.STATE.STARTED);
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
        return result;
    }

    public LegacySpeculator(Configuration conf, TaskRuntimeEstimator estimator, Clock clock, Vertex vertex) {
        super(LegacySpeculator.class.getName());
        this.vertex = vertex;
        this.estimator = estimator;
        this.clock = clock;
        this.taskTimeout = conf.getLong("tez.am.legacy.speculative.single.task.vertex.timeout", -1L);
        this.soonestRetryAfterNoSpeculate = conf.getLong("tez.am.soonest.retry.after.no.speculate", 1000L);
        this.soonestRetryAfterSpeculate = conf.getLong("tez.am.soonest.retry.after.speculate", 15000L);
        this.proportionRunningTasksSpeculatable = conf.getDouble("tez.am.proportion.running.tasks.speculatable", 0.1);
        this.proportionTotalTasksSpeculatable = conf.getDouble("tez.am.proportion.total.tasks.speculatable", 0.01);
        this.minimumAllowedSpeculativeTasks = conf.getInt("tez.am.minimum.allowed.speculative.tasks", 10);
    }

    protected void serviceStop() throws Exception {
        this.lock.writeLock().lock();
        try {
            this.stopped = true;
            if (this.speculationBackgroundThread != null) {
                this.speculationBackgroundThread.interrupt();
            }
            super.serviceStop();
            this.speculationBackgroundThread = null;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public Runnable createThread() {
        return new Runnable(){

            @Override
            public void run() {
                while (!LegacySpeculator.this.stopped && !Thread.currentThread().isInterrupted()) {
                    long backgroundRunStartTime = LegacySpeculator.this.clock.getTime();
                    try {
                        int speculations = LegacySpeculator.this.computeSpeculations();
                        long nextRecompTime = speculations > 0 ? LegacySpeculator.this.soonestRetryAfterSpeculate : LegacySpeculator.this.soonestRetryAfterNoSpeculate;
                        long wait = Math.max(nextRecompTime, LegacySpeculator.this.clock.getTime() - backgroundRunStartTime);
                        if (speculations > 0) {
                            LOG.info("We launched " + speculations + " speculations.  Waiting " + wait + " milliseconds before next evaluation.");
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("Waiting {} milliseconds before next evaluation.", (Object)wait);
                        }
                        Thread.sleep(wait);
                    }
                    catch (InterruptedException ie) {
                        if (LegacySpeculator.this.stopped) continue;
                        LOG.warn("Speculator thread interrupted", (Throwable)ie);
                    }
                }
            }
        };
    }

    public void notifyAttemptStarted(TezTaskAttemptID taId, long timestamp) {
        this.estimator.enrollAttempt(taId, timestamp);
    }

    public void notifyAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState reportedState, long timestamp) {
        this.statusUpdate(taId, reportedState, timestamp);
    }

    private void statusUpdate(TezTaskAttemptID attemptID, TaskAttemptState reportedState, long timestamp) {
        TezTaskID taskID = attemptID.getTaskID();
        Task task = this.vertex.getTask(taskID);
        if (task == null) {
            return;
        }
        this.estimator.updateAttempt(attemptID, reportedState, timestamp);
        if (reportedState == TaskAttemptState.RUNNING) {
            this.runningTasks.putIfAbsent(taskID, Boolean.TRUE);
        } else {
            this.runningTasks.remove(taskID, Boolean.TRUE);
            if (reportedState == TaskAttemptState.STARTING) {
                this.runningTaskAttemptStatistics.remove(attemptID);
            }
        }
    }

    public void handle(SpeculatorEvent event) {
        SpeculatorEventTaskAttemptStatusUpdate updateEvent = (SpeculatorEventTaskAttemptStatusUpdate)event;
        if (updateEvent.hasJustStarted()) {
            this.notifyAttemptStarted(updateEvent.getAttemptId(), updateEvent.getTimestamp());
        } else {
            this.notifyAttemptStatusUpdate(updateEvent.getAttemptId(), updateEvent.getTaskAttemptState(), updateEvent.getTimestamp());
        }
    }

    private long speculationValue(Task task, long now, boolean shouldUseTimeout) {
        Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
        TezTaskID taskID = task.getTaskId();
        long acceptableRuntime = Long.MIN_VALUE;
        long result = Long.MIN_VALUE;
        if (task.getState() == TaskState.SUCCEEDED) {
            this.mayHaveSpeculated.remove(taskID);
            return -9223372036854775804L;
        }
        if (!this.mayHaveSpeculated.contains(taskID) && !shouldUseTimeout && (acceptableRuntime = this.estimator.thresholdRuntime(taskID)) == Long.MAX_VALUE) {
            return Long.MIN_VALUE;
        }
        int numberRunningAttempts = 0;
        for (TaskAttempt taskAttempt : attempts.values()) {
            TaskAttemptState taskAttemptState = taskAttempt.getState();
            if (taskAttemptState != TaskAttemptState.RUNNING && taskAttemptState != TaskAttemptState.STARTING) continue;
            if (++numberRunningAttempts > 1) {
                return -9223372036854775807L;
            }
            TezTaskAttemptID runningTaskAttemptID = taskAttempt.getID();
            long taskAttemptStartTime = this.estimator.attemptEnrolledTime(runningTaskAttemptID);
            if (taskAttemptStartTime > now) {
                return -9223372036854775806L;
            }
            if (shouldUseTimeout) {
                if (now - taskAttemptStartTime > this.taskTimeout) {
                    result = Long.MAX_VALUE;
                    continue;
                }
                return Long.MIN_VALUE;
            }
            long estimatedRunTime = this.estimator.estimatedRuntime(runningTaskAttemptID);
            long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
            long estimatedReplacementEndTime = now + this.estimator.newAttemptEstimatedRuntime();
            float progress = taskAttempt.getProgress();
            TaskAttemptHistoryStatistics data = (TaskAttemptHistoryStatistics)this.runningTaskAttemptStatistics.get(runningTaskAttemptID);
            if (data == null) {
                this.runningTaskAttemptStatistics.put(runningTaskAttemptID, new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
            } else if (estimatedRunTime == data.getEstimatedRunTime() && progress == data.getProgress()) {
                if (data.notHeartbeatedInAWhile(now) || this.estimator.hasStagnatedProgress(runningTaskAttemptID, now)) {
                    this.statusUpdate(taskAttempt.getID(), taskAttempt.getState(), this.clock.getTime());
                }
            } else {
                data.setEstimatedRunTime(estimatedRunTime);
                data.setProgress(progress);
                data.resetHeartBeatTime(now);
            }
            if (estimatedEndTime < now) {
                return -9223372036854775805L;
            }
            if (estimatedReplacementEndTime >= estimatedEndTime) {
                return -9223372036854775803L;
            }
            result = estimatedEndTime - estimatedReplacementEndTime;
        }
        if (numberRunningAttempts == 0) {
            return -9223372036854775804L;
        }
        if (acceptableRuntime == Long.MIN_VALUE && !shouldUseTimeout && (acceptableRuntime = this.estimator.thresholdRuntime(taskID)) == Long.MAX_VALUE) {
            return Long.MIN_VALUE;
        }
        return result;
    }

    protected void addSpeculativeAttempt(TezTaskID taskID) {
        LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
        this.vertex.scheduleSpeculativeTask(taskID);
        this.mayHaveSpeculated.add(taskID);
    }

    int computeSpeculations() {
        int successes = 0;
        long now = this.clock.getTime();
        int numberSpeculationsAlready = 0;
        int numberRunningTasks = 0;
        Map<TezTaskID, Task> tasks = this.vertex.getTasks();
        int numberAllowedSpeculativeTasks = (int)Math.max((double)this.minimumAllowedSpeculativeTasks, this.proportionTotalTasksSpeculatable * (double)tasks.size());
        TezTaskID bestTaskID = null;
        long bestSpeculationValue = -1L;
        boolean shouldUseTimeout = tasks.size() <= 1 && this.taskTimeout >= 0L;
        for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
            long mySpeculationValue = this.speculationValue(taskEntry.getValue(), now, shouldUseTimeout);
            if (mySpeculationValue == -9223372036854775807L) {
                ++numberSpeculationsAlready;
            }
            if (mySpeculationValue != -9223372036854775804L) {
                ++numberRunningTasks;
            }
            if (mySpeculationValue <= bestSpeculationValue) continue;
            bestTaskID = taskEntry.getKey();
            bestSpeculationValue = mySpeculationValue;
        }
        numberAllowedSpeculativeTasks = (int)Math.max((double)numberAllowedSpeculativeTasks, this.proportionRunningTasksSpeculatable * (double)numberRunningTasks);
        if (bestTaskID != null && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
            this.addSpeculativeAttempt(bestTaskID);
            ++successes;
        }
        return successes;
    }

    static class TaskAttemptHistoryStatistics {
        private long estimatedRunTime;
        private float progress;
        private long lastHeartBeatTime;

        public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress, long nonProgressStartTime) {
            this.estimatedRunTime = estimatedRunTime;
            this.progress = progress;
            this.resetHeartBeatTime(nonProgressStartTime);
        }

        public long getEstimatedRunTime() {
            return this.estimatedRunTime;
        }

        public float getProgress() {
            return this.progress;
        }

        public void setEstimatedRunTime(long estimatedRunTime) {
            this.estimatedRunTime = estimatedRunTime;
        }

        public void setProgress(float progress) {
            if (LOG.isDebugEnabled() && !ProgressHelper.isProgressWithinRange((float)progress)) {
                LOG.debug("Progress update: speculator received progress in invalid range={}", (Object)Float.valueOf(progress));
            }
            this.progress = progress;
        }

        public boolean notHeartbeatedInAWhile(long now) {
            if (now - this.lastHeartBeatTime <= 9000L) {
                return false;
            }
            this.resetHeartBeatTime(now);
            return true;
        }

        public void resetHeartBeatTime(long lastHeartBeatTime) {
            this.lastHeartBeatTime = lastHeartBeatTime;
        }
    }
}

