/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.FollowerSyncRequest;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.QuorumPacket;

/**
 * Thread that services a single peer connection on behalf of a {@link Leader}.
 *
 * <p>The thread first exchanges synchronization packets with the peer on the
 * socket, then loops reading packets from it. Outgoing packets are placed on
 * {@link #queuedPackets} and written to the socket by a separate "Sender-"
 * thread started from {@link #run()}.
 *
 * <p>Packet type codes appear as numeric literals in this file. Their meaning
 * is noted next to each use; where the meaning cannot be established from this
 * file alone the note is marked as an assumption.
 */
public class FollowerHandler
extends Thread {
    private static final Logger LOG = Logger.getLogger(FollowerHandler.class);

    /** Connection to the peer served by this handler. */
    public Socket sock;

    /** Leader this handler registers with and reports to. */
    Leader leader;

    /** Value of {@code leader.self.tick} when the last packet was received from the peer. */
    long tickOfLastAck;

    /** Packets waiting to be written to the peer by the sender thread. */
    LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();

    private BinaryInputArchive ia;
    private BinaryOutputArchive oa;
    private BufferedOutputStream bufferedOutput;

    /**
     * Sentinel packet: when the sender thread takes this exact instance from
     * {@link #queuedPackets} it stops sending (compared by identity).
     */
    QuorumPacket proposalOfDeath = new QuorumPacket();

    /**
     * Creates the handler, registers it with the leader and starts the thread.
     *
     * <p>Note that the thread is started from within the constructor, so
     * {@link #run()} may already be executing when the constructor returns.
     *
     * @param sock   connected socket to the peer
     * @param leader leader that owns this handler
     * @throws IOException declared for callers; nothing in this constructor's
     *                     visible body throws it directly
     */
    FollowerHandler(Socket sock, Leader leader) throws IOException {
        super("FollowerHandler-" + sock.getRemoteSocketAddress());
        this.sock = sock;
        this.leader = leader;
        leader.addFollowerHandler(this);
        this.start();
    }

    /**
     * Drains {@link #queuedPackets} to the socket until the
     * {@link #proposalOfDeath} sentinel is taken or a write fails.
     *
     * @throws InterruptedException if interrupted while waiting for a packet
     */
    private void sendPackets() throws InterruptedException {
        while (true) {
            QuorumPacket p = this.queuedPackets.take();
            if (p == this.proposalOfDeath) {
                // Sentinel queued by run() on exit: stop sending.
                break;
            }
            // Choose the trace mask per packet (as the receive loop does) so a
            // single type-5 packet does not change the mask used for every
            // packet that follows it.
            long traceMask = 16L;
            if (p.getType() == 5) { // 5 is the type ping() sends
                traceMask = 128L;
            }
            ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
            try {
                this.oa.writeRecord(p, "packet");
                this.bufferedOutput.flush();
            }
            catch (IOException e) {
                // A failure on an already-closed socket is expected during
                // shutdown; anything else is worth a warning. Either way the
                // sender stops.
                if (!this.sock.isClosed()) {
                    LOG.warn("Unexpected exception", e);
                }
                break;
            }
        }
    }

    /**
     * Placeholder: currently always returns {@code null}.
     *
     * @param p packet to describe (ignored)
     * @return always {@code null}
     */
    public static String packetToString(QuorumPacket p) {
        return null;
    }

    /**
     * Handler main loop: sets up the archives on the socket, validates the
     * first packet, synchronizes the peer, starts the sender thread and then
     * processes incoming packets until the connection fails.
     *
     * <p>Whatever the reason for leaving (protocol error, I/O error,
     * interruption or an unchecked exception) the {@code finally} block logs
     * the goodbye message, queues {@link #proposalOfDeath} so the sender
     * thread terminates, and calls {@link #shutdown()}.
     */
    @Override
    public void run() {
        try {
            this.ia = BinaryInputArchive.getArchive(new BufferedInputStream(this.sock.getInputStream()));
            this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
            this.oa = BinaryOutputArchive.getArchive(this.bufferedOutput);

            QuorumPacket first = new QuorumPacket();
            this.ia.readRecord(first, "packet");
            if (first.getType() != 11) { // 11 == LASTZXID (per the log message below)
                LOG.error("First packet " + first.toString() + " is not LASTZXID!");
                return;
            }

            this.syncFollower(first.getZxid());
            this.startSender();
            this.processIncomingPackets();
        }
        catch (IOException e) {
            // An I/O error on a socket we closed ourselves is not worth reporting.
            if (this.sock != null && !this.sock.isClosed()) {
                LOG.error("FIXMSG", e);
            }
        }
        catch (InterruptedException e) {
            // The interrupt flag is deliberately not restored here: the
            // put() below is interruptible and must still deliver the
            // sentinel, and the thread ends right after this method.
            LOG.error("FIXMSG", e);
        }
        finally {
            LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
            try {
                // Tell the sender thread to stop.
                this.queuedPackets.put(this.proposalOfDeath);
            }
            catch (InterruptedException e) {
                LOG.error("FIXMSG", e);
            }
            this.shutdown();
        }
    }

    /**
     * Brings the peer up to date with the leader, writing directly to the
     * socket (the sender thread is not running yet).
     *
     * <p>Decides which synchronization packet type to send (13, 14 or 15),
     * queues any committed proposals newer than the peer's zxid, and sends a
     * full snapshot when type 15 is chosen. Finally queues a type-12 packet.
     *
     * @param peerLastZxid zxid carried by the peer's first packet
     * @throws IOException          if writing to the peer fails
     * @throws InterruptedException propagated from the leader/server calls, if they throw it
     */
    private void syncFollower(long peerLastZxid) throws IOException, InterruptedException {
        int packetToSend = 15; // default: full snapshot (see the type-15 branch below)
        boolean logTxns = true;
        long zxidToSend = 0L;

        synchronized (this.leader.zk.committedLog) {
            if (this.leader.zk.committedLog.size() != 0) {
                boolean peerWithinLog = this.leader.zk.maxCommittedLog >= peerLastZxid
                        && this.leader.zk.minCommittedLog <= peerLastZxid;
                if (peerWithinLog) {
                    // Peer's zxid lies inside the committed-log window: replay
                    // only what it is missing. Type 13 is presumably DIFF --
                    // confirm against Leader's constants.
                    packetToSend = 13;
                    zxidToSend = this.leader.zk.maxCommittedLog;
                    for (Leader.Proposal propose : this.leader.zk.committedLog) {
                        if (propose.packet.getZxid() <= peerLastZxid) {
                            continue;
                        }
                        this.queuePacket(propose.packet);
                        // Type 4 is presumably COMMIT for the proposal just queued -- confirm.
                        this.queuePacket(new QuorumPacket(4, propose.packet.getZxid(), null, null));
                    }
                }
            } else {
                logTxns = false;
            }
        }

        long leaderLastZxid = this.leader.startForwarding(this, peerLastZxid);
        // Type 10 is presumably NEWLEADER -- confirm.
        QuorumPacket newLeaderQP = new QuorumPacket(10, leaderLastZxid, null, null);
        this.oa.writeRecord(newLeaderQP, "packet");
        this.bufferedOutput.flush();

        if (peerLastZxid == leaderLastZxid) {
            // Peer is already at the leader's zxid.
            packetToSend = 13;
            zxidToSend = leaderLastZxid;
        }
        if (logTxns && peerLastZxid > this.leader.zk.maxCommittedLog) {
            // Peer is ahead of the committed log. Type 14 is presumably
            // TRUNC (peer must roll back) -- confirm.
            packetToSend = 14;
            zxidToSend = this.leader.zk.maxCommittedLog;
        }
        this.oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
        this.bufferedOutput.flush();

        if (packetToSend == 15) {
            LOG.warn("Sending snapshot last zxid of peer is 0x" + Long.toHexString(peerLastZxid) + " " + " zxid of leader is 0x" + Long.toHexString(leaderLastZxid));
            this.leader.zk.serializeSnapshot(this.oa);
            // Trailing marker string written after the snapshot.
            this.oa.writeString("BenWasHere", "signature");
        }
        this.bufferedOutput.flush();

        // Type 12 is presumably UPTODATE -- confirm. It is queued, so it goes
        // out after the proposals queued above once the sender starts.
        this.queuedPackets.add(new QuorumPacket(12, -1L, null, null));
    }

    /** Starts the thread that writes {@link #queuedPackets} to the socket. */
    private void startSender() {
        new Thread(){

            @Override
            public void run() {
                Thread.currentThread().setName("Sender-" + FollowerHandler.this.sock.getRemoteSocketAddress());
                try {
                    FollowerHandler.this.sendPackets();
                }
                catch (InterruptedException e) {
                    LOG.warn("Interrupted", e);
                }
            }
        }.start();
    }

    /**
     * Reads and dispatches packets from the peer. Never returns normally; the
     * loop ends only when reading or handling a packet throws.
     *
     * @throws IOException          if reading from the peer fails
     * @throws InterruptedException propagated from the leader/server calls, if they throw it
     */
    private void processIncomingPackets() throws IOException, InterruptedException {
        while (true) {
            QuorumPacket qp = new QuorumPacket();
            this.ia.readRecord(qp, "packet");

            long traceMask = 16L;
            if (qp.getType() == 5) {
                traceMask = 128L;
            }
            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);

            // Any packet from the peer counts as a sign of life for synced().
            this.tickOfLastAck = this.leader.self.tick;

            switch (qp.getType()) {
                case 3: {
                    // Acknowledgement: hand it to the leader.
                    this.leader.processAck(qp.getZxid(), this.sock.getLocalSocketAddress());
                    break;
                }
                case 5: {
                    // Ping reply: payload is a sequence of (long session, int timeout)
                    // pairs; touch each session.
                    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(qp.getData()));
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        this.leader.zk.touch(sess, to);
                    }
                    break;
                }
                case 6: {
                    // Session check: payload is (long id, int timeout). Reply
                    // with the same packet carrying (long id, boolean valid).
                    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(qp.getData()));
                    long id = dis.readLong();
                    int to = dis.readInt();
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    DataOutputStream dos = new DataOutputStream(bos);
                    dos.writeLong(id);
                    boolean valid = this.leader.zk.touch(id, to);
                    ZooTrace.logTraceMessage(LOG, 32L, "Session 0x" + Long.toHexString(id) + " is valid: " + valid);
                    dos.writeBoolean(valid);
                    qp.setData(bos.toByteArray());
                    this.queuedPackets.add(qp);
                    break;
                }
                case 1: {
                    // Forwarded request: header is (long sessionId, int cxid,
                    // int type); the remaining bytes are the request body.
                    ByteBuffer bb = ByteBuffer.wrap(qp.getData());
                    long sessionId = bb.getLong();
                    int cxid = bb.getInt();
                    int type = bb.getInt();
                    bb = bb.slice();
                    if (type == 9) {
                        // Type 9 requests are wrapped so they carry a reference
                        // to this handler.
                        this.leader.zk.submitRequest(new FollowerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()));
                    } else {
                        this.leader.zk.submitRequest(null, sessionId, type, cxid, bb, qp.getAuthinfo());
                    }
                    break;
                }
                default: {
                    // Other packet types are ignored.
                    break;
                }
            }
        }
    }

    /**
     * Closes the socket (if still open) and unregisters this handler from the
     * leader. A failure to close is logged and does not prevent unregistering.
     */
    public void shutdown() {
        try {
            if (this.sock != null && !this.sock.isClosed()) {
                this.sock.close();
            }
        }
        catch (IOException e) {
            LOG.error("FIXMSG", e);
        }
        this.leader.removeFollowerHandler(this);
    }

    /**
     * @return the leader tick recorded when the last packet arrived from the peer
     */
    public long tickOfLastAck() {
        return this.tickOfLastAck;
    }

    /** Queues a type-5 (ping) packet carrying the leader's {@code lastProposed} value. */
    public void ping() {
        QuorumPacket ping = new QuorumPacket(5, this.leader.lastProposed, null, null);
        this.queuePacket(ping);
    }

    /**
     * Appends a packet to the outgoing queue.
     *
     * @param p packet to send to the peer
     */
    void queuePacket(QuorumPacket p) {
        this.queuedPackets.add(p);
    }

    /**
     * @return {@code true} if this thread is alive and a packet has been
     *         received from the peer within the last {@code syncLimit} ticks
     */
    public boolean synced() {
        return this.isAlive() && this.tickOfLastAck >= (long)(this.leader.self.tick - this.leader.self.syncLimit);
    }
}

