/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.recovery;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;

/**
 * Service that persists DAG history events to files under the application's
 * current recovery directory.
 *
 * <p>Two kinds of files are written:
 * <ul>
 *   <li>a single summary file ({@code <applicationId>.summary}) that receives every
 *       {@link SummaryEvent} synchronously from {@link #handle(DAGHistoryEvent)};</li>
 *   <li>one file per DAG ({@code <dagId>.recovery}) that receives the events themselves,
 *       either synchronously (summary events that ask to be written immediately) or
 *       from a background thread draining an in-memory queue.</li>
 * </ul>
 *
 * <p>All stream access and the completed-DAG bookkeeping are serialized on a single
 * internal lock shared by {@code handle}, the background thread and {@code serviceStop}.
 */
public class RecoveryService extends AbstractService {
    private static final Log LOG = LogFactory.getLog(RecoveryService.class);

    /**
     * Name of the marker directory created under the recovery path when a summary
     * event could not be persisted.
     */
    public static final String RECOVERY_FATAL_OCCURRED_DIR = "RecoveryFatalErrorOccurred";

    private final AppContext appContext;

    /** Events waiting to be written by the background thread. */
    private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue<DAGHistoryEvent>();
    /** DAGs for which DAG_FINISHED has been handled; later events for them are dropped. */
    private final Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
    /** DAGs whose events are never recorded (DAG name starts with "TezPreWarmDAG"). */
    private final Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
    private Thread eventHandlingThread;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Loop iterations since the last queue-stats log line (only touched by the handler thread). */
    private int eventCounter = 0;
    /** Events handled since the last queue-stats log line (only touched by the handler thread). */
    private int eventsProcessed = 0;
    /** Guards the streams, {@link #completedDAGs} and the flush counters. */
    private final Object lock = new Object();
    private FileSystem recoveryDirFS;
    Path recoveryPath;
    Map<TezDAGID, FSDataOutputStream> outputStreamMap = new HashMap<TezDAGID, FSDataOutputStream>();
    private int bufferSize;
    private FSDataOutputStream summaryStream;
    private int unflushedEventsCount = 0;
    private long lastFlushTime = -1L;
    private int maxUnflushedEvents;
    private int flushInterval;
    private final AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false);

    /**
     * @param appContext source of the recovery directory, application id and clock
     */
    public RecoveryService(AppContext appContext) {
        super(RecoveryService.class.getName());
        this.appContext = appContext;
    }

    /**
     * Resolves the recovery directory and its file system and reads the tuning knobs
     * (I/O buffer size, flush interval in seconds, max unflushed events) from {@code conf}.
     *
     * @param conf configuration to read settings from
     * @throws Exception if the file system for the recovery path cannot be obtained
     */
    public void serviceInit(Configuration conf) throws Exception {
        LOG.info("Initializing RecoveryService");
        this.recoveryPath = this.appContext.getCurrentRecoveryDir();
        this.recoveryDirFS = FileSystem.get(this.recoveryPath.toUri(), conf);
        this.bufferSize = conf.getInt("tez.dag.recovery.io.buffer.size", 8192);
        this.flushInterval = conf.getInt("tez.dag.recovery.flush.interval.secs", 30);
        this.maxUnflushedEvents = conf.getInt("tez.dag.recovery.max.unflushed.events", 100);
    }

    /**
     * Starts the background thread that drains the event queue and marks the
     * service as started.
     */
    public void serviceStart() {
        LOG.info("Starting RecoveryService");
        this.lastFlushTime = this.appContext.getClock().getTime();
        this.eventHandlingThread = new Thread(new Runnable() {
            @Override
            public void run() {
                RecoveryService.this.processQueuedEvents();
            }
        }, "RecoveryEventHandlingThread");
        this.eventHandlingThread.start();
        this.started.set(true);
    }

    /**
     * Body of the background thread: takes events off the queue one at a time and
     * writes them to the per-DAG recovery file until the service is stopped, the
     * thread is interrupted, or a fatal recovery error has been flagged.
     */
    private void processQueuedEvents() {
        while (!this.stopped.get() && !Thread.currentThread().isInterrupted()) {
            if (this.recoveryFatalErrorOccurred.get()) {
                LOG.error("Recovery failure occurred. Stopping recovery thread. Current eventQueueSize=" + this.eventQueue.size());
                this.eventQueue.clear();
                return;
            }
            // Periodic queue statistics; the counters reset every time a line is logged.
            if (this.eventCounter != 0 && this.eventCounter % 1000 == 0) {
                LOG.info("Event queue stats, eventsProcessedSinceLastUpdate=" + this.eventsProcessed + ", eventQueueSize=" + this.eventQueue.size());
                this.eventCounter = 0;
                this.eventsProcessed = 0;
            } else {
                ++this.eventCounter;
            }
            DAGHistoryEvent event;
            try {
                event = this.eventQueue.take();
            } catch (InterruptedException e) {
                LOG.info("EventQueue take interrupted. Returning");
                return;
            }
            synchronized (this.lock) {
                try {
                    ++this.eventsProcessed;
                    this.handleRecoveryEvent(event);
                } catch (Exception e) {
                    // Best effort: a failed write of a queued event is logged, not fatal.
                    LOG.warn("Error handling recovery event", e);
                }
            }
        }
    }

    /**
     * Stops accepting events, interrupts the background thread and syncs/closes the
     * summary stream and every open per-DAG stream. Close failures are logged and
     * otherwise ignored.
     */
    public void serviceStop() {
        LOG.info("Stopping RecoveryService");
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        // The streams and the stream map are mutated under the lock by handle() and by
        // the handler thread, so they must not be closed/iterated concurrently.
        synchronized (this.lock) {
            if (this.summaryStream != null) {
                try {
                    LOG.info("Closing Summary Stream");
                    this.summaryStream.hsync();
                    this.summaryStream.close();
                } catch (IOException ioe) {
                    LOG.warn("Error when closing summary stream", ioe);
                }
            }
            for (Map.Entry<TezDAGID, FSDataOutputStream> entry : this.outputStreamMap.entrySet()) {
                try {
                    LOG.info("Closing Output Stream for DAG " + entry.getKey());
                    entry.getValue().hsync();
                    entry.getValue().close();
                } catch (IOException ioe) {
                    LOG.warn("Error when closing output stream", ioe);
                }
            }
        }
    }

    /**
     * Entry point for history events.
     *
     * <ul>
     *   <li>Events are dropped when the service is stopped or a fatal recovery error
     *       has already been flagged.</li>
     *   <li>Events arriving before the service is started are queued as-is.</li>
     *   <li>Events without a DAG id, or for a DAG whose submitted name starts with
     *       {@code "TezPreWarmDAG"}, are not recorded.</li>
     *   <li>{@link SummaryEvent}s are written to the summary file synchronously; the
     *       event itself is then written immediately or queued, depending on
     *       {@code writeToRecoveryImmediately()}.</li>
     *   <li>All other events are queued for the background thread.</li>
     * </ul>
     *
     * @param event the event to record
     * @throws IOException if a DAG_SUBMITTED summary event could not be persisted, or if
     *         the fatal-error marker directory could not be created after any summary
     *         event failed
     */
    public void handle(DAGHistoryEvent event) throws IOException {
        if (this.stopped.get()) {
            LOG.warn("Igoring event as service stopped, eventType" + event.getHistoryEvent().getEventType());
            return;
        }
        HistoryEventType eventType = event.getHistoryEvent().getEventType();
        if (this.recoveryFatalErrorOccurred.get()) {
            return;
        }
        if (!this.started.get()) {
            LOG.warn("Adding event of type " + eventType + " to queue as service not started");
            this.eventQueue.add(event);
            return;
        }
        TezDAGID dagId = event.getDagID();
        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            DAGSubmittedEvent dagSubmittedEvent = (DAGSubmittedEvent) event.getHistoryEvent();
            String dagName = dagSubmittedEvent.getDAGName();
            if (dagName != null && dagName.startsWith("TezPreWarmDAG")) {
                // Remember the DAG so that all of its later events are skipped as well.
                this.skippedDAGs.add(dagId);
                return;
            }
        }
        if (dagId == null || this.skippedDAGs.contains(dagId)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping event for DAG, eventType=" + eventType + ", dagId=" + (dagId == null ? "null" : dagId.toString()) + ", isSkippedDAG=" + (dagId == null ? "null" : Boolean.valueOf(this.skippedDAGs.contains(dagId))));
            }
            return;
        }
        if (!(event.getHistoryEvent() instanceof SummaryEvent)) {
            // Only non-summary events are queued here; summary events are fully handled
            // below and must not be queued a second time.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
            }
            this.eventQueue.add(event);
            return;
        }
        synchronized (this.lock) {
            try {
                SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
                this.handleSummaryEvent(dagId, eventType, summaryEvent);
                this.summaryStream.hsync();
                if (summaryEvent.writeToRecoveryImmediately()) {
                    this.handleRecoveryEvent(event);
                    // handleRecoveryEvent skips events of completed DAGs, whose stream has
                    // already been closed and removed; there is nothing to flush then.
                    FSDataOutputStream dagStream = this.outputStreamMap.get(event.getDagID());
                    if (dagStream != null) {
                        this.doFlush(dagStream, this.appContext.getClock().getTime(), true);
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Queueing Non-immediate Summary/Recovery event of type" + eventType.name());
                    }
                    this.eventQueue.add(event);
                }
                if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
                    LOG.info("DAG completed, dagId=" + event.getDagID() + ", queueSize=" + this.eventQueue.size());
                    this.completedDAGs.add(dagId);
                    // Drop the stream from the map before closing so a failed close does
                    // not leave a broken stream registered for this DAG.
                    FSDataOutputStream finishedStream = this.outputStreamMap.remove(dagId);
                    if (finishedStream != null) {
                        try {
                            finishedStream.close();
                        } catch (IOException ioe) {
                            LOG.warn("Error when trying to flush/close recovery file for dag, dagId=" + event.getDagID(), ioe);
                        }
                    }
                }
            } catch (IOException ioe) {
                LOG.error("Error handling summary event, eventType=" + event.getHistoryEvent().getEventType(), ioe);
                this.flagRecoveryFatalError(ioe);
                if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
                    // Propagate so the caller learns that recording the submission failed.
                    throw ioe;
                }
            }
        }
    }

    /**
     * Marks recovery as fatally failed and creates the
     * {@link #RECOVERY_FATAL_OCCURRED_DIR} marker directory under the recovery path.
     *
     * @param cause the I/O failure that triggered the fatal state
     * @throws IOException {@code cause} itself, if the marker directory could not be created
     */
    private void flagRecoveryFatalError(IOException cause) throws IOException {
        Path fatalErrorDir = new Path(this.recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
        try {
            LOG.error("Adding a flag to ensure next AM attempt does not start up, flagFile=" + fatalErrorDir.toString());
            this.recoveryFatalErrorOccurred.set(true);
            this.recoveryDirFS.mkdirs(fatalErrorDir);
            if (!this.recoveryDirFS.exists(fatalErrorDir)) {
                throw cause;
            }
            LOG.error("Recovery failure occurred. Skipping all events");
        } catch (IOException e) {
            LOG.fatal("Failed to create fatal error flag dir " + fatalErrorDir.toString(), e);
            throw cause;
        }
    }

    /**
     * Appends the summary form of an event to the summary file, opening the file on
     * first use (created if absent, otherwise opened for append).
     *
     * @param dagID DAG the event belongs to (used for logging only)
     * @param eventType type of the event (used for logging only)
     * @param summaryEvent the event to serialize
     * @throws IOException if the file cannot be opened or written
     */
    private void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling summary event, dagID=" + dagID + ", eventType=" + eventType);
        }
        if (this.summaryStream == null) {
            Path summaryPath = new Path(this.recoveryPath, this.appContext.getApplicationID() + ".summary");
            if (this.recoveryDirFS.exists(summaryPath)) {
                this.summaryStream = this.recoveryDirFS.append(summaryPath, this.bufferSize);
            } else {
                this.summaryStream = this.recoveryDirFS.create(summaryPath, false, this.bufferSize);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing recovery event to summary stream, dagId=" + dagID + ", eventType=" + eventType);
        }
        summaryEvent.toSummaryProtoStream((OutputStream) this.summaryStream);
    }

    /**
     * Writes one event to its DAG's recovery file as the event type ordinal followed by
     * the event's proto-stream form. The file is opened on first use (append if it
     * exists, create otherwise). Events for completed DAGs are dropped.
     *
     * <p>Callers must hold {@link #lock}.
     *
     * @param event the event to write
     * @throws IOException if the file cannot be opened, written or flushed
     */
    private void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
        HistoryEventType eventType = event.getHistoryEvent().getEventType();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling recovery event of type " + event.getHistoryEvent().getEventType());
        }
        TezDAGID dagID = event.getDagID();
        if (this.completedDAGs.contains(dagID)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping Recovery Event as DAG completed, dagId=" + dagID + ", completed=" + this.completedDAGs.contains(dagID) + ", skipped=" + this.skippedDAGs.contains(dagID) + ", eventType=" + eventType);
            }
            return;
        }
        FSDataOutputStream outputStream = this.outputStreamMap.get(dagID);
        if (outputStream == null) {
            Path dagFilePath = new Path(this.recoveryPath, dagID.toString() + ".recovery");
            if (this.recoveryDirFS.exists(dagFilePath)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Opening DAG recovery file in append mode, filePath=" + dagFilePath);
                }
                outputStream = this.recoveryDirFS.append(dagFilePath, this.bufferSize);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Opening DAG recovery file in create mode, filePath=" + dagFilePath);
                }
                outputStream = this.recoveryDirFS.create(dagFilePath, false, this.bufferSize);
            }
            this.outputStreamMap.put(dagID, outputStream);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing recovery event to output stream, dagId=" + dagID + ", eventType=" + eventType);
        }
        ++this.unflushedEventsCount;
        outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
        event.getHistoryEvent().toProtoStream((OutputStream) outputStream);
        // DAG_SUBMITTED / DAG_FINISHED are excluded from the threshold-based flush;
        // NOTE(review): presumably because handle() syncs or closes the stream itself
        // for these summary events - confirm.
        if (eventType != HistoryEventType.DAG_SUBMITTED && eventType != HistoryEventType.DAG_FINISHED) {
            this.maybeFlush(outputStream);
        }
    }

    /**
     * Flushes {@code outputStream} if either the number of unflushed events or the
     * time since the last flush has reached its configured limit. A negative limit
     * disables the corresponding check.
     *
     * @param outputStream stream to flush
     * @throws IOException if the flush fails
     */
    private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
        long currentTime = this.appContext.getClock().getTime();
        boolean flushNeeded = false;
        if (this.maxUnflushedEvents >= 0 && this.unflushedEventsCount >= this.maxUnflushedEvents) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Max unflushed events count reached. Flushing recovery data, unflushedEventsCount=" + this.unflushedEventsCount + ", maxUnflushedEvents=" + this.maxUnflushedEvents);
            }
            flushNeeded = true;
        } else if (this.flushInterval >= 0
                // Multiply in long: seconds * 1000 overflows int for large intervals.
                && currentTime - this.lastFlushTime >= (long) this.flushInterval * 1000L) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Flush interval time period elapsed. Flushing recovery data, lastTimeSinceFLush=" + this.lastFlushTime + ", timeSinceLastFlush=" + (currentTime - this.lastFlushTime));
            }
            flushNeeded = true;
        }
        if (flushNeeded) {
            this.doFlush(outputStream, currentTime, false);
        }
    }

    /**
     * Flushes {@code outputStream} and resets the flush bookkeeping.
     *
     * @param outputStream stream to flush
     * @param currentTime timestamp recorded as the new last-flush time
     * @param sync {@code true} to call {@code hsync()}, {@code false} to call {@code hflush()}
     * @throws IOException if the flush fails
     */
    private void doFlush(FSDataOutputStream outputStream, long currentTime, boolean sync) throws IOException {
        if (sync) {
            outputStream.hsync();
        } else {
            outputStream.hflush();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flushing output stream, lastTimeSinceFLush=" + this.lastFlushTime + ", timeSinceLastFlush=" + (currentTime - this.lastFlushTime) + ", unflushedEventsCount=" + this.unflushedEventsCount + ", maxUnflushedEvents=" + this.maxUnflushedEvents);
        }
        this.unflushedEventsCount = 0;
        this.lastFlushTime = currentTime;
    }

    /**
     * @return {@code true} once a summary event has failed to persist and recovery
     *         has been flagged as fatally broken
     */
    public boolean hasRecoveryFailed() {
        return this.recoveryFatalErrorOccurred.get();
    }
}

