/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.transport;

import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.NotAssociatedDtxException;
import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlow;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageSetFlowMode;
import org.apache.qpid.transport.MessageStop;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ServerSession
extends Session
implements AuthorizationHolder,
SessionConfig,
AMQSessionModel,
LogSubject,
AsyncAutoCommitTransaction.FutureRecorder {
    /** SLF4J logger shared by all sessions. */
    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
    /** Subscription-map key used in place of a {@code null} destination by register/getSubscription. */
    private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
    /** 2^30; equal to the amount of credit enqueue() adds back when it tops up producer credit. */
    private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 0x40000000;
    /** Equal to the backlog size above which completeAsyncCommands() waits on the oldest commands. */
    private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
    /** Identifier obtained from the config store in the constructor; returned by getQMFId(). */
    private final UUID _id;
    /** Connection configuration supplied at construction; source of the virtual host and config store. */
    private ConnectionConfig _connectionConfig;
    /** Wall-clock time (ms) at which this object was instantiated. */
    private long _createTime = System.currentTimeMillis();
    /** Log actor bound to this session; used for operational log messages. */
    private LogActor _actor = GenericActor.getInstance(this);
    /** Entities currently blocking this session (queues, or the session itself for "all queues"). */
    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet());
    /** True while at least one blocking entity has flow control enforced on this session. */
    private final AtomicBoolean _blocking = new AtomicBoolean(false);
    /** Log subject created in the constructor and used for flow-control log messages. */
    private ChannelLogSubject _logSubject;
    /** Remaining producer credit; -1 means enqueue() does no credit accounting. */
    private final AtomicInteger _outstandingCredit = new AtomicInteger(-1);
    /** Listeners for transfers awaiting a disposition change, keyed by transfer id. */
    private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
    /** Current transaction; async auto-commit by default, replaced by selectTx()/selectDtx(). */
    private ServerTransaction _transaction;
    /** Incremented by selectTx(), commit() and rollback(). */
    private final AtomicLong _txnStarts = new AtomicLong(0L);
    /** Incremented by commit(). */
    private final AtomicLong _txnCommits = new AtomicLong(0L);
    /** Incremented by rollback(). */
    private final AtomicLong _txnRejects = new AtomicLong(0L);
    /** Toggles between 0 and 1: set on transactional enqueue, cleared on commit/rollback. */
    private final AtomicLong _txnCount = new AtomicLong(0L);
    /** Time (ms) of the last transactional activity; 0 until the first update. */
    private final AtomicLong _txnUpdateTime = new AtomicLong(0L);
    /** Subscriptions registered on this session, keyed by destination (or NULL_DESTINTATION). */
    private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
    /** Tasks run by onClose(). */
    private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
    /** FIFO of store operations whose post-commit actions have not yet run. */
    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList();

    /**
     * Creates a session that takes its connection configuration from the owning connection.
     * The connection is cast to {@link ServerConnection} to obtain that configuration.
     */
    ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) {
        this(connection, delegate, name, expiry, ((ServerConnection) connection).getConfig());
    }

    /**
     * Creates a session with an explicit connection configuration.
     *
     * <p>The session starts with an {@link AsyncAutoCommitTransaction}, obtains its id from the
     * config store and registers itself there as a configured object.
     *
     * @param connConfig configuration providing the virtual host and the config store
     */
    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) {
        super(connection, delegate, name, expiry);
        // The connection config must be set first: the message store and config store are reached through it.
        _connectionConfig = connConfig;
        _transaction = new AsyncAutoCommitTransaction(getMessageStore(), this);
        _logSubject = new ChannelLogSubject(this);
        _id = getConfigStore().createId();
        getConfigStore().addConfiguredObject(this);
    }

    /**
     * Records the new session state; on transition to OPEN logs channel creation and, if the
     * session is already flagged as blocking, sends the flow-control commands to the peer.
     */
    protected void setState(Session.State state) {
        super.setState(state);
        if (state != Session.State.OPEN) {
            return;
        }
        _actor.message(ChannelMessages.CREATE());
        if (_blocking.get()) {
            invokeBlock();
        }
    }

    /** Sends a set-flow-mode(CREDIT) command followed by a stop command, both for destination "". */
    private void invokeBlock() {
        final MessageSetFlowMode creditMode = new MessageSetFlowMode("", MessageFlowMode.CREDIT, new Option[0]);
        invoke((Method) creditMode);
        final MessageStop stop = new MessageStop("", new Option[0]);
        invoke((Method) stop);
    }

    /** Returns the config store of this session's connection configuration. */
    private ConfigStore getConfigStore() {
        final ConnectionConfig config = getConnectionConfig();
        return config.getConfigStore();
    }

    /** Delegates to {@code isCommandsFull(id)}. */
    protected boolean isFull(int id) {
        final boolean commandsFull = isCommandsFull(id);
        return commandsFull;
    }

    /**
     * Enqueues an incoming message onto the given queues through the current transaction.
     *
     * <p>When credit accounting is active (outstanding credit is not -1), one unit of credit is
     * consumed; once the remaining credit reaches {@code Integer.MAX_VALUE - 2^30}, another 2^30
     * units are added and a matching flow command is sent to the peer.
     *
     * @param message the message to enqueue
     * @param queues  destination queues
     */
    public void enqueue(ServerMessage message, List<? extends BaseQueue> queues) {
        final boolean creditTracked = _outstandingCredit.get() != -1;
        if (creditTracked && _outstandingCredit.decrementAndGet() == Integer.MAX_VALUE - 0x40000000) {
            _outstandingCredit.addAndGet(0x40000000);
            final MessageFlow topUp = new MessageFlow("", MessageCreditUnit.MESSAGE, 0x40000000L, new Option[0]);
            invoke((Method) topUp);
        }

        getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());

        // The queue-level enqueue happens in the post-commit action, not here.
        final PostEnqueueAction afterCommit = new PostEnqueueAction(queues, message, isTransactional());
        _transaction.enqueue(queues, message, afterCommit, 0L);

        incrementOutstandingTxnsIfNecessary();
        updateTransactionalActivity();
    }

    /**
     * Records the delivery (by body size) with the connection and sends the transfer to the peer.
     *
     * @param xfr                 the transfer to send
     * @param postIdSettingAction passed through to {@code invoke} together with the transfer
     */
    public void sendMessage(MessageTransfer xfr, Runnable postIdSettingAction) {
        final AMQConnectionModel connection = getConnectionModel();
        connection.registerMessageDelivered(xfr.getBodySize());
        invoke((Method) xfr, postIdSettingAction);
    }

    /** Registers the listener to be notified of disposition changes for the given transfer's id. */
    public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) {
        _messageDispositionListenerMap.put(xfr.getId(), acceptListener);
    }

    /** Invokes {@code onAccept()} on, and removes, the listeners of all transfers in {@code ranges}. */
    public void accept(RangeSet ranges) {
        final MessageDispositionAction acceptAction = new MessageDispositionAction() {
            public void performAction(MessageDispositionChangeListener listener) {
                listener.onAccept();
            }
        };
        dispositionChange(ranges, acceptAction);
    }

    /**
     * Invokes {@code onRelease(setRedelivered)} on, and removes, the listeners of all transfers in
     * {@code ranges}.
     */
    public void release(RangeSet ranges, final boolean setRedelivered) {
        final MessageDispositionAction releaseAction = new MessageDispositionAction() {
            public void performAction(MessageDispositionChangeListener listener) {
                listener.onRelease(setRedelivered);
            }
        };
        dispositionChange(ranges, releaseAction);
    }

    /** Invokes {@code onReject()} on, and removes, the listeners of all transfers in {@code ranges}. */
    public void reject(RangeSet ranges) {
        final MessageDispositionAction rejectAction = new MessageDispositionAction() {
            public void performAction(MessageDispositionChangeListener listener) {
                listener.onReject();
            }
        };
        dispositionChange(ranges, rejectAction);
    }

    /**
     * Attempts to acquire every outstanding transfer whose id falls inside {@code transfers}.
     *
     * <p>The outstanding ids (map keys) and the requested ranges are walked in parallel; a listener
     * stays registered whether or not its {@code acquire()} succeeds.
     *
     * @param transfers ranges of transfer ids the peer wants to acquire
     * @return the ids whose listener reported a successful acquire (possibly empty)
     */
    public RangeSet acquire(RangeSet transfers) {
        final RangeSet acquired = RangeSetFactory.createRangeSet();
        if (_messageDispositionListenerMap.isEmpty()) {
            return acquired;
        }

        final Iterator<Integer> pendingIds = _messageDispositionListenerMap.keySet().iterator();
        final Iterator rangeIter = transfers.iterator();
        if (!rangeIter.hasNext()) {
            return acquired;
        }

        Range current = (Range) rangeIter.next();
        while (current != null && pendingIds.hasNext()) {
            final int id = pendingIds.next();

            // Skip ranges that end before this id; null marks "no ranges left".
            while (current != null && Serial.gt(id, current.getUpper())) {
                current = rangeIter.hasNext() ? (Range) rangeIter.next() : null;
            }

            if (current != null && current.includes(id)) {
                final MessageDispositionChangeListener listener = _messageDispositionListenerMap.get(id);
                if (listener != null && listener.acquire()) {
                    acquired.add(id);
                }
            }
        }
        return acquired;
    }

    /**
     * Removes the disposition listener of every outstanding transfer whose id is covered by
     * {@code ranges} and applies {@code action} to each removed listener.
     *
     * <p>A {@code null} range set is ignored. A range set holding exactly one range is handled by
     * probing every id in that range; otherwise the outstanding ids and the ranges are walked in
     * parallel.
     *
     * @param ranges transfer ids to act on; may be {@code null}
     * @param action callback invoked once per removed listener
     */
    public void dispositionChange(RangeSet ranges, MessageDispositionAction action) {
        if (ranges == null) {
            return;
        }

        if (ranges.size() == 1) {
            final Range only = ranges.getFirst();
            final int lower = only.getLower();
            final int upper = only.getUpper();
            if (lower > upper) {
                // An inverted range (as signed ints) selects nothing.
                return;
            }
            // Stop on equality rather than testing "id <= upper": with upper == Integer.MAX_VALUE
            // that test can never fail because ++id wraps around to Integer.MIN_VALUE.
            for (int id = lower; ; ++id) {
                final MessageDispositionChangeListener listener = _messageDispositionListenerMap.remove(id);
                if (listener != null) {
                    action.performAction(listener);
                }
                if (id == upper) {
                    break;
                }
            }
            return;
        }

        if (_messageDispositionListenerMap.isEmpty()) {
            return;
        }
        final Iterator<Integer> pendingIds = _messageDispositionListenerMap.keySet().iterator();
        final Iterator rangeIter = ranges.iterator();
        if (!rangeIter.hasNext()) {
            return;
        }

        Range current = (Range) rangeIter.next();
        while (current != null && pendingIds.hasNext()) {
            final int id = pendingIds.next();

            // Skip ranges that end before this id; null marks "no ranges left".
            while (current != null && Serial.gt(id, current.getUpper())) {
                current = rangeIter.hasNext() ? (Range) rangeIter.next() : null;
            }
            if (current == null || !current.includes(id)) {
                continue;
            }

            // The entry may have been removed since the key iterator saw it (the map is
            // concurrent and its iterators are weakly consistent), so guard against null.
            final MessageDispositionChangeListener listener = _messageDispositionListenerMap.remove(id);
            if (listener != null) {
                action.performAction(listener);
            }
        }
    }

    /** Drops the disposition listener registered under the given method's id, if any. */
    public void removeDispositionListener(Method method) {
        _messageDispositionListenerMap.remove(method.getId());
    }

    /**
     * Cleans up when the session closes: rolls back a local transaction or ends distributed
     * transaction associations, releases (as redelivered) every transfer still awaiting a
     * disposition, deregisters from the config store, runs the close tasks and logs the close.
     */
    public void onClose() {
        if (_transaction instanceof LocalTransaction) {
            _transaction.rollback();
        } else if (_transaction instanceof DistributedTransaction) {
            getVirtualHost().getDtxRegistry().endAssociations(this);
        }

        final Iterator<MessageDispositionChangeListener> pending = _messageDispositionListenerMap.values().iterator();
        while (pending.hasNext()) {
            pending.next().onRelease(true);
        }
        _messageDispositionListenerMap.clear();

        getConfigStore().removeConfiguredObject(this);

        final Iterator<Task> closeTasks = _taskList.iterator();
        while (closeTasks.hasNext()) {
            closeTasks.next().doTask(this);
        }

        CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
    }

    /** No-op in this class: the body is empty, so nothing is awaited here. */
    protected void awaitClose() {
    }

    /**
     * Dequeues the entry's message through the current transaction.
     *
     * <p>After commit the subscription is told the entry was acknowledged; on rollback the entry
     * is marked redelivered and released.
     */
    public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) {
        final ServerTransaction.Action outcome = new ServerTransaction.Action() {
            public void postCommit() {
                sub.acknowledge(entry);
            }

            public void onRollback() {
                entry.setRedelivered();
                entry.release();
            }
        };
        _transaction.dequeue(entry.getQueue(), entry.getMessage(), outcome);
        updateTransactionalActivity();
    }

    /** Returns the value view of the subscription map (not a copy). */
    public Collection<Subscription_0_10> getSubscriptions() {
        final Collection<Subscription_0_10> view = _subscriptions.values();
        return view;
    }

    /** Stores the subscription under its destination; a {@code null} destination uses a sentinel key. */
    public void register(String destination, Subscription_0_10 sub) {
        final String key = (destination != null) ? destination : NULL_DESTINTATION;
        _subscriptions.put(key, sub);
    }

    /**
     * Looks up the subscription registered for a destination; a {@code null} destination uses the
     * same sentinel key as {@code register}.
     *
     * @return the subscription, or {@code null} if none is registered under that key
     */
    public Subscription_0_10 getSubscription(String destination) {
        final String key = (destination != null) ? destination : NULL_DESTINTATION;
        return _subscriptions.get(key);
    }

    /**
     * Removes a subscription from this session and unregisters it from its queue, if it has one.
     *
     * <p>The queue unregistration runs between {@code getSendLock()} and
     * {@code releaseSendLock()}; an {@link AMQException} raised while unregistering is logged and
     * not propagated.
     *
     * @param sub the subscription to remove
     */
    public void unregister(Subscription_0_10 sub) {
        // register()/getSubscription() file a null destination under NULL_DESTINTATION, and the
        // backing ConcurrentHashMap throws NullPointerException on a null key, so map it the same way.
        // NOTE(review): assumes sub.getName() is the destination the subscription was registered under.
        Object key = sub.getName();
        if (key == null) {
            key = NULL_DESTINTATION;
        }
        _subscriptions.remove(key);
        try {
            sub.getSendLock();
            AMQQueue queue = sub.getQueue();
            if (queue != null) {
                queue.unregisterSubscription(sub);
            }
        }
        catch (AMQException e) {
            _logger.error("Failed to unregister subscription :" + e.getMessage(), (Throwable)e);
        }
        finally {
            sub.releaseSendLock();
        }
    }

    /** Reports whether the current transaction object is transactional. */
    @Override
    public boolean isTransactional() {
        final ServerTransaction current = _transaction;
        return current.isTransactional();
    }

    /**
     * Reports whether a transaction is in progress: the session is transactional, transactional
     * activity has been recorded, and the transaction has a positive start time.
     */
    public boolean inTransaction() {
        if (!isTransactional()) {
            return false;
        }
        if (_txnUpdateTime.get() <= 0L) {
            return false;
        }
        return _transaction.getTransactionStartTime() > 0L;
    }

    /** Switches the session to a new {@link LocalTransaction} and counts a transaction start. */
    public void selectTx() {
        final MessageStore store = getMessageStore();
        _transaction = new LocalTransaction(store);
        _txnStarts.incrementAndGet();
    }

    /** Switches the session to a new {@link DistributedTransaction}. */
    public void selectDtx() {
        _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost());
    }

    /**
     * Starts work on the given branch using this session's distributed transaction.
     *
     * @throws DtxNotSelectedException if the session's transaction is not a distributed transaction
     */
    public void startDtx(Xid xid, boolean join, boolean resume) throws JoinAndResumeDtxException, UnknownDtxBranchException, AlreadyKnownDtxException, DtxNotSelectedException {
        assertDtxTransaction().start(xid, join, resume);
    }

    /**
     * Ends work on the given branch using this session's distributed transaction.
     *
     * @throws DtxNotSelectedException if the session's transaction is not a distributed transaction
     */
    public void endDtx(Xid xid, boolean fail, boolean suspend) throws NotAssociatedDtxException, UnknownDtxBranchException, DtxNotSelectedException, SuspendAndFailDtxException, TimeoutDtxException {
        assertDtxTransaction().end(xid, fail, suspend);
    }

    /** Returns the timeout the virtual host's DTX registry holds for the given branch. */
    public long getTimeoutDtx(Xid xid) throws UnknownDtxBranchException {
        final long timeout = getVirtualHost().getDtxRegistry().getTimeout(xid);
        return timeout;
    }

    /** Sets the timeout for the given branch in the virtual host's DTX registry. */
    public void setTimeoutDtx(Xid xid, long timeout) throws UnknownDtxBranchException {
        getVirtualHost().getDtxRegistry().setTimeout(xid, timeout);
    }

    /** Asks the virtual host's DTX registry to prepare the given branch. */
    public void prepareDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException {
        getVirtualHost().getDtxRegistry().prepare(xid);
    }

    /** Asks the virtual host's DTX registry to commit the given branch, passing {@code onePhase} through. */
    public void commitDtx(Xid xid, boolean onePhase) throws UnknownDtxBranchException, IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException {
        getVirtualHost().getDtxRegistry().commit(xid, onePhase);
    }

    /** Asks the virtual host's DTX registry to roll back the given branch. */
    public void rollbackDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException, AMQStoreException, TimeoutDtxException {
        getVirtualHost().getDtxRegistry().rollback(xid);
    }

    /** Asks the virtual host's DTX registry to forget the given branch. */
    public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException {
        getVirtualHost().getDtxRegistry().forget(xid);
    }

    /** Returns whatever the virtual host's DTX registry reports from {@code recover()}. */
    public List<Xid> recoverDtx() {
        final List<Xid> recovered = getVirtualHost().getDtxRegistry().recover();
        return recovered;
    }

    /**
     * Returns the current transaction as a {@link DistributedTransaction}.
     *
     * @throws DtxNotSelectedException if the current transaction is of any other kind
     */
    private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException {
        if (!(_transaction instanceof DistributedTransaction)) {
            throw new DtxNotSelectedException();
        }
        return (DistributedTransaction) _transaction;
    }

    /** Commits the current transaction, then updates the commit/start counters and the open-transaction flag. */
    public void commit() {
        _transaction.commit();
        // A commit also counts as the start of the next transaction.
        _txnCommits.incrementAndGet();
        _txnStarts.incrementAndGet();
        decrementOutstandingTxnsIfNecessary();
    }

    /** Rolls back the current transaction, then updates the reject/start counters and the open-transaction flag. */
    public void rollback() {
        _transaction.rollback();
        // A rollback also counts as the start of the next transaction.
        _txnRejects.incrementAndGet();
        _txnStarts.incrementAndGet();
        decrementOutstandingTxnsIfNecessary();
    }

    /** For a transactional session, flips the outstanding-transaction count from 0 to 1 (no-op if already 1). */
    private void incrementOutstandingTxnsIfNecessary() {
        if (!isTransactional()) {
            return;
        }
        _txnCount.compareAndSet(0L, 1L);
    }

    /** For a transactional session, flips the outstanding-transaction count from 1 to 0 (no-op if already 0). */
    private void decrementOutstandingTxnsIfNecessary() {
        if (!isTransactional()) {
            return;
        }
        _txnCount.compareAndSet(1L, 0L);
    }

    /** For a transactional session, stamps the last-activity time with the current wall-clock time. */
    public void updateTransactionalActivity() {
        if (!isTransactional()) {
            return;
        }
        _txnUpdateTime.set(System.currentTimeMillis());
    }

    /** Returns the number of transaction starts counted so far. */
    @Override
    public Long getTxnStarts() {
        return Long.valueOf(_txnStarts.get());
    }

    /** Returns the number of commits counted so far. */
    @Override
    public Long getTxnCommits() {
        return Long.valueOf(_txnCommits.get());
    }

    /** Returns the number of rollbacks counted so far. */
    @Override
    public Long getTxnRejects() {
        return Long.valueOf(_txnRejects.get());
    }

    /** Returns the channel number of this session, as reported by {@code getChannel()}. */
    @Override
    public int getChannelId() {
        final int channel = getChannel();
        return channel;
    }

    /** Returns the outstanding-transaction count (0 or 1 as maintained by this class). */
    @Override
    public Long getTxnCount() {
        return Long.valueOf(_txnCount.get());
    }

    /**
     * Returns the same start counter as {@code getTxnStarts()}.
     * NOTE(review): the name suggests a start time, but the value is a count — confirm intent.
     */
    @Override
    public Long getTxnStart() {
        return Long.valueOf(_txnStarts.get());
    }

    /** Returns the authorized principal of the owning connection. */
    @Override
    public Principal getAuthorizedPrincipal() {
        final ServerConnection connection = getConnection();
        return connection.getAuthorizedPrincipal();
    }

    /** Returns the authorized subject of the owning connection. */
    @Override
    public Subject getAuthorizedSubject() {
        final ServerConnection connection = getConnection();
        return connection.getAuthorizedSubject();
    }

    /** Adds a task to the list that {@code onClose()} runs. */
    public void addSessionCloseTask(Task task) {
        _taskList.add(task);
    }

    /** Removes a task from the list that {@code onClose()} runs. */
    public void removeSessionCloseTask(Task task) {
        _taskList.remove(task);
    }

    /** Returns the owning connection's reference object. */
    public Object getReference() {
        final ServerConnection connection = getConnection();
        return connection.getReference();
    }

    /** Returns the message store of this session's virtual host. */
    public MessageStore getMessageStore() {
        final VirtualHost host = getVirtualHost();
        return host.getMessageStore();
    }

    /** Returns the virtual host of the connection configuration, cast to {@link VirtualHost}. */
    @Override
    public VirtualHost getVirtualHost() {
        return (VirtualHost) _connectionConfig.getVirtualHost();
    }

    /** Returns the id assigned by the config store when the session was constructed. */
    @Override
    public UUID getQMFId() {
        return _id;
    }

    /** Returns the shared {@link SessionConfigType} instance. */
    @Override
    public SessionConfigType getConfigType() {
        return SessionConfigType.getInstance();
    }

    /** Returns the virtual host as this configured object's parent. */
    @Override
    public ConfiguredObject getParent() {
        return getVirtualHost();
    }

    /** Always reports {@code false}. */
    @Override
    public boolean isDurable() {
        return false;
    }

    /** Always reports {@code true}. */
    @Override
    public boolean isAttached() {
        return true;
    }

    /** Always returns 0. */
    @Override
    public long getDetachedLifespan() {
        return 0L;
    }

    /** Always returns {@code null}; this class reports no expiry time. */
    @Override
    public Long getExpiryTime() {
        return null;
    }

    /** Always returns {@code null}; this class reports no maximum client rate. */
    @Override
    public Long getMaxClientRate() {
        return null;
    }

    /** Returns the connection configuration supplied at construction. */
    @Override
    public ConnectionConfig getConnectionConfig() {
        return _connectionConfig;
    }

    /** Returns the string form of the session name. */
    @Override
    public String getSessionName() {
        final String name = getName().toString();
        return name;
    }

    /** Returns the wall-clock time (ms) captured when this object was instantiated. */
    @Override
    public long getCreateTime() {
        return _createTime;
    }

    /** Management entry point for closing the session; simply calls {@code close()}. */
    @Override
    public void mgmtClose() {
        close();
    }

    /** Returns the owning connection viewed as an {@link AMQConnectionModel}. */
    @Override
    public AMQConnectionModel getConnectionModel() {
        return getConnection();
    }

    /** Returns the client id of the owning connection. */
    @Override
    public String getClientID() {
        final ServerConnection connection = getConnection();
        return connection.getClientId();
    }

    /** Returns the superclass's connection, cast to {@link ServerConnection}. */
    public ServerConnection getConnection() {
        return (ServerConnection)super.getConnection();
    }

    /** Returns the log actor created for this session. */
    public LogActor getLogActor() {
        return _actor;
    }

    /** Returns this session itself; the class implements {@link LogSubject}. */
    @Override
    public LogSubject getLogSubject() {
        return this;
    }

    /**
     * Checks an in-progress transaction against the given thresholds (all in ms).
     *
     * <p>Does nothing unless {@code inTransaction()} is true. A threshold of 0 or less disables
     * its check. At most one warning is logged (idle takes precedence over open) and at most one
     * close is requested (idle again takes precedence).
     *
     * @param openWarn  open time above which a warning is logged
     * @param openClose open time above which the session is closed
     * @param idleWarn  idle time above which a warning is logged
     * @param idleClose idle time above which the session is closed
     */
    @Override
    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException {
        if (!inTransaction()) {
            return;
        }

        final long now = System.currentTimeMillis();
        final long openTime = now - _transaction.getTransactionStartTime();
        final long idleTime = now - _txnUpdateTime.get();

        final boolean idleTooLongForWarn = idleWarn > 0L && idleTime > idleWarn;
        if (idleTooLongForWarn) {
            CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
            _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
        } else if (openWarn > 0L && openTime > openWarn) {
            CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
            _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
        }

        final boolean idleTooLongForClose = idleClose > 0L && idleTime > idleClose;
        if (idleTooLongForClose) {
            getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
        } else if (openClose > 0L && openTime > openClose) {
            getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
        }
    }

    /** Marks the given queue as blocking this session, using the queue's name in the log message. */
    @Override
    public void block(AMQQueue queue) {
        final String queueName = queue.getName();
        block(queue, queueName);
    }

    /** Blocks the session as a whole; the session itself is recorded as the blocking entity. */
    @Override
    public void block() {
        final String label = "** All Queues **";
        block(this, label);
    }

    /**
     * Records {@code queue} as a blocking entity. If it was not already recorded and the session
     * was not already blocking, sends the flow-control commands (only when the session is OPEN)
     * and logs that flow control is enforced.
     *
     * @param queue the blocking entity (a queue, or this session for "all queues")
     * @param name  name used in the log message
     */
    private void block(Object queue, String name) {
        synchronized (_blockingEntities) {
            final boolean newlyBlocked = _blockingEntities.add(queue) && _blocking.compareAndSet(false, true);
            if (!newlyBlocked) {
                return;
            }
            if (getState() == Session.State.OPEN) {
                invokeBlock();
            }
            _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
        }
    }

    /** Removes the given queue from the set of entities blocking this session. */
    @Override
    public void unblock(AMQQueue queue) {
        // Widen to Object so the private unblock(Object) overload is selected.
        final Object entity = queue;
        unblock(entity);
    }

    /** Removes the session-wide block, i.e. the entry added by {@code block()}. */
    @Override
    public void unblock() {
        unblock(this);
    }

    /**
     * Removes {@code queue} from the blocking entities. When that empties the set, the session
     * was blocking, and it is not closing, logs the removal of flow control, resets outstanding
     * credit to {@code Integer.MAX_VALUE} and sends the peer a matching message-credit flow.
     *
     * @param queue the blocking entity to remove
     */
    private void unblock(Object queue) {
        synchronized (_blockingEntities) {
            if (!_blockingEntities.remove(queue) || !_blockingEntities.isEmpty()) {
                return;
            }
            if (!_blocking.compareAndSet(true, false) || isClosing()) {
                return;
            }
            _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());

            final MessageFlow flow = new MessageFlow();
            flow.setUnit(MessageCreditUnit.MESSAGE);
            flow.setDestination("");
            _outstandingCredit.set(Integer.MAX_VALUE);
            flow.setValue(Integer.MAX_VALUE);
            invoke((Method) flow);
        }
    }

    /**
     * Reports whether the inbound message carries the same connection reference (identity
     * comparison) as this session's connection. Only {@link MessageTransferMessage} and
     * {@link MessageMetaData_0_10} inbound types can match.
     */
    @Override
    public boolean onSameConnection(InboundMessage inbound) {
        if (inbound instanceof MessageTransferMessage
                && ((MessageTransferMessage) inbound).getConnectionReference() == getConnection().getReference()) {
            return true;
        }
        return inbound instanceof MessageMetaData_0_10
                && ((MessageMetaData_0_10) inbound).getConnectionReference() == getConnection().getReference();
    }

    /**
     * Builds the log prefix {@code "[con:<id>(<client>@<address>/<vhost>)/ch:<channel>] "}.
     *
     * <p>The connection id is -1 when the underlying connection is not a {@link ServerConnection};
     * the remote address is empty when the connection config is not a {@link ProtocolEngine}.
     */
    @Override
    public String toLogString() {
        final long connectionId;
        if (super.getConnection() instanceof ServerConnection) {
            connectionId = getConnection().getConnectionId();
        } else {
            connectionId = -1L;
        }

        final String remoteAddress;
        if (_connectionConfig instanceof ProtocolEngine) {
            remoteAddress = ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString();
        } else {
            remoteAddress = "";
        }

        final String body = MessageFormat.format("con:{0}({1}@{2}/{3})/ch:{4}", connectionId, getClientID(), remoteAddress, getVirtualHost().getName(), getChannel());
        return "[" + body + "] ";
    }

    /** Unregisters all subscriptions, then closes the session via the superclass. */
    @Override
    public void close() {
        unregisterSubscriptions();
        super.close();
    }

    /** Calls {@code unregister} for every subscription currently registered on this session. */
    void unregisterSubscriptions() {
        final Iterator<Subscription_0_10> it = getSubscriptions().iterator();
        while (it.hasNext()) {
            unregister(it.next());
        }
    }

    /** Calls {@code stop()} on every subscription currently registered on this session. */
    void stopSubscriptions() {
        final Iterator<Subscription_0_10> it = getSubscriptions().iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    /**
     * Calls {@code flushCreditState(false)} on every subscription, then blocks until all
     * recorded asynchronous commands have completed.
     */
    public void receivedComplete() {
        final Iterator<Subscription_0_10> it = getSubscriptions().iterator();
        while (it.hasNext()) {
            it.next().flushCreditState(false);
        }
        awaitCommandCompletion();
    }

    /** Returns the number of transfers that still have a disposition listener registered. */
    @Override
    public int getUnacknowledgedMessageCount() {
        final int pending = _messageDispositionListenerMap.size();
        return pending;
    }

    /** Reports whether flow control is currently enforced on this session. */
    @Override
    public boolean getBlocking() {
        final boolean blocked = _blocking.get();
        return blocked;
    }

    /**
     * Completes recorded commands in FIFO order without blocking for as long as the head of the
     * queue is ready; then, while more than 500 commands remain, waits for and completes the
     * oldest ones.
     */
    public void completeAsyncCommands() {
        AsyncCommand head = _unfinishedCommandsQueue.peek();
        while (head != null && head.isReadyForCompletion()) {
            // Complete first and dequeue afterwards.
            head.complete();
            _unfinishedCommandsQueue.poll();
            head = _unfinishedCommandsQueue.peek();
        }

        while (_unfinishedCommandsQueue.size() > 500) {
            final AsyncCommand oldest = _unfinishedCommandsQueue.poll();
            oldest.awaitReadyForCompletion();
            oldest.complete();
        }
    }

    /** Drains the queue of recorded commands, waiting for and completing each one in FIFO order. */
    public void awaitCommandCompletion() {
        for (AsyncCommand next = _unfinishedCommandsQueue.poll(); next != null; next = _unfinishedCommandsQueue.poll()) {
            next.awaitReadyForCompletion();
            next.complete();
        }
    }

    /**
     * Returns the most recently recorded unfinished command.
     *
     * @return the last queued command, or {@code null} when nothing is queued
     */
    public Object getAsyncCommandMark() {
        if (_unfinishedCommandsQueue.isEmpty()) {
            return null;
        }
        return _unfinishedCommandsQueue.getLast();
    }

    /** Queues the store future together with the action to run once it has completed. */
    @Override
    public void recordFuture(StoreFuture future, ServerTransaction.Action action) {
        final AsyncCommand pending = new AsyncCommand(future, action);
        _unfinishedCommandsQueue.add(pending);
    }

    /** Forwards to the superclass implementation without adding behaviour. */
    protected void setClose(boolean close) {
        super.setClose(close);
    }

    /** Orders sessions by their QMF id. */
    @Override
    public int compareTo(AMQSessionModel session) {
        final UUID mine = getQMFId();
        return mine.compareTo(session.getQMFId());
    }

    /** Returns the number of subscriptions registered on this session. */
    @Override
    public int getConsumerCount() {
        final int count = _subscriptions.values().size();
        return count;
    }

    /** Pairs a store future with the action whose {@code postCommit()} runs once the future is complete. */
    private static class AsyncCommand {
        private final StoreFuture _future;
        /** Cleared after {@link #complete()} has run the post-commit callback. */
        private ServerTransaction.Action _action;

        public AsyncCommand(StoreFuture future, ServerTransaction.Action action) {
            _future = future;
            _action = action;
        }

        /** Blocks until the store future has completed. */
        void awaitReadyForCompletion() {
            _future.waitForCompletion();
        }

        /** Waits for the future if it is not yet complete, runs the post-commit callback, then drops the action. */
        void complete() {
            final boolean alreadyDone = _future.isComplete();
            if (!alreadyDone) {
                _future.waitForCompletion();
            }
            _action.postCommit();
            _action = null;
        }

        /** Reports whether the store future has completed. */
        boolean isReadyForCompletion() {
            return _future.isComplete();
        }
    }

    /**
     * Transaction action that puts a message on its destination queues once the enclosing
     * transaction has committed. Rollback requires no work.
     */
    private class PostEnqueueAction
    implements ServerTransaction.Action {
        private List<? extends BaseQueue> _queues;
        private ServerMessage _message;
        private final boolean _transactional;

        /**
         * @param queues        destination queues
         * @param message       message to enqueue after commit
         * @param transactional passed through to each queue's {@code enqueue} call
         */
        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, boolean transactional) {
            this._transactional = transactional;
            this.setState(queues, message);
        }

        /** Replaces the message and destination queues this action operates on. */
        public void setState(List<? extends BaseQueue> queues, ServerMessage message) {
            this._message = message;
            this._queues = queues;
        }

        /**
         * Enqueues the message on every destination queue and, for {@link AMQQueue}s, checks
         * capacity against the owning session.
         *
         * <p>A message reference is taken before the first enqueue and released in a
         * {@code finally} block, so it is released even when an enqueue fails.
         *
         * @throws RuntimeException wrapping any {@link AMQException} raised by a queue
         */
        @Override
        public void postCommit() {
            MessageReference ref = this._message.newReference();
            try {
                for (BaseQueue queue : this._queues) {
                    try {
                        queue.enqueue(this._message, this._transactional, null);
                        if (queue instanceof AMQQueue) {
                            ((AMQQueue)queue).checkCapacity(ServerSession.this);
                        }
                    }
                    catch (AMQException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            finally {
                // Always drop the reference taken above; otherwise a failed enqueue leaks it.
                ref.release();
            }
        }

        /** Nothing to undo: no queue is touched before commit. */
        @Override
        public void onRollback() {
        }
    }

    private static interface MessageDispositionAction {
        public void performAction(MessageDispositionChangeListener var1);
    }

    public static interface Task {
        public void doTask(ServerSession var1);
    }

    /** Receives the outcome (accept, release, reject) or an acquire request for one sent transfer. */
    public interface MessageDispositionChangeListener {
        /** Called when the transfer is accepted. */
        void onAccept();

        /** Called when the transfer is released; the flag is the caller's set-redelivered choice. */
        void onRelease(boolean setRedelivered);

        /** Called when the transfer is rejected. */
        void onReject();

        /** Attempts to acquire the transfer; returns whether the attempt succeeded. */
        boolean acquire();
    }
}

