/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class EditLogTailer {
    public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
    private final EditLogTailerThread tailerThread = new EditLogTailerThread();
    private final Configuration conf;
    private final FSNamesystem namesystem;
    private FSEditLog editLog;
    private InetSocketAddress activeAddr;
    private NamenodeProtocol cachedActiveProxy = null;
    private long lastRollTriggerTxId = -12345L;
    private long lastLoadedTxnId = -12345L;
    private long lastLoadTimeMs;
    private final long logRollPeriodMs;
    private final long sleepTimeMs;

    public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
        this.conf = conf;
        this.namesystem = namesystem;
        this.editLog = namesystem.getEditLog();
        this.lastLoadTimeMs = Time.monotonicNow();
        this.logRollPeriodMs = conf.getInt("dfs.ha.log-roll.period", 120) * 1000;
        if (this.logRollPeriodMs >= 0L) {
            this.activeAddr = this.getActiveNodeAddress();
            Preconditions.checkArgument(this.activeAddr.getPort() > 0, "Active NameNode must have an IPC port configured. Got address '%s'", this.activeAddr);
            LOG.info("Will roll logs on active node at " + this.activeAddr + " every " + this.logRollPeriodMs / 1000L + " seconds.");
        } else {
            LOG.info("Not going to trigger log rolls on active node because dfs.ha.log-roll.period is negative.");
        }
        this.sleepTimeMs = conf.getInt("dfs.ha.tail-edits.period", 60) * 1000;
        LOG.debug("logRollPeriodMs=" + this.logRollPeriodMs + " sleepTime=" + this.sleepTimeMs);
    }

    private InetSocketAddress getActiveNodeAddress() {
        Configuration activeConf = HAUtil.getConfForOtherNode(this.conf);
        return NameNode.getServiceAddress(activeConf, true);
    }

    private NamenodeProtocol getActiveNodeProxy() throws IOException {
        if (this.cachedActiveProxy == null) {
            int rpcTimeout = this.conf.getInt("dfs.ha.log-roll.rpc.timeout", 20000);
            NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class, RPC.getProtocolVersion(NamenodeProtocolPB.class), this.activeAddr, this.conf, rpcTimeout, Long.MAX_VALUE);
            this.cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
        }
        assert (this.cachedActiveProxy != null);
        return this.cachedActiveProxy;
    }

    public void start() {
        this.tailerThread.start();
    }

    public void stop() throws IOException {
        this.tailerThread.setShouldRun(false);
        this.tailerThread.interrupt();
        try {
            this.tailerThread.join();
        }
        catch (InterruptedException e) {
            LOG.warn("Edit log tailer thread exited with an exception");
            throw new IOException(e);
        }
    }

    @VisibleForTesting
    FSEditLog getEditLog() {
        return this.editLog;
    }

    @VisibleForTesting
    public void setEditLog(FSEditLog editLog) {
        this.editLog = editLog;
    }

    public void catchupDuringFailover() throws IOException {
        Preconditions.checkState(this.tailerThread == null || !this.tailerThread.isAlive(), "Tailer thread should not be running once failover starts");
        SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>(){

            @Override
            public Void run() throws Exception {
                try {
                    EditLogTailer.this.doTailEdits();
                }
                catch (InterruptedException e) {
                    throw new IOException(e);
                }
                return null;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    void doTailEdits() throws IOException, InterruptedException {
        this.namesystem.writeLockInterruptibly();
        try {
            long editsLoaded;
            FSImage image;
            block14: {
                Collection<EditLogInputStream> streams;
                image = this.namesystem.getFSImage();
                long lastTxnId = image.getLastAppliedTxId();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("lastTxnId: " + lastTxnId);
                }
                try {
                    streams = this.editLog.selectInputStreams(lastTxnId + 1L, 0L, null, false);
                }
                catch (IOException ioe) {
                    LOG.warn("Edits tailer failed to find any streams. Will try again later.", ioe);
                    this.namesystem.writeUnlock();
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("edit streams to load from: " + streams.size());
                }
                editsLoaded = 0L;
                try {
                    editsLoaded = image.loadEdits(streams, this.namesystem);
                    if (editsLoaded <= 0L && !LOG.isDebugEnabled()) break block14;
                }
                catch (EditLogInputException elie) {
                    try {
                        editsLoaded = elie.getNumEditsLoaded();
                        throw elie;
                    }
                    catch (Throwable throwable) {
                        if (editsLoaded <= 0L) {
                            if (!LOG.isDebugEnabled()) throw throwable;
                        }
                        LOG.info(String.format("Loaded %d edits starting from txid %d ", editsLoaded, lastTxnId));
                        throw throwable;
                    }
                }
                LOG.info(String.format("Loaded %d edits starting from txid %d ", editsLoaded, lastTxnId));
            }
            if (editsLoaded > 0L) {
                this.lastLoadTimeMs = Time.monotonicNow();
            }
            this.lastLoadedTxnId = image.getLastAppliedTxId();
            return;
        }
        finally {
            this.namesystem.writeUnlock();
        }
    }

    public long getLastLoadTimeMs() {
        return this.lastLoadTimeMs;
    }

    private boolean tooLongSinceLastLoad() {
        return this.logRollPeriodMs >= 0L && Time.monotonicNow() - this.lastLoadTimeMs > this.logRollPeriodMs;
    }

    private void triggerActiveLogRoll() {
        LOG.info("Triggering log roll on remote NameNode " + this.activeAddr);
        try {
            this.getActiveNodeProxy().rollEditLog();
            this.lastRollTriggerTxId = this.lastLoadedTxnId;
        }
        catch (IOException ioe) {
            LOG.warn("Unable to trigger a roll of the active NN", ioe);
        }
    }

    private class EditLogTailerThread
    extends Thread {
        private volatile boolean shouldRun;

        private EditLogTailerThread() {
            super("Edit log tailer");
            this.shouldRun = true;
        }

        private void setShouldRun(boolean shouldRun) {
            this.shouldRun = shouldRun;
        }

        @Override
        public void run() {
            SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Object>(){

                @Override
                public Object run() {
                    EditLogTailerThread.this.doWork();
                    return null;
                }
            });
        }

        private void doWork() {
            while (this.shouldRun) {
                try {
                    if (EditLogTailer.this.tooLongSinceLastLoad() && EditLogTailer.this.lastRollTriggerTxId < EditLogTailer.this.lastLoadedTxnId) {
                        EditLogTailer.this.triggerActiveLogRoll();
                    }
                    if (!this.shouldRun) break;
                    EditLogTailer.this.namesystem.cpLockInterruptibly();
                    try {
                        EditLogTailer.this.doTailEdits();
                    }
                    finally {
                        EditLogTailer.this.namesystem.cpUnlock();
                    }
                }
                catch (EditLogInputException elie) {
                    LOG.warn("Error while reading edits from disk. Will try again.", elie);
                }
                catch (InterruptedException ie) {
                    continue;
                }
                catch (Throwable t) {
                    LOG.fatal("Unknown error encountered while tailing edits. Shutting down standby NN.", t);
                    ExitUtil.terminate(1, t);
                }
                try {
                    Thread.sleep(EditLogTailer.this.sleepTimeMs);
                }
                catch (InterruptedException e) {
                    LOG.warn("Edit log tailer interrupted", e);
                }
            }
        }
    }
}

