/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.gridmix.Gridmix;
import org.apache.hadoop.mapred.gridmix.Statistics;
import org.apache.hadoop.mapreduce.Job;

/**
 * Polls submitted jobs for completion on a dedicated thread and hands every
 * finished job to the {@link Statistics} collector.
 *
 * <p>Jobs are handed over through {@link #add(Job)} into {@code runningJobs};
 * the monitor thread moves them into {@code mJobs} and polls them from there.
 * The {@code graceful} and {@code shutdown} flags are only read and written
 * while holding the {@code mJobs} monitor.
 */
class JobMonitor
implements Gridmix.Component<Job> {
    public static final Log LOG = LogFactory.getLog(JobMonitor.class);
    /** Jobs currently being polled; doubles as the lock for the shutdown flags. */
    private final Queue<Job> mJobs;
    private final MonitorThread mThread = new MonitorThread();
    /** Hand-off queue between callers of {@link #add(Job)} and the monitor thread. */
    private final BlockingQueue<Job> runningJobs = new LinkedBlockingQueue<Job>();
    private final long pollDelayMillis;
    private final Statistics statistics;
    /** When a stop is requested: {@code true} = wait for monitored jobs, {@code false} = exit at once. */
    private boolean graceful = false;
    /** Set once {@link #abort()} or {@link #shutdown()} has been called. */
    private boolean shutdown = false;

    /**
     * Creates a monitor that polls every 5 seconds.
     *
     * @param statistics collector that receives finished and failed-to-submit jobs
     */
    public JobMonitor(Statistics statistics) {
        this(5, TimeUnit.SECONDS, statistics);
    }

    /**
     * Creates a monitor with an explicit polling interval.
     *
     * @param pollDelay  time to sleep between polling passes
     * @param unit       unit of {@code pollDelay}
     * @param statistics collector that receives finished and failed-to-submit jobs
     */
    public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) {
        this.mJobs = new LinkedList<Job>();
        this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
        this.statistics = statistics;
    }

    /**
     * Queues a job for monitoring; the monitor thread picks it up on its next pass.
     *
     * @param job the job to monitor
     * @throws InterruptedException if interrupted while inserting into the hand-off queue
     */
    @Override
    public void add(Job job) throws InterruptedException {
        runningJobs.put(job);
    }

    /**
     * Records a job whose submission failed: logs it and passes it to the
     * statistics collector without monitoring it.
     *
     * @param job the job that could not be submitted
     */
    public void submissionFailed(Job job) {
        LOG.info("Job submission failed notification for job " + job.getJobID());
        statistics.add(job);
    }

    /**
     * Hook invoked by the monitor thread for a completed, successful job.
     *
     * @param job the successful job
     */
    protected void onSuccess(Job job) {
        LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " success");
    }

    /**
     * Hook invoked by the monitor thread for a completed, unsuccessful job.
     *
     * @param job the failed job
     */
    protected void onFailure(Job job) {
        LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " failure");
    }

    /**
     * Returns a snapshot of the jobs still held in the monitored queue.
     * A warning is logged if the monitor thread is still alive when this is called.
     *
     * @return a new list containing the jobs currently in the monitored queue
     */
    List<Job> getRemainingJobs() {
        if (mThread.isAlive()) {
            LOG.warn("Internal error: Polling running monitor for jobs");
        }
        synchronized (mJobs) {
            return new ArrayList<Job>(mJobs);
        }
    }

    /** Starts the monitor thread. */
    @Override
    public void start() {
        mThread.start();
    }

    /**
     * Waits for the monitor thread to terminate.
     *
     * @param millis maximum time to wait, as defined by {@link Thread#join(long)}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void join(long millis) throws InterruptedException {
        mThread.join(millis);
    }

    /**
     * Requests an immediate stop: the monitor thread moves any pending jobs
     * into the monitored queue and exits without waiting for them to complete.
     */
    @Override
    public void abort() {
        synchronized (mJobs) {
            graceful = false;
            shutdown = true;
        }
        // Wake the monitor thread if it is sleeping between passes.
        mThread.interrupt();
    }

    /**
     * Requests a graceful stop: the monitor thread keeps polling and exits
     * once the monitored queue is empty.
     */
    @Override
    public void shutdown() {
        synchronized (mJobs) {
            graceful = true;
            shutdown = true;
        }
        // Wake the monitor thread if it is sleeping between passes.
        mThread.interrupt();
    }

    /** Builds the message used when a job can no longer be tracked. */
    private static String lostJobMessage(Job job) {
        return "Lost job " + (null == job.getJobName() ? "<unknown>" : job.getJobName());
    }

    /** Thread that repeatedly polls the monitored jobs until a stop is requested. */
    private class MonitorThread
    extends Thread {
        public MonitorThread() {
            super("GridmixJobMonitor");
        }

        /**
         * Dispatches a completed job to {@link JobMonitor#onSuccess(Job)} or
         * {@link JobMonitor#onFailure(Job)}.
         *
         * @param job a job for which {@code isComplete()} returned {@code true}
         * @throws IOException          if querying the job's outcome fails
         * @throws InterruptedException declared for overriding implementations
         */
        public void process(Job job) throws IOException, InterruptedException {
            if (job.isSuccessful()) {
                JobMonitor.this.onSuccess(job);
            } else {
                JobMonitor.this.onFailure(job);
            }
        }

        /**
         * Main loop: pull newly added jobs, honour stop requests, poll the
         * monitored jobs, then sleep for the configured delay.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    final boolean stopRequested;
                    final boolean waitForJobs;
                    synchronized (mJobs) {
                        waitForJobs = graceful;
                        stopRequested = shutdown;
                        runningJobs.drainTo(mJobs);
                    }

                    if (stopRequested) {
                        if (!waitForJobs) {
                            // Abort: collect jobs added since the drain above so
                            // getRemainingJobs() sees them, then stop.
                            while (!runningJobs.isEmpty()) {
                                synchronized (mJobs) {
                                    runningJobs.drainTo(mJobs);
                                }
                            }
                            break;
                        }
                        if (mJobs.isEmpty()) {
                            // Graceful shutdown and nothing left to watch.
                            break;
                        }
                    }

                    while (!mJobs.isEmpty()) {
                        final Job job;
                        synchronized (mJobs) {
                            job = mJobs.poll();
                        }
                        try {
                            if (job.isComplete()) {
                                process(job);
                                statistics.add(job);
                                // Finished jobs are dropped from the queue; go on
                                // to the next one instead of re-queueing.
                                continue;
                            }
                        } catch (IOException e) {
                            if (e.getCause() instanceof ClosedByInterruptException) {
                                // The I/O layer consumed the interrupt; restore the
                                // flag and keep the job so it is not lost.
                                Thread.currentThread().interrupt();
                            } else {
                                LOG.warn(lostJobMessage(job), e);
                                continue;
                            }
                        }
                        // Still running (or the poll was interrupted): put the job
                        // back and end this pass.
                        synchronized (mJobs) {
                            if (!mJobs.offer(job)) {
                                LOG.error(lostJobMessage(job));
                            }
                        }
                        break;
                    }

                    try {
                        TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
                    } catch (InterruptedException ignored) {
                        // abort()/shutdown() interrupt this thread to cut the sleep
                        // short; the flags are re-read at the top of the loop. The
                        // interrupt status is deliberately not restored, otherwise
                        // every later sleep would return immediately.
                    }
                } catch (Throwable e) {
                    LOG.warn("Unexpected exception: ", e);
                }
            }
        }
    }
}

