/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v1_0;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Modified;
import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.protocol.v1_0.QueueDestination;
import org.apache.qpid.server.protocol.v1_0.SendingLink_1_0;
import org.apache.qpid.server.protocol.v1_0.Session_1_0;
import org.apache.qpid.server.protocol.v1_0.UnsettledAction;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;

class Subscription_1_0
implements Subscription {
    private SendingLink_1_0 _link;
    private AMQQueue _queue;
    private final AtomicReference<Subscription.State> _state = new AtomicReference<Subscription.State>(Subscription.State.SUSPENDED);
    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
    private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
    private final long _id;
    private final boolean _acquires;
    private volatile AMQQueue.Context _queueContext;
    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
    private ReentrantLock _stateChangeLock = new ReentrantLock();
    private boolean _noLocal;
    private FilterManager _filters;
    private long _deliveryTag = 0L;
    private Subscription.StateListener _stateListener;
    private Binary _transactionId;
    private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer().registerTransactionLayer().registerSecurityLayer();
    private SectionEncoder _sectionEncoder = new SectionEncoderImpl(this._typeRegistry);

    public Subscription_1_0(SendingLink_1_0 link, QueueDestination destination) {
        this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY);
    }

    public Subscription_1_0(SendingLink_1_0 link, QueueDestination destination, boolean acquires) {
        this._link = link;
        this._queue = destination.getQueue();
        this._id = this.getEndpoint().getLocalHandle().longValue();
        this._acquires = acquires;
    }

    private SendingLinkEndpoint getEndpoint() {
        return this._link.getEndpoint();
    }

    public LogActor getLogActor() {
        return null;
    }

    public boolean isTransient() {
        return true;
    }

    public AMQQueue getQueue() {
        return this._queue;
    }

    public QueueEntry.SubscriptionAcquiredState getOwningState() {
        return this._owningState;
    }

    public QueueEntry.SubscriptionAssignedState getAssignedState() {
        return this._assignedState;
    }

    public void setQueue(AMQQueue queue, boolean exclusive) {
    }

    public void setNoLocal(boolean noLocal) {
        this._noLocal = noLocal;
    }

    public boolean isNoLocal() {
        return this._noLocal;
    }

    public long getSubscriptionID() {
        return this._id;
    }

    public boolean isSuspended() {
        return !this.isActive();
    }

    public boolean hasInterest(QueueEntry entry) {
        return (!this._noLocal || !(entry.getMessage() instanceof Message_1_0) || ((Message_1_0)entry.getMessage()).getSession() != this.getSession()) && this.checkFilters(entry);
    }

    private boolean checkFilters(QueueEntry entry) {
        return this._filters == null || this._filters.allAllow(entry);
    }

    public boolean isClosed() {
        return !this.getEndpoint().isAttached();
    }

    public boolean acquires() {
        return this._acquires;
    }

    public boolean seesRequeues() {
        return this.acquires();
    }

    public void close() {
        this.getEndpoint().detach();
    }

    public void send(QueueEntry entry, boolean batch) throws AMQException {
        this.send(entry);
    }

    public void flushBatched() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void send(final QueueEntry queueEntry) throws AMQException {
        ServerMessage serverMessage = queueEntry.getMessage();
        if (serverMessage instanceof Message_1_0) {
            ByteBuffer payload;
            Message_1_0 message = (Message_1_0)serverMessage;
            Transfer transfer = new Transfer();
            List<ByteBuffer> fragments = message.getFragments();
            if (fragments.size() == 1) {
                payload = fragments.get(0);
            } else {
                int size = 0;
                for (ByteBuffer fragment : fragments) {
                    size += fragment.remaining();
                }
                payload = ByteBuffer.allocate(size);
                for (ByteBuffer fragment : fragments) {
                    payload.put(fragment.duplicate());
                }
                payload.flip();
            }
            if (queueEntry.getDeliveryCount() != 0) {
                payload = payload.duplicate();
                ValueHandler valueHandler = new ValueHandler((DescribedTypeConstructorRegistry)this._typeRegistry);
                Header oldHeader = null;
                try {
                    ByteBuffer encodedBuf = payload.duplicate();
                    Object value = valueHandler.parse(payload);
                    if (value instanceof Header) {
                        oldHeader = (Header)value;
                    } else {
                        payload.position(0);
                    }
                }
                catch (AmqpErrorException e) {
                    throw new RuntimeException(e);
                }
                Header header = new Header();
                if (oldHeader != null) {
                    header.setDurable(oldHeader.getDurable());
                    header.setPriority(oldHeader.getPriority());
                    header.setTtl(oldHeader.getTtl());
                }
                header.setDeliveryCount(UnsignedInteger.valueOf((int)queueEntry.getDeliveryCount()));
                this._sectionEncoder.reset();
                this._sectionEncoder.encodeObject((Object)header);
                Binary encodedHeader = this._sectionEncoder.getEncoding();
                ByteBuffer oldPayload = payload;
                payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength());
                payload.put(encodedHeader.getArray(), encodedHeader.getArrayOffset(), encodedHeader.getLength());
                payload.put(oldPayload);
                payload.flip();
            }
            transfer.setPayload(payload);
            byte[] data = new byte[8];
            ByteBuffer.wrap(data).putLong(this._deliveryTag++);
            final Binary tag = new Binary(data);
            transfer.setDeliveryTag(tag);
            Object object = this._link.getLock();
            synchronized (object) {
                if (this._link.isAttached()) {
                    ServerTransaction txn;
                    if (SenderSettleMode.SETTLED.equals(this.getEndpoint().getSendingSettlementMode())) {
                        transfer.setSettled(Boolean.valueOf(true));
                    } else {
                        UnsettledAction action = this._acquires ? new DispositionAction(tag, queueEntry) : new DoNothingAction(tag, queueEntry);
                        this._link.addUnsettled(tag, action, queueEntry);
                    }
                    if (this._transactionId != null) {
                        TransactionalState state = new TransactionalState();
                        state.setTxnId(this._transactionId);
                        transfer.setState((DeliveryState)state);
                    }
                    if (this._acquires && this._transactionId != null && (txn = this._link.getTransaction(this._transactionId)) != null) {
                        txn.addPostTransactionAction(new ServerTransaction.Action(){

                            public void postCommit() {
                            }

                            public void onRollback() {
                                if (queueEntry.isAcquiredBy(Subscription_1_0.this)) {
                                    queueEntry.release();
                                    Subscription_1_0.this._link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
                                }
                            }
                        });
                    }
                    this.getEndpoint().transfer(transfer);
                } else {
                    queueEntry.release();
                }
            }
        }
    }

    public void queueDeleted(AMQQueue queue) {
        this.getEndpoint().setSource(null);
        this.getEndpoint().detach();
    }

    public synchronized boolean wouldSuspend(QueueEntry msg) {
        boolean hasCredit;
        boolean bl = hasCredit = this._link.isAttached() && this.getEndpoint().hasCreditToSend();
        if (!hasCredit && this.getState() == Subscription.State.ACTIVE) {
            this.suspend();
        }
        return !hasCredit;
    }

    public boolean trySendLock() {
        return this._stateChangeLock.tryLock();
    }

    public synchronized void suspend() {
        if (this._state.compareAndSet(Subscription.State.ACTIVE, Subscription.State.SUSPENDED)) {
            this._stateListener.stateChange(this, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
        }
    }

    public void getSendLock() {
        this._stateChangeLock.lock();
    }

    public void releaseSendLock() {
        this._stateChangeLock.unlock();
    }

    public void releaseQueueEntry(QueueEntry queueEntryImpl) {
    }

    public void onDequeue(QueueEntry queueEntry) {
    }

    public void restoreCredit(QueueEntry queueEntry) {
    }

    public void setStateListener(Subscription.StateListener listener) {
        this._stateListener = listener;
    }

    public Subscription.State getState() {
        return this._state.get();
    }

    public AMQQueue.Context getQueueContext() {
        return this._queueContext;
    }

    public void setQueueContext(AMQQueue.Context queueContext) {
        this._queueContext = queueContext;
    }

    public boolean isActive() {
        return this.getState() == Subscription.State.ACTIVE;
    }

    public void set(String key, Object value) {
        this._properties.put(key, value);
    }

    public Object get(String key) {
        return this._properties.get(key);
    }

    public boolean isSessionTransactional() {
        return false;
    }

    public synchronized void queueEmpty() {
        if (this._link.drained() && this._state.compareAndSet(Subscription.State.ACTIVE, Subscription.State.SUSPENDED)) {
            this._stateListener.stateChange(this, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
        }
    }

    public synchronized void flowStateChanged() {
        if (this.isSuspended() && this.getEndpoint() != null) {
            if (this._state.compareAndSet(Subscription.State.SUSPENDED, Subscription.State.ACTIVE)) {
                this._stateListener.stateChange(this, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
            }
            this._transactionId = this._link.getTransactionId();
        }
    }

    public Session_1_0 getSession() {
        return this._link.getSession();
    }

    public FilterManager getFilters() {
        return this._filters;
    }

    public void setFilters(FilterManager filters) {
        this._filters = filters;
    }

    public AMQSessionModel getSessionModel() {
        return this.getSession();
    }

    public long getBytesOut() {
        return 0L;
    }

    public long getMessagesOut() {
        return 0L;
    }

    public long getUnacknowledgedBytes() {
        return 0L;
    }

    public long getUnacknowledgedMessages() {
        return 0L;
    }

    public String getConsumerName() {
        return "TODO";
    }

    private class DoNothingAction
    implements UnsettledAction {
        public DoNothingAction(Binary tag, QueueEntry queueEntry) {
        }

        public boolean process(DeliveryState state, Boolean settled) {
            Binary transactionId = null;
            Outcome outcome = null;
            if (state instanceof TransactionalState) {
                transactionId = ((TransactionalState)state).getTxnId();
                outcome = ((TransactionalState)state).getOutcome();
            } else if (state instanceof Outcome) {
                outcome = (Outcome)state;
            }
            return true;
        }
    }

    private class DispositionAction
    implements UnsettledAction {
        private final QueueEntry _queueEntry;
        private final Binary _deliveryTag;

        public DispositionAction(Binary tag, QueueEntry queueEntry) {
            this._deliveryTag = tag;
            this._queueEntry = queueEntry;
        }

        public boolean process(DeliveryState state, final Boolean settled) {
            Outcome outcome;
            Binary transactionId = null;
            if (state instanceof TransactionalState) {
                transactionId = ((TransactionalState)state).getTxnId();
                outcome = ((TransactionalState)state).getOutcome();
            } else {
                outcome = state instanceof Outcome ? (Outcome)state : null;
            }
            ServerTransaction txn = Subscription_1_0.this._link.getTransaction(transactionId);
            if (outcome instanceof Accepted) {
                txn.dequeue(this._queueEntry.getQueue(), this._queueEntry.getMessage(), new ServerTransaction.Action(){

                    public void postCommit() {
                        if (DispositionAction.this._queueEntry.isAcquiredBy(Subscription_1_0.this)) {
                            DispositionAction.this._queueEntry.discard();
                        }
                    }

                    public void onRollback() {
                    }
                });
                txn.addPostTransactionAction(new ServerTransaction.Action(){

                    public void postCommit() {
                        Subscription_1_0.this._link.getEndpoint().updateDisposition(DispositionAction.this._deliveryTag, (DeliveryState)outcome, true);
                        Subscription_1_0.this._link.getEndpoint().sendFlowConditional();
                    }

                    public void onRollback() {
                        if (Boolean.TRUE.equals(settled)) {
                            Modified modified = new Modified();
                            modified.setDeliveryFailed(Boolean.valueOf(true));
                            Subscription_1_0.this._link.getEndpoint().updateDisposition(DispositionAction.this._deliveryTag, (DeliveryState)modified, true);
                            Subscription_1_0.this._link.getEndpoint().sendFlowConditional();
                        }
                    }
                });
            } else if (outcome instanceof Released) {
                txn.addPostTransactionAction(new ServerTransaction.Action(){

                    public void postCommit() {
                        DispositionAction.this._queueEntry.release();
                        Subscription_1_0.this._link.getEndpoint().settle(DispositionAction.this._deliveryTag);
                    }

                    public void onRollback() {
                        Subscription_1_0.this._link.getEndpoint().settle(DispositionAction.this._deliveryTag);
                    }
                });
            } else if (outcome instanceof Modified) {
                txn.addPostTransactionAction(new ServerTransaction.Action(){

                    public void postCommit() {
                        DispositionAction.this._queueEntry.release();
                        if (Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed())) {
                            DispositionAction.this._queueEntry.incrementDeliveryCount();
                        }
                        Subscription_1_0.this._link.getEndpoint().settle(DispositionAction.this._deliveryTag);
                    }

                    public void onRollback() {
                        if (Boolean.TRUE.equals(settled)) {
                            Modified modified = new Modified();
                            modified.setDeliveryFailed(Boolean.valueOf(true));
                            Subscription_1_0.this._link.getEndpoint().updateDisposition(DispositionAction.this._deliveryTag, (DeliveryState)modified, true);
                            Subscription_1_0.this._link.getEndpoint().sendFlowConditional();
                        }
                    }
                });
            }
            return transactionId == null && outcome != null;
        }
    }
}

