/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.TestRecovery;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Event;
import org.junit.Test;

/*
 * Exception performing whole class analysis ignored.
 */
public class TestRecovery {
    private static final Log LOG = LogFactory.getLog(TestRecovery.class);
    private static Path outputDir = new Path(new File("target", TestRecovery.class.getName()).getAbsolutePath() + "/" + "out");
    private static String partFile = "part-r-00000";
    private Text key1 = new Text("key1");
    private Text key2 = new Text("key2");
    private Text val1 = new Text("val1");
    private Text val2 = new Text("val2");

    @Test
    public void testCrashed() throws Exception {
        int runCount = 0;
        long am1StartTimeEst = System.currentTimeMillis();
        MRAppWithHistory app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        long jobStartTime = job.getReport().getStartTime();
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask1 = (Task)it.next();
        Task mapTask2 = (Task)it.next();
        Task reduceTask = (Task)it.next();
        app.waitForState(mapTask1, TaskState.RUNNING);
        app.waitForState(mapTask2, TaskState.RUNNING);
        TaskAttempt task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        TaskAttempt task2Attempt = (TaskAttempt)mapTask2.getAttempts().values().iterator().next();
        app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
        app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
        Assert.assertEquals((String)"Reduce Task state not correct", (Object)TaskState.RUNNING, (Object)reduceTask.getReport().getTaskState());
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt1.getID(), TaskAttemptEventType.TA_FAILMSG));
        app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
        int timeOut = 0;
        while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
            Thread.sleep(2000L);
            LOG.info((Object)"Waiting for next attempt to start");
        }
        Assert.assertEquals((int)2, (int)mapTask1.getAttempts().size());
        Iterator itr = mapTask1.getAttempts().values().iterator();
        itr.next();
        TaskAttempt task1Attempt2 = (TaskAttempt)itr.next();
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt2.getID(), TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
        app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
        timeOut = 0;
        while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
            Thread.sleep(2000L);
            LOG.info((Object)"Waiting for next attempt to start");
        }
        Assert.assertEquals((int)3, (int)mapTask1.getAttempts().size());
        itr = mapTask1.getAttempts().values().iterator();
        itr.next();
        itr.next();
        TaskAttempt task1Attempt3 = (TaskAttempt)itr.next();
        app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt3.getID(), TaskAttemptEventType.TA_KILL));
        app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
        timeOut = 0;
        while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
            Thread.sleep(2000L);
            LOG.info((Object)"Waiting for next attempt to start");
        }
        Assert.assertEquals((int)4, (int)mapTask1.getAttempts().size());
        itr = mapTask1.getAttempts().values().iterator();
        itr.next();
        itr.next();
        itr.next();
        TaskAttempt task1Attempt4 = (TaskAttempt)itr.next();
        app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt4.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        long task1StartTime = mapTask1.getReport().getStartTime();
        long task1FinishTime = mapTask1.getReport().getFinishTime();
        app.stop();
        long am2StartTimeEst = System.currentTimeMillis();
        app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask1 = (Task)it.next();
        mapTask2 = (Task)it.next();
        reduceTask = (Task)it.next();
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        app.waitForState(mapTask2, TaskState.RUNNING);
        task2Attempt = (TaskAttempt)mapTask2.getAttempts().values().iterator().next();
        app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(((TaskAttempt)mapTask2.getAttempts().values().iterator().next()).getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask2, TaskState.SUCCEEDED);
        app.waitForState(reduceTask, TaskState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(((TaskAttempt)reduceTask.getAttempts().values().iterator().next()).getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(job, JobState.SUCCEEDED);
        app.verifyCompleted();
        Assert.assertEquals((String)"Job Start time not correct", (long)jobStartTime, (long)job.getReport().getStartTime());
        Assert.assertEquals((String)"Task Start time not correct", (long)task1StartTime, (long)mapTask1.getReport().getStartTime());
        Assert.assertEquals((String)"Task Finish time not correct", (long)task1FinishTime, (long)mapTask1.getReport().getFinishTime());
        Assert.assertEquals((int)2, (int)job.getAMInfos().size());
        int attemptNum = 1;
        for (AMInfo amInfo : job.getAMInfos()) {
            Assert.assertEquals((int)attemptNum++, (int)amInfo.getAppAttemptId().getAttemptId());
            Assert.assertEquals((Object)amInfo.getAppAttemptId(), (Object)amInfo.getContainerId().getApplicationAttemptId());
            Assert.assertEquals((String)MRApp.NM_HOST, (String)amInfo.getNodeManagerHost());
            Assert.assertEquals((int)MRApp.NM_PORT, (int)amInfo.getNodeManagerPort());
            Assert.assertEquals((int)MRApp.NM_HTTP_PORT, (int)amInfo.getNodeManagerHttpPort());
        }
        long am1StartTimeReal = ((AMInfo)job.getAMInfos().get(0)).getStartTime();
        long am2StartTimeReal = ((AMInfo)job.getAMInfos().get(1)).getStartTime();
        Assert.assertTrue((am1StartTimeReal >= am1StartTimeEst && am1StartTimeReal <= am2StartTimeEst ? 1 : 0) != 0);
        Assert.assertTrue((am2StartTimeReal >= am2StartTimeEst && am2StartTimeReal <= System.currentTimeMillis() ? 1 : 0) != 0);
    }

    @Test
    public void testMultipleCrashes() throws Exception {
        int runCount = 0;
        MRAppWithHistory app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask1 = (Task)it.next();
        Task mapTask2 = (Task)it.next();
        Task reduceTask = (Task)it.next();
        app.waitForState(mapTask1, TaskState.RUNNING);
        app.waitForState(mapTask2, TaskState.RUNNING);
        TaskAttempt task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        TaskAttempt task2Attempt = (TaskAttempt)mapTask2.getAttempts().values().iterator().next();
        app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
        app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
        Assert.assertEquals((String)"Reduce Task state not correct", (Object)TaskState.RUNNING, (Object)reduceTask.getReport().getTaskState());
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        app.stop();
        app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask1 = (Task)it.next();
        mapTask2 = (Task)it.next();
        reduceTask = (Task)it.next();
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        app.waitForState(mapTask2, TaskState.RUNNING);
        task2Attempt = (TaskAttempt)mapTask2.getAttempts().values().iterator().next();
        app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(((TaskAttempt)mapTask2.getAttempts().values().iterator().next()).getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask2, TaskState.SUCCEEDED);
        app.stop();
        app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask1 = (Task)it.next();
        mapTask2 = (Task)it.next();
        reduceTask = (Task)it.next();
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        app.waitForState(mapTask2, TaskState.SUCCEEDED);
        app.waitForState(reduceTask, TaskState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(((TaskAttempt)reduceTask.getAttempts().values().iterator().next()).getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(job, JobState.SUCCEEDED);
        app.verifyCompleted();
    }

    @Test
    public void testOutputRecovery() throws Exception {
        int runCount = 0;
        MRAppWithHistory app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), true, ++runCount);
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask1 = (Task)it.next();
        Task reduceTask1 = (Task)it.next();
        app.waitForState(mapTask1, TaskState.RUNNING);
        TaskAttempt task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        Assert.assertEquals((int)5467, (int)task1Attempt1.getShufflePort());
        app.waitForState(reduceTask1, TaskState.RUNNING);
        TaskAttempt reduce1Attempt1 = (TaskAttempt)reduceTask1.getAttempts().values().iterator().next();
        this.writeOutput(reduce1Attempt1, conf);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(reduce1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(reduceTask1, TaskState.SUCCEEDED);
        app.stop();
        app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask1 = (Task)it.next();
        reduceTask1 = (Task)it.next();
        Task reduceTask2 = (Task)it.next();
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        Assert.assertEquals((int)5467, (int)task1Attempt1.getShufflePort());
        app.waitForState(reduceTask1, TaskState.SUCCEEDED);
        app.waitForState(reduceTask2, TaskState.RUNNING);
        TaskAttempt reduce2Attempt = (TaskAttempt)reduceTask2.getAttempts().values().iterator().next();
        app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(reduce2Attempt.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(reduceTask2, TaskState.SUCCEEDED);
        app.waitForState(job, JobState.SUCCEEDED);
        app.verifyCompleted();
        this.validateOutput();
    }

    @Test
    public void testOutputRecoveryMapsOnly() throws Exception {
        int runCount = 0;
        MRAppWithHistory app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask1 = (Task)it.next();
        Task mapTask2 = (Task)it.next();
        Task reduceTask1 = (Task)it.next();
        app.waitForState(mapTask1, TaskState.RUNNING);
        TaskAttempt task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
        this.writeBadOutput(task1Attempt1, conf);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        Assert.assertEquals((int)5467, (int)task1Attempt1.getShufflePort());
        app.stop();
        app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask1 = (Task)it.next();
        mapTask2 = (Task)it.next();
        reduceTask1 = (Task)it.next();
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        Assert.assertEquals((int)5467, (int)task1Attempt1.getShufflePort());
        app.waitForState(mapTask2, TaskState.RUNNING);
        TaskAttempt task2Attempt1 = (TaskAttempt)mapTask2.getAttempts().values().iterator().next();
        app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task2Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask2, TaskState.SUCCEEDED);
        Assert.assertEquals((int)5467, (int)task2Attempt1.getShufflePort());
        app.waitForState(reduceTask1, TaskState.RUNNING);
        TaskAttempt reduce1Attempt1 = (TaskAttempt)reduceTask1.getAttempts().values().iterator().next();
        this.writeOutput(reduce1Attempt1, conf);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(reduce1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(reduceTask1, TaskState.SUCCEEDED);
        app.waitForState(job, JobState.SUCCEEDED);
        app.verifyCompleted();
        this.validateOutput();
    }

    @Test
    public void testRecoveryWithOldCommiter() throws Exception {
        int runCount = 0;
        MRAppWithHistory app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), true, ++runCount);
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.mapper.new-api", false);
        conf.setBoolean("mapred.reducer.new-api", false);
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask1 = (Task)it.next();
        Task reduceTask1 = (Task)it.next();
        app.waitForState(mapTask1, TaskState.RUNNING);
        TaskAttempt task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        Assert.assertEquals((int)5467, (int)task1Attempt1.getShufflePort());
        app.waitForState(reduceTask1, TaskState.RUNNING);
        TaskAttempt reduce1Attempt1 = (TaskAttempt)reduceTask1.getAttempts().values().iterator().next();
        this.writeOutput(reduce1Attempt1, conf);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(reduce1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(reduceTask1, TaskState.SUCCEEDED);
        app.stop();
        app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapred.mapper.new-api", false);
        conf.setBoolean("mapred.reducer.new-api", false);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask1 = (Task)it.next();
        reduceTask1 = (Task)it.next();
        Task reduceTask2 = (Task)it.next();
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        task1Attempt1 = (TaskAttempt)mapTask1.getAttempts().values().iterator().next();
        Assert.assertEquals((int)5467, (int)task1Attempt1.getShufflePort());
        app.waitForState(reduceTask1, TaskState.SUCCEEDED);
        app.waitForState(reduceTask2, TaskState.RUNNING);
        TaskAttempt reduce2Attempt = (TaskAttempt)reduceTask2.getAttempts().values().iterator().next();
        app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(reduce2Attempt.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(reduceTask2, TaskState.SUCCEEDED);
        app.waitForState(job, JobState.SUCCEEDED);
        app.verifyCompleted();
        this.validateOutput();
    }

    @Test
    public void testSpeculative() throws Exception {
        int runCount = 0;
        long am1StartTimeEst = System.currentTimeMillis();
        MRAppWithHistory app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        long jobStartTime = job.getReport().getStartTime();
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask1 = (Task)it.next();
        Task mapTask2 = (Task)it.next();
        Task reduceTask = (Task)it.next();
        app.waitForState(mapTask1, TaskState.RUNNING);
        app.waitForState(mapTask2, TaskState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskEvent(mapTask1.getID(), TaskEventType.T_ADD_SPEC_ATTEMPT));
        int timeOut = 0;
        while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
            Thread.sleep(1000L);
            LOG.info((Object)"Waiting for next attempt to start");
        }
        Iterator t1it = mapTask1.getAttempts().values().iterator();
        TaskAttempt task1Attempt1 = (TaskAttempt)t1it.next();
        TaskAttempt task1Attempt2 = (TaskAttempt)t1it.next();
        TaskAttempt task2Attempt = (TaskAttempt)mapTask2.getAttempts().values().iterator().next();
        ContainerId t1a2contId = task1Attempt2.getAssignedContainerID();
        LOG.info((Object)t1a2contId.toString());
        LOG.info((Object)task1Attempt1.getID().toString());
        LOG.info((Object)task1Attempt2.getID().toString());
        app.getContext().getEventHandler().handle((Event)new TaskAttemptContainerLaunchedEvent(task1Attempt2.getID(), runCount));
        app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
        app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
        app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
        Assert.assertEquals((String)"Reduce Task state not correct", (Object)TaskState.RUNNING, (Object)reduceTask.getReport().getTaskState());
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(task1Attempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(task1Attempt1, TaskAttemptState.SUCCEEDED);
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        long task1StartTime = mapTask1.getReport().getStartTime();
        long task1FinishTime = mapTask1.getReport().getFinishTime();
        app.stop();
        long am2StartTimeEst = System.currentTimeMillis();
        app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        conf.set("mapreduce.output.fileoutputformat.outputdir", outputDir.toString());
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"No of tasks not correct", (int)3, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask1 = (Task)it.next();
        mapTask2 = (Task)it.next();
        reduceTask = (Task)it.next();
        app.waitForState(mapTask1, TaskState.SUCCEEDED);
        app.waitForState(mapTask2, TaskState.RUNNING);
        task2Attempt = (TaskAttempt)mapTask2.getAttempts().values().iterator().next();
        app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(((TaskAttempt)mapTask2.getAttempts().values().iterator().next()).getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask2, TaskState.SUCCEEDED);
        app.waitForState(reduceTask, TaskState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(((TaskAttempt)reduceTask.getAttempts().values().iterator().next()).getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(job, JobState.SUCCEEDED);
        app.verifyCompleted();
        Assert.assertEquals((String)"Job Start time not correct", (long)jobStartTime, (long)job.getReport().getStartTime());
        Assert.assertEquals((String)"Task Start time not correct", (long)task1StartTime, (long)mapTask1.getReport().getStartTime());
        Assert.assertEquals((String)"Task Finish time not correct", (long)task1FinishTime, (long)mapTask1.getReport().getFinishTime());
        Assert.assertEquals((int)2, (int)job.getAMInfos().size());
        int attemptNum = 1;
        for (AMInfo amInfo : job.getAMInfos()) {
            Assert.assertEquals((int)attemptNum++, (int)amInfo.getAppAttemptId().getAttemptId());
            Assert.assertEquals((Object)amInfo.getAppAttemptId(), (Object)amInfo.getContainerId().getApplicationAttemptId());
            Assert.assertEquals((String)MRApp.NM_HOST, (String)amInfo.getNodeManagerHost());
            Assert.assertEquals((int)MRApp.NM_PORT, (int)amInfo.getNodeManagerPort());
            Assert.assertEquals((int)MRApp.NM_HTTP_PORT, (int)amInfo.getNodeManagerHttpPort());
        }
        long am1StartTimeReal = ((AMInfo)job.getAMInfos().get(0)).getStartTime();
        long am2StartTimeReal = ((AMInfo)job.getAMInfos().get(1)).getStartTime();
        Assert.assertTrue((am1StartTimeReal >= am1StartTimeEst && am1StartTimeReal <= am2StartTimeEst ? 1 : 0) != 0);
        Assert.assertTrue((am2StartTimeReal >= am2StartTimeEst && am2StartTimeReal <= System.currentTimeMillis() ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeBadOutput(TaskAttempt attempt, Configuration conf) throws Exception {
        TaskAttemptContextImpl tContext = new TaskAttemptContextImpl(conf, (TaskAttemptID)TypeConverter.fromYarn((TaskAttemptId)attempt.getID()));
        TextOutputFormat theOutputFormat = new TextOutputFormat();
        RecordWriter theRecordWriter = theOutputFormat.getRecordWriter((TaskAttemptContext)tContext);
        NullWritable nullWritable = NullWritable.get();
        try {
            theRecordWriter.write((Object)this.key2, (Object)this.val2);
            theRecordWriter.write(null, (Object)nullWritable);
            theRecordWriter.write(null, (Object)this.val2);
            theRecordWriter.write((Object)nullWritable, (Object)this.val1);
            theRecordWriter.write((Object)this.key1, (Object)nullWritable);
            theRecordWriter.write((Object)this.key2, null);
            theRecordWriter.write(null, null);
            theRecordWriter.write((Object)this.key1, (Object)this.val1);
        }
        finally {
            theRecordWriter.close((TaskAttemptContext)tContext);
        }
        OutputFormat outputFormat = (OutputFormat)ReflectionUtils.newInstance((Class)tContext.getOutputFormatClass(), (Configuration)conf);
        OutputCommitter committer = outputFormat.getOutputCommitter((TaskAttemptContext)tContext);
        committer.commitTask((TaskAttemptContext)tContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeOutput(TaskAttempt attempt, Configuration conf) throws Exception {
        TaskAttemptContextImpl tContext = new TaskAttemptContextImpl(conf, (TaskAttemptID)TypeConverter.fromYarn((TaskAttemptId)attempt.getID()));
        TextOutputFormat theOutputFormat = new TextOutputFormat();
        RecordWriter theRecordWriter = theOutputFormat.getRecordWriter((TaskAttemptContext)tContext);
        NullWritable nullWritable = NullWritable.get();
        try {
            theRecordWriter.write((Object)this.key1, (Object)this.val1);
            theRecordWriter.write(null, (Object)nullWritable);
            theRecordWriter.write(null, (Object)this.val1);
            theRecordWriter.write((Object)nullWritable, (Object)this.val2);
            theRecordWriter.write((Object)this.key2, (Object)nullWritable);
            theRecordWriter.write((Object)this.key1, null);
            theRecordWriter.write(null, null);
            theRecordWriter.write((Object)this.key2, (Object)this.val2);
        }
        finally {
            theRecordWriter.close((TaskAttemptContext)tContext);
        }
        OutputFormat outputFormat = (OutputFormat)ReflectionUtils.newInstance((Class)tContext.getOutputFormatClass(), (Configuration)conf);
        OutputCommitter committer = outputFormat.getOutputCommitter((TaskAttemptContext)tContext);
        committer.commitTask((TaskAttemptContext)tContext);
    }

    private void validateOutput() throws IOException {
        File expectedFile = new File(new Path(outputDir, partFile).toString());
        StringBuffer expectedOutput = new StringBuffer();
        expectedOutput.append(this.key1).append('\t').append(this.val1).append("\n");
        expectedOutput.append(this.val1).append("\n");
        expectedOutput.append(this.val2).append("\n");
        expectedOutput.append(this.key2).append("\n");
        expectedOutput.append(this.key1).append("\n");
        expectedOutput.append(this.key2).append('\t').append(this.val2).append("\n");
        String output = TestRecovery.slurp((File)expectedFile);
        Assert.assertEquals((String)output, (String)expectedOutput.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static String slurp(File f) throws IOException {
        int len = (int)f.length();
        byte[] buf = new byte[len];
        FileInputStream in = new FileInputStream(f);
        String contents = null;
        try {
            in.read(buf, 0, len);
            contents = new String(buf, "UTF-8");
        }
        finally {
            in.close();
        }
        return contents;
    }

    public static void main(String[] arg) throws Exception {
        TestRecovery test = new TestRecovery();
        test.testCrashed();
    }
}

