/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jcsmp.impl;

import com.solacesystems.common.util.LogWrapper;
import com.solacesystems.jcsmp.InvalidOperationException;
import com.solacesystems.jcsmp.JCSMPErrorResponseException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPInterruptedException;
import com.solacesystems.jcsmp.JCSMPProducerEventHandler;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.ProducerEvent;
import com.solacesystems.jcsmp.ProducerFlowProperties;
import com.solacesystems.jcsmp.impl.ADManager;
import com.solacesystems.jcsmp.impl.ContextImpl;
import com.solacesystems.jcsmp.impl.JCSMPUtils;
import com.solacesystems.jcsmp.impl.JCSMPXMLMessage;
import com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer;
import com.solacesystems.jcsmp.impl.MsgIdInfo;
import com.solacesystems.jcsmp.impl.TransactionSizeExceededException;
import com.solacesystems.jcsmp.impl.flow.ProducerEventArgsImpl;
import com.solacesystems.jcsmp.impl.queues.ConditionalBoundedMessageQueue;
import com.solacesystems.jcsmp.impl.queues.ProcessElementsTask;
import com.solacesystems.jcsmp.impl.timers.PubRetransmitTimedTask;
import com.solacesystems.jcsmp.impl.transaction.BaseTransactedSessionImpl;
import com.solacesystems.jcsmp.protocol.SeqNumAllocator;
import com.solacesystems.jcsmp.protocol.impl.SeqNum63bAllocator;
import com.solacesystems.jcsmp.protocol.nio.impl.ProducerErrorNotification;
import com.solacesystems.jcsmp.protocol.nio.impl.ProducerResponseNotification;
import com.solacesystems.jcsmp.statistics.StatType;
import java.util.Iterator;
import java.util.concurrent.Callable;
import org.osgi.annotation.versioning.ProviderType;

@ProviderType
public class PubADManager
extends ADManager {
    // Instance counter; neither read nor written in the visible part of this class.
    protected static int instanceCount = 0;
    // Relative delay handed to the timer queue when startADTimer() schedules the ack timer; -1 until configured.
    public int pub_Ack_Time = -1;
    // Capacity used when initMessageQueue() creates the message queue; -1 until set.
    public int pub_Ack_Window_Size = -1;
    // Window size taken from the flow properties in getNewADManager(); -1 until configured.
    public int configured_Pub_Ack_Window_Size = -1;
    // Value of the "max_resends" property read in getNewADManager(); -1 until configured.
    public int max_Resends = -1;
    // Flag copied from the flow properties (isRtrWindowedAck) in getNewADManager().
    public boolean rtr_Windowed_Ack = true;
    // Ack event mode; ProcessWindowedAckTask compares it against "SUPPORTED_ACK_EVENT_MODE_WINDOWED".
    public String ack_Event_Mode = "SUPPORTED_ACK_EVENT_MODE_PER_MSG";
    // Publisher flow id assigned through setFlowId(); -1 until assigned.
    public volatile long flow_Id = -1L;
    // Not referenced in the visible part of this class. NOTE(review): presumably the publisher id - confirm.
    public volatile long pub_Id = -1L;
    // Flow name; reset to null by resetAdFlow().
    public String flow_Name = null;
    // Last message id passed to processWindowedAck(); written but never read in the visible code.
    private long _dbg_lastMsgIdAck = 0L;
    // Current state object; replaced by updateState().
    private PubState pub_state;
    // One state object per state, all created in the constructor.
    public final PubState STATE_SENDING;
    public final PubState STATE_RETRANSMITTING;
    public final PubState STATE_CLOSED;
    public final PubState STATE_UNBOUND;
    public final PubState STATE_RETRANSMITTING_ALL;
    // Allocator for outgoing message ids (see setMessageIdParamsOnPubMessage()).
    public volatile SeqNumAllocator idAllocator = new SeqNum63bAllocator("PubFlow");
    // Logger for this class; its context info is refreshed in setFlowId().
    private final LogWrapper Trace = new LogWrapper(PubADManager.class);
    // Message queue; created lazily by initMessageQueue(), so it may be null.
    ConditionalBoundedMessageQueue _msgQueue;
    // Producer this manager was created for.
    JCSMPXMLMessageProducer _producer;
    // Task scheduled on the timer queue by startADTimer(); recreated by reInit().
    PubRetransmitTimedTask _pubAckTask;

    /**
     * Creates a manager bound to the given producer. All state objects are created here
     * and the manager starts in {@code STATE_UNBOUND}.
     *
     * @param producer the producer this manager works for
     * @param context  the context forwarded to the {@code ADManager} superclass constructor
     */
    public PubADManager(JCSMPXMLMessageProducer producer, ContextImpl context) {
        super(context);
        _producer = producer;
        // Draw one value from the allocator up front; the returned id is intentionally discarded.
        idAllocator.getNext63b();
        STATE_SENDING = new StateSending(this);
        STATE_RETRANSMITTING = new StateRetransmitting(this);
        STATE_CLOSED = new StateClosed(this);
        STATE_UNBOUND = new StateUnbound(this);
        STATE_RETRANSMITTING_ALL = new StateRetransmittingAll(this);
        pub_state = STATE_UNBOUND;
    }

    /**
     * Switches to {@code newstate} and invokes its {@code enter} hook with {@code input}.
     *
     * @param newstate the state to make current
     * @param input    the trigger passed to {@code enter}; may be null (logged as "null")
     * @return always {@code true}
     * @throws JCSMPException if the new state's {@code enter} hook throws
     */
    protected synchronized boolean updateState(PubState newstate, StateChangeInput input) throws JCSMPException {
        if (Trace.isDebugEnabled()) {
            String inputText = "null";
            if (input != null) {
                inputText = input.toString();
            }
            Trace.debug("State Change: " + pub_state.toString() + "->" + newstate.toString() + " with " + inputText);
        }
        // The state is replaced before enter() runs, so it stays replaced even if enter() throws.
        pub_state = newstate;
        newstate.enter(input);
        return true;
    }

    /**
     * Tells whether the producer is transacted and its transacted session is either marked
     * as rollback or has rollback-only set for this producer.
     *
     * @return {@code true} only for a transacted producer whose session reports one of the two conditions
     */
    protected boolean isTransactedSessionAndMarkedAsRollback() {
        if (!_producer.isTransacted()) {
            return false;
        }
        if (_producer.getTransactedSession().isMarkedAsRollback()) {
            return true;
        }
        return _producer.getTransactedSession().isRollbackOnlySet(_producer);
    }

    /**
     * @return the current state object (read without synchronization)
     */
    public PubState getState() {
        return pub_state;
    }

    /**
     * Asks the producer to stop message retransmission ({@code stopMsgRetransmit}).
     */
    public void cancelPendingResendTask() {
        _producer.stopMsgRetransmit();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Re-initializes the manager: runs the superclass re-init, clears the ack timer if one
     * exists (under the ack-timer lock) and installs a fresh retransmit task.
     */
    @Override
    public void reInit() {
        super.reInit();
        if (Trace.isDebugEnabled()) {
            Trace.debug("PUBADMGR_ReInit");
        }
        synchronized (_ackTimerLock) {
            if (_ackTimer != null) {
                clearADTimer();
            }
        }
        _pubAckTask = new PubRetransmitTimedTask(this);
    }

    /**
     * @return the producer passed to the constructor
     */
    public JCSMPXMLMessageProducer getMessageProducer() {
        return _producer;
    }

    /**
     * Stores the flow id and refreshes the logging context of the producer, of this class's
     * logger and of the superclass.
     *
     * @param id the new flow id
     */
    public void setFlowId(long id) {
        if (Trace.isDebugEnabled()) {
            Trace.debug("set flowId:" + id);
        }
        flow_Id = id;
        // The producer rebuilds its context string first; it is then propagated to both loggers.
        _producer.updateLogCntextInfo();
        final String contextInfo = _producer.getLogContextInfo();
        Trace.setContextInfo(contextInfo);
        super.updateLogContextInfo(contextInfo);
    }

    /**
     * @return the flow id last stored by {@code setFlowId}, or -1 if none was set
     */
    public long getFlowId() {
        return flow_Id;
    }

    /**
     * @return the flow name; may be null (it is null initially and after {@code resetAdFlow})
     */
    public String getFlowName() {
        return flow_Name;
    }

    /**
     * Sets the publisher ack window size. For a transacted producer whose session does not
     * expect transport acks the argument is ignored and the size is fixed at 256.
     *
     * @param pub_Ack_Window_Size the requested window size
     */
    public void setPub_Ack_Window_Size(int pub_Ack_Window_Size) {
        boolean transactedWithoutTransportAck = _producer.isTransacted()
                && !_producer.getTransactedSession().isTransportAckExpected();
        if (transactedWithoutTransportAck) {
            this.pub_Ack_Window_Size = 256;
        } else {
            this.pub_Ack_Window_Size = pub_Ack_Window_Size;
        }
    }

    /**
     * Resets the flow bookkeeping: clears the flow name, zeroes the last transport-acked,
     * last-sent and last-acked message ids, and sets the id allocator to 1.
     */
    public void resetAdFlow() {
        flow_Name = null;
        setLastTransportAcked(0L);
        setLastMessageIdSent(0L);
        setLastMessageIdAcked(0L);
        idAllocator.setToNoCheck(1L);
    }

    /**
     * @return the effective publisher ack window size, or -1 if it has not been set
     */
    public int getPub_Ack_Window_Size() {
        return pub_Ack_Window_Size;
    }

    /**
     * Stores the windowed-ack flag.
     *
     * @param rtr_Windowed_Ack the new flag value
     */
    public void setRtr_Windowed_Ack(boolean rtr_Windowed_Ack) {
        this.rtr_Windowed_Ack = rtr_Windowed_Ack;
    }

    /**
     * @return the stored windowed-ack flag
     */
    public boolean isRtr_Windowed_Ack() {
        return rtr_Windowed_Ack;
    }

    /**
     * Assigns the next allocated message id to {@code message}, clears its
     * "new message id required" flag and records the previously sent id on it. The
     * last-sent id is advanced only when {@code JCSMPUtils.isAdMessage} accepts the message.
     *
     * @param message the message to stamp
     * @return the id assigned to the message
     */
    public long setMessageIdParamsOnPubMessage(JCSMPXMLMessage message) {
        final long assignedId = idAllocator.getNext63b();
        message.setMessageIdLong(assignedId);
        message.setNewMsgIdRequired(false);
        message.setPrevMessageId(getLastMessageIdSent());
        if (!JCSMPUtils.isAdMessage(message)) {
            return assignedId;
        }
        setLastMessageIdSent(assignedId);
        return assignedId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Renumbers the queued messages relative to {@code respLastIdAcked} and, when any were
     * renumbered, raises a {@code REPUBLISH_UNACKED_MESSAGES} producer event (if a producer
     * event handler is registered) and logs the same text at info level.
     *
     * <p>Whether or not renumbering succeeds, the last-sent id is set to
     * {@code respLastIdAcked + numMsgs} and the id allocator to the value after it.
     *
     * @param respLastIdAcked the last acknowledged message id used as the renumbering base
     */
    public void renumberMessageIdParamsOnPubMessages(long respLastIdAcked) {
        int numMsgs = 0;
        if (this.Trace.isDebugEnabled()) {
            // %s keeps the digits exactly as plain concatenation would print them.
            this.Trace.debug(String.format("AD pub flow message renumbering: flowId=%s; respLastIdAcked=%s", this.flow_Id, respLastIdAcked));
        }
        try {
            if (this._msgQueue != null && (numMsgs = this._msgQueue.msgIdRenumbering(respLastIdAcked)) > 0) {
                JCSMPProducerEventHandler eventHandler = this._producer.getProducerEventHandler();
                String infoStr = "Unknown Publisher Flow (flowId=" + this.flow_Id + ") recovered: " + numMsgs + " messages renumbered and resent (lastMessageIdSent =" + respLastIdAcked + ")";
                if (eventHandler != null) {
                    ProducerEventArgsImpl event = new ProducerEventArgsImpl(ProducerEvent.REPUBLISH_UNACKED_MESSAGES, infoStr, null, 0, numMsgs);
                    eventHandler.handleEvent(event);
                }
                if (this.Trace.isInfoEnabled()) {
                    this.Trace.info(infoStr);
                }
            }
        }
        finally {
            // numMsgs is still 0 if renumbering did not run or threw before assigning it.
            this.setLastMessageIdSent(respLastIdAcked + (long)numMsgs);
            this.idAllocator.setToNoCheck(respLastIdAcked + (long)numMsgs + 1L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules the publisher ack timer unless it is already active.
     *
     * <p>Does nothing when the producer has a transacted session that does not expect
     * transport acks. Otherwise, under the ack-timer lock, a missing timer is scheduled
     * fresh and an inactive one is rescheduled, both with a delay of {@code pub_Ack_Time};
     * an active timer is left untouched.
     */
    @Override
    public void startADTimer() {
        if (this._producer.getTransactedSession() != null && !this._producer.getTransactedSession().isTransportAckExpected()) {
            return;
        }
        this.validateIsInitialized();
        // Sample the log level once so the message is never assembled from a null prefix
        // if the level changes while this method runs.
        final boolean debug = this.Trace.isDebugEnabled();
        String logmsg = "Starting pub ad timer: ";
        synchronized (this._ackTimerLock) {
            if (this._ackTimer == null || !this._ackTimer.isActive()) {
                this._ackTimer = this._ackTimer == null ? this._timerQueue.schedule_relative(this.pub_Ack_Time, this._pubAckTask) : this._timerQueue.schedule_relative(this.pub_Ack_Time, this._pubAckTask, this._ackTimer);
                if (debug) {
                    logmsg = logmsg + "scheduled new timer in " + this.pub_Ack_Time;
                }
            } else if (debug) {
                logmsg = logmsg + "already scheduled in " + (this._ackTimer.getTimeout() - System.currentTimeMillis()) + ", do nothing";
            }
        }
        if (debug) {
            this.Trace.debug(logmsg);
        }
    }

    /**
     * Creates the message queue on first use (capacity {@code pub_Ack_Window_Size}) and
     * activates it. For a transacted producer whose session does not expect transport acks,
     * the queue is given a callback that throws {@code TransactionSizeExceededException};
     * otherwise no callback is supplied.
     */
    public void initMessageQueue() {
        validateIsInitialized();
        if (_msgQueue == null) {
            if (Trace.isDebugEnabled()) {
                Trace.debug(String.format("Init message queue: size=%s", pub_Ack_Window_Size));
            }
            Callable<Object> actionOnEnqueueOnClose = null;
            if (_producer.isTransacted() && !_producer.getTransactedSession().isTransportAckExpected()) {
                actionOnEnqueueOnClose = new Callable<Object>() {

                    @Override
                    public Object call() throws Exception {
                        throw new TransactionSizeExceededException("Too many messages in transaction.");
                    }
                };
            }
            _msgQueue = new ConditionalBoundedMessageQueue(pub_Ack_Window_Size, actionOnEnqueueOnClose);
        }
        _msgQueue.activate();
    }

    /**
     * Drains the message queue (if one exists) through {@code ReturnUnackedMgsToPoolTask},
     * optionally deactivating it first, and then marks everything sent as acknowledged by
     * setting the last-acked id to the last-sent id.
     *
     * @param closeQueue whether to deactivate the queue before draining it
     */
    public void clearMessageQueue(boolean closeQueue) {
        validateIsInitialized();
        if (_msgQueue != null) {
            if (closeQueue) {
                _msgQueue.deactivate();
            }
            ReturnUnackedMgsToPoolTask drainTask = new ReturnUnackedMgsToPoolTask(_msgQueue);
            try {
                int returned = _msgQueue.processElements(drainTask);
                if (Trace.isDebugEnabled()) {
                    Trace.debug(String.format("Return %s AD messages to pool", returned));
                }
            } catch (JCSMPException e) {
                // A failure here is only logged; the ack bookkeeping below still runs.
                Trace.warn("Unexpected exception occurred while returning AD msgs to pool", e);
            }
        }
        setLastMessageIdAcked(getLastMessageIdSent());
    }

    /**
     * Clears the message queue and deactivates it; shorthand for {@code clearMessageQueue(true)}.
     */
    public void clearMessageQueue() {
        clearMessageQueue(true);
    }

    /**
     * Suspends the message queue; a no-op when the queue has not been created yet.
     */
    public void suspendMsgQueue() {
        validateIsInitialized();
        if (_msgQueue == null) {
            return;
        }
        _msgQueue.suspend();
    }

    /**
     * Resumes the message queue; a no-op when the queue has not been created yet.
     */
    public void resumeMsgQueue() {
        validateIsInitialized();
        if (_msgQueue == null) {
            return;
        }
        _msgQueue.resume();
    }

    /**
     * Runs a {@code ProcessWindowedAckTask} over the message queue for the given ack id.
     *
     * @param msgId the acknowledged message id; negative values are ignored
     * @return the count reported by the task, or 0 when {@code msgId} is negative
     * @throws JCSMPException if processing the queue fails
     */
    public int processWindowedAck(long msgId) throws JCSMPException {
        validateIsInitialized();
        if (msgId < 0L) {
            return 0;
        }
        _dbg_lastMsgIdAck = msgId;
        if (Trace.isDebugEnabled()) {
            Trace.debug(String.format("Processing windowed ack ackid=%s, ackEventCode=%s", msgId, ack_Event_Mode));
        }
        JCSMPStreamingPublishEventHandler streamingHandler;
        try {
            streamingHandler = _producer.getStreamingCallbackHandler();
        } catch (InvalidOperationException ignored) {
            // The producer could not supply a streaming handler; continue without one.
            streamingHandler = null;
        }
        ProcessWindowedAckTask task = new ProcessWindowedAckTask(_msgQueue, _producer, streamingHandler, msgId, ack_Event_Mode);
        return _msgQueue.processElements(task);
    }

    /**
     * Handles an error ack: first acknowledges everything up to {@code msgId - 1}, then runs
     * a {@code ProcessWindowedAckErrorTask} for {@code msgId} itself.
     *
     * @param msgId the id of the message the error refers to
     * @param e     the error to report for that message
     * @return the count reported by the error task
     * @throws JCSMPException if processing the queue fails or the task rethrows {@code e}
     */
    public int processWindowedAckError(long msgId, JCSMPException e) throws JCSMPException {
        validateIsInitialized();
        if (Trace.isDebugEnabled()) {
            Trace.debug(String.format("Processing windowed error ackid=%s, ackEventCode=%s", msgId, ack_Event_Mode));
        }
        processWindowedAck(msgId - 1L);
        JCSMPStreamingPublishEventHandler streamingHandler;
        try {
            streamingHandler = _producer.getStreamingCallbackHandler();
        } catch (InvalidOperationException ignored) {
            // The producer could not supply a streaming handler; continue without one.
            streamingHandler = null;
        }
        ProcessWindowedAckErrorTask task = new ProcessWindowedAckErrorTask(_msgQueue, _producer, streamingHandler, msgId, e);
        return _msgQueue.processElements(task);
    }

    /**
     * Transacted variant of error-ack handling: acknowledges everything up to
     * {@code msgId - 1}, then removes queued messages up to the last-sent id through a
     * {@code ProcessTransactedWindowedAckErrorTask}.
     *
     * @param msgId the id of the message the error refers to
     * @param e     the error; not used by the visible code of this method
     * @return {@code true} if the second pass removed at least one message
     * @throws JCSMPException if processing the queue fails
     */
    public boolean transactedProcessWindowedAckError(long msgId, JCSMPException e) throws JCSMPException {
        validateIsInitialized();
        if (Trace.isDebugEnabled()) {
            Trace.debug(String.format("Processing windowed error ackid=%s, ackEventCode=%s", msgId, ack_Event_Mode));
        }
        processWindowedAck(msgId - 1L);
        ProcessTransactedWindowedAckErrorTask task = new ProcessTransactedWindowedAckErrorTask(_msgQueue, getLastMessageIdSent());
        final int removed = _msgQueue.processElements(task);
        if (Trace.isDebugEnabled()) {
            Trace.debug("Message queue is empty: " + _msgQueue.isEmpty());
        }
        return removed > 0;
    }

    /**
     * Forwards an ack timeout to the current state. A {@code JCSMPException} raised by the
     * state is logged at debug level and not propagated.
     */
    public synchronized void handleAckTimeout() {
        try {
            if (Trace.isTraceEnabled()) {
                Trace.trace("handleAckTimeout: state=" + pub_state.toString());
            }
            pub_state.handleTimeout();
        } catch (JCSMPException e) {
            if (Trace.isDebugEnabled()) {
                Trace.debug("got exception: ", e);
            }
        }
    }

    /**
     * Notifies the current state that a message was enqueued.
     */
    protected synchronized void handleMsgEnqueued() {
        pub_state.handleMsgEnqueued();
    }

    /**
     * Passes the id of the last message sent to the current state.
     *
     * @param id the message id to report
     */
    protected synchronized void setLastIdSent(long id) {
        pub_state.setLastIdSent(id);
    }

    /**
     * @return whatever the current state returns as the next message to retransmit
     */
    public synchronized JCSMPXMLMessage getNextADMsgForRetransmit() {
        return pub_state.getNextADMsgForRetransmit();
    }

    /**
     * @return the exit-retransmit id reported by the message queue (the queue must already exist)
     */
    protected long getExitRetransmitId() {
        return _msgQueue.getExitRetransmitId();
    }

    /**
     * @return the current state's answer to whether an immediate ack is required
     */
    protected synchronized boolean isAckImmediatelyRequired() {
        return pub_state.isAckImmediatelyRequired();
    }

    /**
     * Processes an ack (or error ack) received for the publisher flow.
     *
     * <p>Steps, in order: clear the ack timer, record the last-acked id, process the queue
     * (success or error path depending on {@code err}), restart the timer if messages remain
     * and retransmit is not stopped, let a streaming producer inspect the error response,
     * and finally forward the ack to the current state. When both {@code retransmitRequest}
     * and {@code rollbackOnly} are set, the queue is cleared (without deactivating it) and
     * the state sees the ack as a non-retransmit ack.
     *
     * @param lastAckedMsgId    the last acknowledged message id
     * @param err               the error response, or null for a successful ack
     * @param retransmitRequest whether the ack asks for retransmission
     * @param rollbackOnly      whether the rollback-only condition applies
     * @return {@code true} if at least one queued message was processed
     * @throws JCSMPException if queue processing or the state handler fails
     */
    public synchronized boolean handleClientAck(long lastAckedMsgId, JCSMPErrorResponseException err, boolean retransmitRequest, boolean rollbackOnly) throws JCSMPException {
        if (Trace.isTraceEnabled()) {
            Trace.trace("handleClientAck (retransmitRequest=" + retransmitRequest + "): state=" + pub_state.toString());
        }
        clearADTimer();
        setLastMessageIdAcked(lastAckedMsgId);
        final int numAcked;
        if (err == null) {
            numAcked = processWindowedAck(lastAckedMsgId);
        } else {
            numAcked = processWindowedAckError(lastAckedMsgId, err);
        }
        if (!isQueueEmpty() && isRetransmitNotStopped()) {
            startADTimer();
        }
        if (_producer.hasStreamingCallback()) {
            _producer.checkErrorResponseForNoCug(err);
        }
        boolean forwardAsRetransmit = retransmitRequest;
        if (retransmitRequest && rollbackOnly) {
            clearMessageQueue(false);
            forwardAsRetransmit = false;
        }
        pub_state.handleClientAck(lastAckedMsgId, numAcked, forwardAsRetransmit);
        return numAcked > 0;
    }

    /**
     * @return the transacted session's {@code isRetransmitNotStopped()} when the producer
     *         has a transacted session, otherwise {@code true}
     */
    private boolean isRetransmitNotStopped() {
        if (_producer.getTransactedSession() == null) {
            return true;
        }
        return _producer.getTransactedSession().isRetransmitNotStopped();
    }

    /**
     * Tells the current state that retransmission has finished.
     *
     * @throws JCSMPException if the state handler fails
     */
    public synchronized void handleRetransmitDone() throws JCSMPException {
        if (Trace.isTraceEnabled()) {
            Trace.trace("handleRetransmitDone: state=" + pub_state.toString());
        }
        pub_state.handleRetransmitDone();
    }

    /**
     * Tells the current state that a flow-open response arrived.
     *
     * @throws JCSMPException if the state handler fails
     */
    public synchronized void handleFlowOpenResponse() throws JCSMPException {
        if (Trace.isTraceEnabled()) {
            Trace.trace("handleFlowOpenResponse: state=" + pub_state.toString());
        }
        pub_state.handleFlowOpenResponse();
    }

    /**
     * Tells the current state that the publisher flow was resumed.
     *
     * @throws JCSMPException if the state handler fails
     */
    public synchronized void handlePubFlowResumed() throws JCSMPException {
        if (Trace.isTraceEnabled()) {
            Trace.trace("handlePubFlowResumed: state=" + pub_state.toString() + ", lastReceivedMsgId=" + getLastTransportAcked());
        }
        pub_state.handlePubFlowResumed();
    }

    /**
     * Tells the current state that a reconnect is about to happen.
     */
    public synchronized void handlePreReconnect() {
        if (Trace.isTraceEnabled()) {
            Trace.trace("handlePreReconnect: state=" + pub_state.toString());
        }
        pub_state.handlePreReconnect();
    }

    /**
     * Enqueues {@code msg} on the message queue (which updates its id through this manager)
     * and then notifies the current state.
     *
     * @param msg the message to enqueue
     * @throws JCSMPInterruptedException if the enqueue is interrupted; the original
     *         {@code InterruptedException} is attached as the cause and the thread's
     *         interrupt status is not restored here
     * @throws JCSMPException if the queue rejects the message
     */
    public void enqueueMsgWithIdUpdateWithThrows(JCSMPXMLMessage msg) throws JCSMPException {
        validateIsInitialized();
        try {
            _msgQueue.queueMsgWithIdUpdate(msg, this);
            handleMsgEnqueued();
        } catch (InterruptedException interrupted) {
            Trace.warn(interrupted);
            throw new JCSMPInterruptedException("Message enqueue interrupted", interrupted);
        }
    }

    /**
     * Clears the message queue after a reconnect was aborted (the queue must already exist).
     */
    protected void notifyReconnectAborted() {
        _msgQueue.clear();
    }

    /**
     * Best-effort variant of {@code enqueueMsgWithIdUpdateWithThrows}: any
     * {@code JCSMPException} raised while enqueuing is logged at debug level and suppressed.
     *
     * @param msg the message to enqueue
     * @throws InvalidOperationException declared by the signature; the visible body catches
     *         every {@code JCSMPException} from the delegate call
     */
    public void enqueueMsgWithIdUpdate(JCSMPXMLMessage msg) throws InvalidOperationException {
        try {
            enqueueMsgWithIdUpdateWithThrows(msg);
        } catch (JCSMPException suppressed) {
            Trace.debug(suppressed);
        }
    }

    /**
     * @return whether the message queue reports itself full
     */
    public boolean isQueueFull() {
        validateIsInitialized();
        return _msgQueue.isFull();
    }

    /**
     * @return whether the message queue reports itself empty
     */
    public boolean isQueueEmpty() {
        validateIsInitialized();
        return _msgQueue.isEmpty();
    }

    /**
     * Checks whether the head of the message queue lies beyond {@code prev_id}.
     *
     * <p>The head is peeked exactly once so that the logged message and the tested message
     * are the same object, and so that a head removed after the emptiness check cannot
     * cause a {@code NullPointerException}.
     *
     * @param prev_id the previous-message id to compare against
     * @return {@code true} if the queue is empty, or if the head's {@code prevMessageId}
     *         is greater than {@code prev_id}
     */
    public boolean noFirstRetransmitMsg(long prev_id) {
        final JCSMPXMLMessage head = this._msgQueue.isEmpty() ? null : this._msgQueue.peek();
        if (head == null) {
            if (this.Trace.isDebugEnabled()) {
                this.Trace.debug("checkFirstQueuedMsgPrevId: empty queue");
            }
            return true;
        }
        if (this.Trace.isDebugEnabled()) {
            this.Trace.debug("checkFirstQueuedMsgPrevId: " + head.toString());
        }
        return head.prevMessageId > prev_id;
    }

    /**
     * Tells whether the head of the message queue has already been sent at least once.
     *
     * <p>The head is peeked exactly once so that both conditions are evaluated on the same
     * message, and so that a head removed after the emptiness check cannot cause a
     * {@code NullPointerException}.
     *
     * @return {@code true} if the queue has a head message for which a send was attempted
     *         or whose send count is positive; {@code false} for an empty queue
     */
    public boolean hasRetransmitMsgs() {
        final JCSMPXMLMessage head = this._msgQueue.isEmpty() ? null : this._msgQueue.peek();
        if (head == null) {
            if (this.Trace.isDebugEnabled()) {
                this.Trace.debug("hasRetransmitMsgs: empty queue");
            }
            return false;
        }
        if (this.Trace.isDebugEnabled()) {
            this.Trace.debug("hasRetransmitMsgs: " + head.toString());
        }
        return head.isSendAttemptedOnce() || head.getSendCount() > 0;
    }

    /**
     * Delegates to the message queue's {@code waitUntilEmpty()}.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    public void waitUntilQueueEmpty() throws InterruptedException {
        validateIsInitialized();
        _msgQueue.waitUntilEmpty();
    }

    /**
     * @return the size reported by the message queue
     */
    public int getQueueUsedSize() {
        validateIsInitialized();
        return _msgQueue.size();
    }

    /**
     * Builds a manager for {@code producer} and configures it from the session and flow
     * properties.
     *
     * <p>The pub ack time comes from {@code fprop} when it provides one, otherwise from the
     * {@code "pub_ack_time"} session property. Window size, windowed-ack flag and ack event
     * mode always come from {@code fprop}; max resends comes from {@code "max_resends"}.
     *
     * <p>NOTE(review): {@code fprop} is null-checked only for the pub ack time; the later
     * reads dereference it unconditionally, so a null {@code fprop} ends in a
     * {@code NullPointerException}. Missing values are guarded only by an {@code assert}.
     *
     * @param producer the producer the manager is created for
     * @param props    session properties
     * @param context  context for the new manager
     * @param fprop    producer flow properties
     * @return the configured manager
     */
    public static PubADManager getNewADManager(JCSMPXMLMessageProducer producer, JCSMPProperties props, ContextImpl context, ProducerFlowProperties fprop) {
        PubADManager manager = new PubADManager(producer, context);
        Integer iPubAckTime;
        if (fprop != null && fprop.getPubAckTime() != null) {
            iPubAckTime = fprop.getPubAckTime();
        } else {
            iPubAckTime = props.getIntegerProperty("pub_ack_time");
        }
        Integer iPubAckWindowSz = fprop.getWindowSize();
        Integer iMaxResends = props.getIntegerProperty("max_resends");
        assert (iPubAckTime != null && iPubAckWindowSz != null && iMaxResends != null) : "Property error: missing windowed ack property.";
        manager.pub_Ack_Time = iPubAckTime;
        manager.configured_Pub_Ack_Window_Size = iPubAckWindowSz;
        manager.rtr_Windowed_Ack = fprop.isRtrWindowedAck();
        manager.max_Resends = iMaxResends;
        manager.ack_Event_Mode = fprop.getAckEventMode();
        return manager;
    }

    /**
     * Applies a rollback up to {@code msgID}: sets both the last-sent and last-acked ids to
     * it and processes the queue as if that id had been acknowledged. A
     * {@code JCSMPException} from queue processing is logged at debug level and suppressed.
     *
     * @param msgID the message id the rollback refers to
     */
    public void handleRollback(long msgID) {
        setLastMessageIdSent(msgID);
        setLastMessageIdAcked(msgID);
        try {
            processWindowedAck(msgID);
        } catch (JCSMPException e) {
            if (Trace.isDebugEnabled()) {
                Trace.debug("got exception: ", e);
            }
        }
    }

    /**
     * @return a one-line summary of the ack time, window size, max resends and ack event mode
     */
    @Override
    public String toString() {
        return String.format("Pub_Ack_Time=%s  Pub_Ack_Window_Size=%s  Max_Resends=%s Ack_Event_Mode=%s", pub_Ack_Time, pub_Ack_Window_Size, max_Resends, ack_Event_Mode);
    }

    /**
     * Queue task that drains every message from the queue. Messages currently being
     * retransmitted are flagged safe-to-release; all others get their release callout
     * invoked directly.
     */
    public static class ReturnUnackedMgsToPoolTask implements ProcessElementsTask {
        private ConditionalBoundedMessageQueue queue;

        /**
         * @param queue the queue to drain
         */
        public ReturnUnackedMgsToPoolTask(ConditionalBoundedMessageQueue queue) {
            this.queue = queue;
        }

        @Override
        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return queue;
        }

        /**
         * Polls the queue until it is empty.
         *
         * @return the number of messages removed
         */
        @Override
        public int process() throws JCSMPException {
            int drained = 0;
            for (; !queue.isEmpty(); ++drained) {
                JCSMPXMLMessage next = queue.poll();
                if (!next.isRetransmitting()) {
                    next.callout_ad_release_opportunity();
                    continue;
                }
                next.setSafeToRelease(true);
            }
            return drained;
        }
    }

    /**
     * Queue task used on the transacted error path: removes queued messages, front to back,
     * while their id is set (not -1) and does not exceed the id to acknowledge.
     */
    public static class ProcessTransactedWindowedAckErrorTask implements ProcessElementsTask {
        private ConditionalBoundedMessageQueue queue;
        private long msgIdToAck = -1L;

        /**
         * @param queue      the queue to process
         * @param msgIdToAck the highest message id to remove
         */
        public ProcessTransactedWindowedAckErrorTask(ConditionalBoundedMessageQueue queue, long msgIdToAck) {
            this.queue = queue;
            this.msgIdToAck = msgIdToAck;
        }

        @Override
        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return queue;
        }

        /**
         * Removes matching messages and flags each as safe-to-release; a message that is
         * not being retransmitted additionally gets its release callout.
         *
         * @return the number of messages removed
         */
        @Override
        public int process() throws JCSMPException {
            int removed = 0;
            Iterator<JCSMPXMLMessage> cursor = queue.iterator();
            while (cursor.hasNext()) {
                JCSMPXMLMessage current = cursor.next();
                long currentId = current.getMessageIdLong();
                // Stop at the first message without an id or beyond the acknowledged range.
                if (currentId == -1L || currentId > msgIdToAck) {
                    break;
                }
                cursor.remove();
                boolean retransmitting = current.isRetransmitting();
                current.setSafeToRelease(true);
                if (!retransmitting) {
                    current.callout_ad_release_opportunity();
                }
                ++removed;
            }
            return removed;
        }
    }

    public class ProcessWindowedAckErrorTask
    implements ProcessElementsTask {
        private ConditionalBoundedMessageQueue queue;
        private JCSMPXMLMessageProducer producer;
        private JCSMPStreamingPublishEventHandler spHandler;
        private long msgIdToAck = -1L;
        private JCSMPException e;

        public ProcessWindowedAckErrorTask(ConditionalBoundedMessageQueue queue, JCSMPXMLMessageProducer producer, JCSMPStreamingPublishEventHandler spHandler, long msgIdToAck, JCSMPException e) {
            this.queue = queue;
            this.producer = producer;
            this.spHandler = spHandler;
            this.msgIdToAck = msgIdToAck;
            this.e = e;
        }

        @Override
        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return this.queue;
        }

        @Override
        public int process() throws JCSMPException {
            JCSMPXMLMessage msg = this.queue.peek();
            long curMsgId = -1L;
            if (msg != null && (curMsgId = msg.getMessageIdLong()) != -1L) {
                if (curMsgId == this.msgIdToAck) {
                    Object corrKey = msg.getCorrelationKey();
                    this.queue.remove(msg);
                    this.producer.getSessionStats().incStat(StatType.RELIABLE_MSGS_SENT_CONFIRMED);
                    if (PubADManager.this.Trace.isDebugEnabled()) {
                        PubADManager.this.Trace.debug(String.format("Windowed error for: msg=%s", curMsgId));
                    }
                    if (!msg.isRetransmitting()) {
                        msg.setSafeToRelease(true);
                        msg.callout_ad_release_opportunity();
                    } else {
                        msg.setSafeToRelease(true);
                    }
                    if (this.spHandler != null) {
                        if (!msg.isTransacted()) {
                            PubADManager.this.context.getProducerDispatcher().enqueueNotification(new ProducerErrorNotification(this.spHandler, new MsgIdInfo(this.msgIdToAck, corrKey), this.e, System.currentTimeMillis(), this.producer, true));
                        }
                    } else {
                        throw this.e;
                    }
                }
                return 1;
            }
            return 0;
        }
    }

    /**
     * Queue task for a successful (windowed) ack: removes queued messages, front to back,
     * while their id is set (not -1) and does not exceed the acknowledged id, releases them,
     * and collects acknowledgement info for the streaming handler.
     */
    public class ProcessWindowedAckTask implements ProcessElementsTask {
        private ConditionalBoundedMessageQueue queue;
        private JCSMPXMLMessageProducer producer;
        private JCSMPStreamingPublishEventHandler spHandler;
        private long msgIdToAck = -1L;
        private String ackEventMode;

        /**
         * @param queue        the queue to process
         * @param producer     the producer whose statistics are updated
         * @param spHandler    streaming handler to notify, or null for none
         * @param msgIdToAck   the highest acknowledged message id
         * @param ackEventMode the ack event mode; in {@code "SUPPORTED_ACK_EVENT_MODE_WINDOWED"}
         *                     only the message carrying {@code msgIdToAck} is reported, any
         *                     other value (including null) reports every removed message
         */
        public ProcessWindowedAckTask(ConditionalBoundedMessageQueue queue, JCSMPXMLMessageProducer producer, JCSMPStreamingPublishEventHandler spHandler, long msgIdToAck, String ackEventMode) {
            this.queue = queue;
            this.producer = producer;
            this.spHandler = spHandler;
            this.msgIdToAck = msgIdToAck;
            this.ackEventMode = ackEventMode;
        }

        @Override
        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return this.queue;
        }

        /**
         * Removes and releases the acknowledged messages. When a streaming handler is set
         * and the producer has no transacted session, the collected notification is
         * enqueued on the producer dispatcher. The confirmed-messages statistic is
         * increased by the number of removed messages.
         *
         * @return the number of messages removed
         */
        @Override
        public int process() throws JCSMPException {
            int count = 0;
            if (PubADManager.this.Trace.isDebugEnabled()) {
                PubADManager.this.Trace.debug(String.format("Windowed ack for: msg=%s", this.msgIdToAck));
            }
            ProducerResponseNotification notif = new ProducerResponseNotification(this.spHandler, this.producer);
            // Constant-first comparison: a null mode must not throw after messages have
            // already been removed from the queue. Evaluated once, it is loop-invariant.
            final boolean windowedMode = "SUPPORTED_ACK_EVENT_MODE_WINDOWED".equals(this.ackEventMode);
            Iterator<JCSMPXMLMessage> iter = this.queue.iterator();
            while (iter.hasNext()) {
                JCSMPXMLMessage msg = iter.next();
                long curMsgId = msg.getMessageIdLong();
                // Stop at the first message without an id or beyond the acknowledged range.
                if (curMsgId == -1L || curMsgId > this.msgIdToAck) {
                    break;
                }
                iter.remove();
                if (this.spHandler != null && !msg.isTransacted() && (!windowedMode || curMsgId == this.msgIdToAck)) {
                    notif.addMsgInfo(new MsgIdInfo(curMsgId, msg.getCorrelationKey()));
                }
                boolean retransmitting = msg.isRetransmitting();
                msg.setSafeToRelease(true);
                if (!retransmitting) {
                    msg.callout_ad_release_opportunity();
                }
                ++count;
            }
            if (this.spHandler != null && this.producer.getTransactedSession() == null) {
                PubADManager.this.context.getProducerDispatcher().enqueueNotification(notif);
            }
            this.producer.getSessionStats().incStat(StatType.RELIABLE_MSGS_SENT_CONFIRMED, count);
            return count;
        }
    }

    /**
     * State named "RetransmittingAll". It is entered on an ack timeout or a session
     * reconnect, with the remaining window set to the full publisher ack window size.
     */
    protected class StateRetransmittingAll
    extends StateRetransmittingBase {
        public StateRetransmittingAll(PubADManager parent) {
            super(parent);
        }

        /**
         * Enters the state: if {@code shouldEnterRetransmit()} allows it, resets the
         * remaining window and schedules the retransmit task.
         *
         * @param input the trigger; must be an {@code InputAckTimeout} or {@code InputSessionReconnect}
         * @throws InvalidOperationException for any other input, including null
         */
        @Override
        protected void enter(StateChangeInput input) throws JCSMPException {
            if (!this.shouldEnterRetransmit()) {
                return;
            }
            this.remainingWindow = this.getAdMgr().pub_Ack_Window_Size;
            if (input instanceof InputAckTimeout || input instanceof InputSessionReconnect) {
                this.scheduleRetransmitTask();
            } else {
                // Plain concatenation renders a null input as "null" instead of throwing.
                throw new InvalidOperationException("state change not supported: enter " + this.toString() + " with " + input);
            }
        }

        /**
         * Handles an ack while in this state. A retransmit request triggers a transition to
         * the retransmit state; otherwise an ack at or beyond {@code exitRetransmitId}
         * finishes retransmission and returns. In the remaining cases the window grows by
         * {@code count}, and the retransmit task is rescheduled if the window was exhausted
         * (at or below zero) before this ack.
         */
        @Override
        protected void handleClientAck(long lastAckedMsgId, int count, boolean retransmitRequest) throws JCSMPException {
            if (retransmitRequest) {
                this.transitionToRetransmitState(new InputRetransmitRequest());
            } else if (lastAckedMsgId >= this.exitRetransmitId) {
                try {
                    this.handleRetransmitDone();
                }
                catch (JCSMPException e) {
                    if (PubADManager.this.Trace.isDebugEnabled()) {
                        PubADManager.this.Trace.debug("handleClientAck exception: ", e);
                    }
                }
                return;
            }
            this.remainingWindow += count;
            if (this.remainingWindow <= count) {
                this.scheduleRetransmitTask();
            }
        }

        /**
         * A resumed publisher flow restarts retransmission as for a session reconnect.
         */
        @Override
        protected void handlePubFlowResumed() throws JCSMPException {
            this.transitionToRetransmitState(new InputSessionReconnect());
        }

        /**
         * Records the last id sent and finishes retransmission once it reaches
         * {@code exitRetransmitId}; a failure while finishing is logged at debug level only.
         */
        @Override
        protected void setLastIdSent(long id) {
            super.setLastIdSent(id);
            if (this.lastIdSent >= this.exitRetransmitId) {
                try {
                    this.handleRetransmitDone();
                }
                catch (JCSMPException e) {
                    if (PubADManager.this.Trace.isDebugEnabled()) {
                        PubADManager.this.Trace.debug("setLastIdSent exception: ", e);
                    }
                }
            }
        }

        @Override
        public String toString() {
            return "RetransmittingAll";
        }
    }

    /**
     * State named "Retransmitting". It is entered on an ack timeout or a retransmit
     * request, with a remaining window of one message.
     */
    protected class StateRetransmitting
    extends StateRetransmittingBase {
        // Last-acked id captured when the state was entered by a retransmit request; 0 after an ack timeout.
        protected long enterRetransmitId;

        public StateRetransmitting(PubADManager parent) {
            super(parent);
            this.enterRetransmitId = 0L;
        }

        /**
         * Enters the state: if {@code shouldEnterRetransmit()} allows it, sets the window
         * to 1, positions {@code lastIdSent} according to the trigger and schedules the
         * retransmit task.
         *
         * @param input the trigger; must be an {@code InputAckTimeout} or {@code InputRetransmitRequest}
         * @throws InvalidOperationException for any other input, including null
         */
        @Override
        protected void enter(StateChangeInput input) throws JCSMPException {
            if (!this.shouldEnterRetransmit()) {
                return;
            }
            this.remainingWindow = 1;
            if (input instanceof InputAckTimeout) {
                this.enterRetransmitId = 0L;
                // Pull lastIdSent back below the exit id if it has already reached it.
                if (this.lastIdSent >= this.exitRetransmitId) {
                    this.lastIdSent = this.exitRetransmitId - 1L;
                }
                this.scheduleRetransmitTask();
            } else if (input instanceof InputRetransmitRequest) {
                this.lastIdSent = this.enterRetransmitId = this.getAdMgr().getLastMessageIdAcked();
                this.scheduleRetransmitTask();
            } else {
                // Plain concatenation renders a null input as "null" instead of throwing.
                throw new InvalidOperationException("state change not supported: enter " + this.toString() + " with " + input);
            }
        }

        /**
         * Handles an ack while in this state. A retransmit request for an id beyond
         * {@code enterRetransmitId} restarts the retransmit state; a retransmit request at
         * or below it counts as one window slot. A plain ack at or beyond
         * {@code exitRetransmitId} finishes retransmission. Otherwise the window grows by
         * {@code count}, and the producer is resumed if everything up to the exit id was
         * sent and window remains; if not, the retransmit task is scheduled again.
         */
        @Override
        protected void handleClientAck(long lastAckedMsgId, int count, boolean retransmitRequest) throws JCSMPException {
            if (retransmitRequest) {
                if (lastAckedMsgId > this.enterRetransmitId) {
                    this.transitionToRetransmitState(new InputRetransmitRequest());
                    return;
                }
                count = 1;
            } else if (lastAckedMsgId >= this.exitRetransmitId) {
                try {
                    this.handleRetransmitDone();
                }
                catch (JCSMPException e) {
                    if (PubADManager.this.Trace.isDebugEnabled()) {
                        PubADManager.this.Trace.debug("handleClientAck exception: ", e);
                    }
                }
                return;
            }
            this.remainingWindow += count;
            if (this.lastIdSent >= this.exitRetransmitId && this.remainingWindow > 0) {
                this.getMessageProducer().resume();
                return;
            }
            this.scheduleRetransmitTask();
        }

        @Override
        protected boolean isAckImmediatelyRequired() {
            return true;
        }

        /**
         * Records the last id sent and resumes the producer once the exit id has been
         * reached and window remains.
         */
        @Override
        protected void setLastIdSent(long id) {
            super.setLastIdSent(id);
            if (this.lastIdSent >= this.exitRetransmitId && this.remainingWindow > 0) {
                this.getMessageProducer().resume();
            }
        }

        /**
         * Consumes one window slot per enqueued message; when the window is used up it is
         * clamped to 0 and the producer is suspended.
         */
        @Override
        protected void handleMsgEnqueued() {
            --this.remainingWindow;
            if (this.remainingWindow <= 0) {
                this.remainingWindow = 0;
                this.getMessageProducer().suspend();
            }
        }

        @Override
        public String toString() {
            return "Retransmitting";
        }
    }

    /**
     * Common behaviour of the retransmit states: tracks the retransmit window,
     * the id last sent and the id at which retransmission ends, and supplies
     * the next unacknowledged message to resend.
     */
    protected abstract class StateRetransmittingBase extends PubState {
        /** Number of messages that may still be handed out for retransmission. */
        protected int remainingWindow;
        /** Id most recently recorded via {@link #setLastIdSent(long)} or seeded on entry. */
        protected long lastIdSent;
        /** Value of the manager's {@code getExitRetransmitId()} captured on entry. */
        protected long exitRetransmitId;

        protected StateRetransmittingBase(PubADManager parent) {
            super(parent);
            this.exitRetransmitId = 0L;
            this.lastIdSent = 0L;
            this.remainingWindow = 0;
        }

        /**
         * Prepares for retransmission: suspends the producer, notifies the
         * transacted session (when there is one), and captures the exit id and
         * the last transport-acked id from the manager.
         *
         * @return {@code true} when the exit id is greater than the last
         *         transport-acked id; otherwise retransmission is reported as
         *         done and {@code false} is returned
         * @throws JCSMPException if notifying the session or reporting
         *         completion fails
         */
        protected boolean shouldEnterRetransmit() throws JCSMPException {
            this.getMessageProducer().suspend();
            BaseTransactedSessionImpl txSession = this.getMessageProducer().getTransactedSession();
            if (txSession != null) {
                txSession.notifyPreRetransmit();
            }
            this.exitRetransmitId = this.getAdMgr().getExitRetransmitId();
            this.lastIdSent = this.getAdMgr().getLastTransportAcked();
            boolean retransmitNeeded = this.exitRetransmitId > this.lastIdSent;
            if (!retransmitNeeded) {
                this.handleRetransmitDone();
            }
            return retransmitNeeded;
        }

        /** An ack timeout requests a (re)entry into a retransmit state. */
        @Override
        protected void handleTimeout() throws JCSMPException {
            InputAckTimeout timeoutInput = new InputAckTimeout();
            this.transitionToRetransmitState(timeoutInput);
        }

        /** Moves the manager to its sending state with a retransmit-done input. */
        @Override
        protected void handleRetransmitDone() throws JCSMPException {
            PubADManager manager = this.getAdMgr();
            manager.updateState(manager.STATE_SENDING, new InputRetransmitDone());
        }

        /** A resumed publisher flow requests a transition with a session-reconnect input. */
        @Override
        protected void handlePubFlowResumed() throws JCSMPException {
            InputSessionReconnect reconnectInput = new InputSessionReconnect();
            this.transitionToRetransmitState(reconnectInput);
        }

        /**
         * Fetches the next unacknowledged message after the last id sent.
         *
         * @return the message to resend, flagged ack-immediately when this
         *         state requires it; {@code null} when the window is closed or
         *         the queue has nothing to offer
         */
        @Override
        protected JCSMPXMLMessage getNextADMsgForRetransmit() {
            if (PubADManager.this.Trace.isDebugEnabled()) {
                PubADManager.this.Trace.debug(this.toString() + " getNextADMsgForRetransmit: window=" + this.remainingWindow + "; lastSent=" + this.getLastIdSent());
            }
            if (this.remainingWindow <= 0) {
                return null;
            }
            JCSMPXMLMessage candidate = this.getAdMgr()._msgQueue.getNextUnackedADMsgForRetransmit(this.getLastIdSent());
            if (candidate == null) {
                return null;
            }
            if (this.isAckImmediatelyRequired() && !candidate.isAckImmediately()) {
                // The read-only flag is cleared before the ack-immediately flag is set.
                candidate.clearReadOnly();
                candidate.setAckImmediately(true);
            }
            return candidate;
        }

        /** @return the id most recently recorded as sent */
        protected long getLastIdSent() {
            return this.lastIdSent;
        }

        /**
         * Records {@code id} as the last id sent and consumes one window slot,
         * never letting the window drop below zero.
         *
         * @param id id of the message that was just sent
         */
        @Override
        protected void setLastIdSent(long id) {
            this.lastIdSent = id;
            if (--this.remainingWindow < 0) {
                this.remainingWindow = 0;
            }
        }

        /** Asks the producer to schedule a retransmit task from the current position and window. */
        protected void scheduleRetransmitTask() {
            this.getMessageProducer().scheduleRetransmitTask(this.lastIdSent, this.remainingWindow);
        }
    }

    /**
     * State in which the publisher is sending normally. Ack timeouts,
     * retransmit requests and flow resumption each move the manager toward a
     * retransmit state.
     */
    protected class StateSending
    extends PubState {
        public StateSending(PubADManager parent) {
            super(parent);
        }

        /**
         * Enters the sending state.
         *
         * <p>After a retransmit has finished, the producer is resumed and the
         * transacted session (when there is one) is told that retransmission is
         * over. Entering because the flow was opened needs no extra work.
         *
         * @param input the trigger; must be an {@code InputRetransmitDone} or
         *              an {@code InputFlowOpened}
         * @throws InvalidOperationException for any other input, including
         *         {@code null}
         * @throws JCSMPException if the post-retransmit notification fails
         */
        @Override
        protected void enter(StateChangeInput input) throws JCSMPException {
            if (input instanceof InputRetransmitDone) {
                this.getMessageProducer().resume();
                BaseTransactedSessionImpl transactedSession = this.getMessageProducer().getTransactedSession();
                if (transactedSession != null) {
                    transactedSession.notifyPostRetransmit();
                }
            } else if (!(input instanceof InputFlowOpened)) {
                // Plain concatenation renders a null input as "null" instead of
                // throwing NullPointerException while building the message.
                throw new InvalidOperationException("state change not supported: enter " + this + " with " + input);
            }
        }

        /**
         * Starts a retransmit on ack timeout, unless the manager reports a
         * transacted session that is marked as rollback, in which case the
         * timeout is only noted in the debug log.
         *
         * @throws JCSMPException if the transition request is rejected
         */
        @Override
        protected void handleTimeout() throws JCSMPException {
            if (this.getAdMgr().isTransactedSessionAndMarkedAsRollback()) {
                if (PubADManager.this.Trace.isDebugEnabled()) {
                    PubADManager.this.Trace.debug("no retransmit when the transaction is marked as rollback");
                }
            } else {
                this.transitionToRetransmitState(new InputAckTimeout());
            }
        }

        /**
         * Only acts on acks that carry a retransmit request, by moving to a
         * retransmit state; ordinary acks need no action here.
         *
         * @param lastAckedMsgId id of the last message acknowledged (unused here)
         * @param count window credit carried by this ack (unused here)
         * @param retransmitRequest whether the ack asks for a retransmit
         * @throws JCSMPException if the transition request is rejected
         */
        @Override
        protected void handleClientAck(long lastAckedMsgId, int count, boolean retransmitRequest) throws JCSMPException {
            if (retransmitRequest) {
                this.transitionToRetransmitState(new InputRetransmitRequest());
            }
        }

        /** A resumed publisher flow requests a transition with a session-reconnect input. */
        @Override
        protected void handlePubFlowResumed() throws JCSMPException {
            this.transitionToRetransmitState(new InputSessionReconnect());
        }

        @Override
        public String toString() {
            return "Sending";
        }
    }

    protected class StateClosed
    extends PubState {
        public StateClosed(PubADManager parent) {
            super(parent);
        }

        public String toString() {
            return "Closed";
        }
    }

    /**
     * State labelled {@code "Unbound"}. Its only specialised behaviour is moving
     * the manager to the sending state when a flow-open response arrives.
     */
    protected class StateUnbound extends PubState {
        public StateUnbound(PubADManager parent) {
            super(parent);
        }

        /**
         * Switches the manager to its sending state with a flow-opened input.
         *
         * @throws JCSMPException if the state update fails
         */
        @Override
        protected void handleFlowOpenResponse() throws JCSMPException {
            PubADManager manager = this.getAdMgr();
            manager.updateState(manager.STATE_SENDING, new InputFlowOpened());
        }

        /** @return the fixed label {@code "Unbound"} */
        @Override
        public String toString() {
            return "Unbound";
        }
    }

    /**
     * Base class of the publisher states. Every event hook has a do-nothing
     * (or neutral-value) default so that concrete states override only the
     * events they care about.
     */
    protected abstract class PubState {
        /** Manager this state belongs to, as supplied to the constructor. */
        private PubADManager adMgr;

        protected PubState(PubADManager parent) {
            this.adMgr = parent;
        }

        /** @return the manager supplied at construction */
        protected PubADManager getAdMgr() {
            return this.adMgr;
        }

        /** @return the message producer obtained from the manager */
        protected JCSMPXMLMessageProducer getMessageProducer() {
            return this.adMgr.getMessageProducer();
        }

        /**
         * Hook invoked with the input that triggered a state change; no-op by
         * default.
         *
         * @param input the trigger of the state change
         * @throws JCSMPException never by default; overrides may throw
         */
        protected void enter(StateChangeInput input) throws JCSMPException {
        }

        /** Timeout hook; no-op by default. */
        protected void handleTimeout() throws JCSMPException {
        }

        /** Retransmit-complete hook; no-op by default. */
        protected void handleRetransmitDone() throws JCSMPException {
        }

        /** Publisher-flow-resumed hook; no-op by default. */
        protected void handlePubFlowResumed() throws JCSMPException {
        }

        /** Cancels the manager's pending resend task. */
        protected void handlePreReconnect() {
            this.getAdMgr().cancelPendingResendTask();
        }

        /**
         * Client-ack hook; no-op by default.
         *
         * @param lastAckedMsgId id of the last message acknowledged
         * @param count window credit carried by this ack
         * @param retransmitRequest whether the ack asks for a retransmit
         */
        protected void handleClientAck(long lastAckedMsgId, int count, boolean retransmitRequest) throws JCSMPException {
        }

        /**
         * Moves the manager to its closed state, passing {@code null} as the
         * input. A failure is not propagated; it is written to the debug log
         * together with the exception so the stack trace is kept, as the other
         * catch blocks in this file do.
         */
        protected void notifyProducerClosed() {
            try {
                this.getAdMgr().updateState(this.getAdMgr().STATE_CLOSED, null);
            }
            catch (JCSMPException e) {
                if (PubADManager.this.Trace.isDebugEnabled()) {
                    PubADManager.this.Trace.debug("got exception: ", e);
                }
            }
        }

        /** Flow-open-response hook; no-op by default. */
        protected void handleFlowOpenResponse() throws JCSMPException {
        }

        /** @return {@code false} by default */
        protected boolean isAckImmediatelyRequired() {
            return false;
        }

        /** Message-enqueued hook; no-op by default. */
        protected void handleMsgEnqueued() {
        }

        /**
         * Last-id-sent hook; no-op by default.
         *
         * @param id id of the message that was just sent
         */
        protected void setLastIdSent(long id) {
        }

        /** @return {@code null} by default (nothing to retransmit) */
        protected JCSMPXMLMessage getNextADMsgForRetransmit() {
            return null;
        }

        /**
         * Picks the retransmit state that matches {@code input} and asks the
         * manager to switch to it.
         *
         * <ul>
         * <li>Ack timeout: the retransmit-all state when the producer's AD
         *     control version is below 4, otherwise the windowed retransmit
         *     state.</li>
         * <li>Retransmit request: the windowed retransmit state.</li>
         * <li>Session reconnect: the retransmit-all state.</li>
         * </ul>
         *
         * <p>A {@link JCSMPException} raised by the state update itself is not
         * propagated; it is only written to the debug log.
         *
         * @param input the trigger of the transition
         * @throws InvalidOperationException when {@code input} is none of the
         *         three supported kinds, including {@code null}
         */
        protected void transitionToRetransmitState(StateChangeInput input) throws InvalidOperationException {
            PubState targetState;
            if (input instanceof InputAckTimeout) {
                targetState = this.getMessageProducer().getAdCtrlVersion() < 4 ? this.getAdMgr().STATE_RETRANSMITTING_ALL : this.getAdMgr().STATE_RETRANSMITTING;
            } else if (input instanceof InputRetransmitRequest) {
                targetState = this.getAdMgr().STATE_RETRANSMITTING;
            } else if (input instanceof InputSessionReconnect) {
                targetState = this.getAdMgr().STATE_RETRANSMITTING_ALL;
            } else {
                // Plain concatenation renders a null input as "null" instead of
                // throwing NullPointerException while building the message.
                throw new InvalidOperationException("Invalid input for retransmit state change: " + this + " with " + input);
            }
            try {
                this.getAdMgr().updateState(targetState, input);
            }
            catch (JCSMPException e) {
                if (PubADManager.this.Trace.isDebugEnabled()) {
                    PubADManager.this.Trace.debug("transitionToRetransmitState exception", e);
                }
            }
        }
    }

    /**
     * State-change input created when a client ack carries a retransmit
     * request.
     */
    class InputRetransmitRequest implements StateChangeInput {
        InputRetransmitRequest() {
        }

        /** @return the label used when this input appears in messages */
        @Override
        public String toString() {
            return "input: RetransmitRequest";
        }
    }

    /**
     * State-change input created when a flow-open response is handled.
     */
    class InputFlowOpened implements StateChangeInput {
        protected InputFlowOpened() {
        }

        /** @return the label used when this input appears in messages */
        @Override
        public String toString() {
            return "input: FlowOpened";
        }
    }

    /**
     * State-change input created when the publisher flow is resumed.
     */
    class InputSessionReconnect implements StateChangeInput {
        protected InputSessionReconnect() {
        }

        /** @return the label used when this input appears in messages */
        @Override
        public String toString() {
            return "input: SessionReconnect";
        }
    }

    /**
     * State-change input created when a state handles a timeout.
     */
    class InputAckTimeout implements StateChangeInput {
        protected InputAckTimeout() {
        }

        /** @return the label used when this input appears in messages */
        @Override
        public String toString() {
            return "input: AckTimeout";
        }
    }

    /**
     * State-change input created when retransmission has finished and the
     * manager is moved back to its sending state.
     */
    class InputRetransmitDone implements StateChangeInput {
        protected InputRetransmitDone() {
        }

        /** @return the label used when this input appears in messages */
        @Override
        public String toString() {
            return "input: RetransmitDone";
        }
    }

    /**
     * Marker type for the inputs that trigger a publisher state change. It
     * declares no members; states tell inputs apart with {@code instanceof}.
     */
    interface StateChangeInput {
    }
}

