/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationCommandProcessor;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationTarget;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpTransferTagGenerator;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the receiver and sender link contexts registered for one proton {@link Session} and
 * forwards session level work to the {@link AMQPSessionCallback} it was created with.
 *
 * <p>Link state changes (open / close / flow) are handed to
 * {@code AMQPConnectionContext.runNow(...)} and followed by a {@code flush()} on the connection.
 */
public class AMQPSessionContext extends ProtonInitializable {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Key used for the federation instance stored in the proton connection attachments. */
    private static final String FEDERATION_INSTANCE_RECORD = "FEDERATION_INSTANCE_RECORD";

    protected final AMQPConnectionContext connection;
    protected final AMQPSessionCallback sessionSPI;
    protected final Session session;

    /** Receiver contexts registered through {@code addReceiver}, keyed by their proton link. */
    protected Map<Receiver, ProtonAbstractReceiver> receivers = new ConcurrentHashMap<>();

    /** Sender contexts registered through {@code addSender}, keyed by their proton link. */
    protected Map<Sender, ProtonServerSenderContext> senders = new ConcurrentHashMap<>();

    protected boolean closed = false;
    protected final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
    protected final ActiveMQServer server;

    /** Same sender contexts as {@link #senders}, keyed by the value of {@code getBrokerConsumer()}. */
    protected Map<Object, ProtonServerSenderContext> serverSenders = new ConcurrentHashMap<>();

    /**
     * Creates a context for the given proton session.
     *
     * @param sessionSPI broker side session callback; {@link #initialize()} and {@link #close()}
     *                   tolerate {@code null}, the link registration methods do not
     * @param connection owning connection context
     * @param session    proton session this context wraps
     * @param server     broker instance handed to mirror and federation receivers
     */
    public AMQPSessionContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, Session session, ActiveMQServer server) {
        this.connection = connection;
        this.sessionSPI = sessionSPI;
        this.session = session;
        this.server = server;
    }

    /** @return the session callback supplied at construction */
    public AMQPSessionCallback getSessionSPI() {
        return this.sessionSPI;
    }

    /** @return the connection context supplied at construction */
    public AMQPConnectionContext getAMQPConnectionContext() {
        return this.connection;
    }

    /** @return the proton session supplied at construction */
    public Session getSession() {
        return this.session;
    }

    /** @return the broker instance supplied at construction */
    public ActiveMQServer getServer() {
        return this.server;
    }

    /**
     * Initializes the session callback once.
     *
     * @throws ActiveMQSecurityException          rethrown unchanged from the callback
     * @throws ActiveMQAMQPInternalErrorException wrapping any other failure of the callback
     */
    @Override
    public void initialize() throws Exception {
        if (this.isInitialized()) {
            return;
        }
        // The flag is raised before the callback runs and is not reset if the callback throws.
        this.initialized = true;
        if (this.sessionSPI == null) {
            return;
        }
        try {
            this.sessionSPI.init(this, this.connection.getSASLResult());
        } catch (ActiveMQSecurityException e) {
            throw e;
        } catch (Exception e) {
            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
        }
    }

    /**
     * Removes the sender context registered under {@code consumer} and closes it.
     *
     * <p>NOTE(review): the lookup is done in {@link #senders}, which is keyed by proton
     * {@link Sender}, although the parameter is named {@code consumer}; confirm what callers
     * actually pass here.
     *
     * @param consumer  key to look up in {@link #senders}
     * @param queueName not used by this method
     */
    public void disconnect(Object consumer, String queueName) {
        ProtonServerSenderContext protonConsumer = this.senders.remove(consumer);
        if (protonConsumer == null) {
            return;
        }
        untrackBrokerConsumer(protonConsumer);
        try {
            protonConsumer.close(false);
        } catch (ActiveMQAMQPException e) {
            // Closing failed: report the AMQP error on the link itself.
            Sender link = protonConsumer.getSender();
            link.setTarget(null);
            link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
        }
    }

    /** @return the next tag handed out by the session's tag generator */
    public byte[] getTag() {
        return this.tagCache.getNextTag();
    }

    /**
     * Hands a tag back to the session's tag generator.
     *
     * @param tag tag previously obtained from {@link #getTag()}
     */
    public void replaceTag(byte[] tag) {
        this.tagCache.returnTag(tag);
    }

    /**
     * Closes every registered receiver and sender, then the session callback. Failures of an
     * individual close are logged at warn level and do not stop the remaining work. Calling this
     * again after it has completed is a no-op.
     */
    public void close() {
        if (this.closed) {
            return;
        }

        // Iterate over snapshots: the maps are cleared below and may be modified while closing.
        HashSet<ProtonAbstractReceiver> receiverSnapshot = new HashSet<>(this.receivers.values());
        for (ProtonAbstractReceiver receiverContext : receiverSnapshot) {
            try {
                receiverContext.close(false);
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            }
        }
        this.receivers.clear();

        HashSet<ProtonServerSenderContext> senderSnapshot = new HashSet<>(this.senders.values());
        for (ProtonServerSenderContext senderContext : senderSnapshot) {
            try {
                senderContext.close(false);
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            }
        }
        this.senders.clear();
        this.serverSenders.clear();

        try {
            if (this.sessionSPI != null) {
                this.sessionSPI.close();
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        this.closed = true;
    }

    /**
     * Forgets the context registered for {@code receiver}; the context itself is not closed.
     *
     * @param receiver proton link to remove from {@link #receivers}
     */
    public void removeReceiver(Receiver receiver) {
        this.receivers.remove(receiver);
    }

    /**
     * Attaches a new {@link ProtonTransactionHandler} to a coordinator link, advertises the
     * transaction capabilities on the coordinator, then opens the link and grants it the
     * connection's configured credits.
     *
     * @param coordinator coordinator target whose capabilities are set
     * @param receiver    coordinator link that receives the handler as its context
     */
    public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
        ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(this.sessionSPI, this.connection);
        coordinator.setCapabilities(new Symbol[] {
            Symbol.getSymbol("amqp:local-transactions"),
            Symbol.getSymbol("amqp:multi-txns-per-ssn"),
            Symbol.getSymbol("amqp:multi-ssns-per-txn")
        });
        receiver.setContext(transactionHandler);
        this.connection.runNow(() -> {
            receiver.open();
            receiver.flow(this.connection.getAmqpCredits());
            this.connection.flush();
        });
    }

    /**
     * Registers a sender link without an explicit {@link SenderController}.
     *
     * @param sender proton link to register
     * @throws Exception any non-AMQP failure raised while setting up the sender
     */
    public void addSender(Sender sender) throws Exception {
        // The cast selects the (Sender, SenderController) overload.
        this.addSender(sender, (SenderController) null);
    }

    /**
     * Creates the sender context for {@code sender} and registers it. A link whose context equals
     * {@code true} gets a {@link ProtonClientSenderContext}; every other link gets a
     * {@link ProtonServerSenderContext} built with {@code senderController}.
     *
     * @param sender           proton link to register
     * @param senderController controller passed to the server sender context, may be {@code null}
     * @throws Exception any non-AMQP failure raised while setting up the sender
     */
    public void addSender(Sender sender, SenderController senderController) throws Exception {
        Object linkContext = sender.getContext();
        boolean outgoing = linkContext != null && linkContext.equals(true);

        ProtonServerSenderContext protonSender;
        if (outgoing) {
            protonSender = new ProtonClientSenderContext(this.connection, sender, this, this.sessionSPI);
        } else {
            protonSender = new ProtonServerSenderContext(this.connection, sender, this, this.sessionSPI, senderController);
        }
        this.addSender(sender, protonSender);
    }

    /**
     * Initializes {@code protonSender}, registers it in both sender maps, opens the link when it
     * is not already locally active, and starts the sender.
     *
     * <p>An {@link ActiveMQAMQPException} is not propagated: the registration is rolled back and
     * the link is closed with the exception's AMQP error as its condition.
     *
     * @param sender       proton link to register
     * @param protonSender context that will serve the link
     * @throws Exception any non-AMQP failure raised while setting up the sender
     */
    public void addSender(Sender sender, ProtonServerSenderContext protonSender) throws Exception {
        try {
            protonSender.initialize();
            this.senders.put(sender, protonSender);
            this.serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
            sender.setContext(protonSender);
            if (sender.getLocalState() != EndpointState.ACTIVE) {
                this.connection.runNow(() -> {
                    sender.open();
                    this.connection.flush();
                });
            }
            protonSender.start();
        } catch (ActiveMQAMQPException e) {
            this.senders.remove(sender);
            // The broker consumer can still be null when initialize() was the call that failed.
            untrackBrokerConsumer(protonSender);
            sender.setSource(null);
            sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
            this.connection.runNow(() -> {
                sender.close();
                this.connection.flush();
            });
        }
    }

    /**
     * Forgets the context registered for {@code sender} in both sender maps; the context itself
     * is not closed.
     *
     * @param sender proton link to remove
     * @throws ActiveMQAMQPException declared for callers; not thrown by this implementation
     */
    public void removeSender(Sender sender) throws ActiveMQAMQPException {
        ProtonServerSenderContext senderRemoved = this.senders.remove(sender);
        if (senderRemoved != null) {
            untrackBrokerConsumer(senderRemoved);
        }
    }

    /**
     * Registers {@code receiver} with an {@link AMQPMirrorControllerTarget} and publishes this
     * broker's node id in the link properties under {@code AMQPMirrorControllerSource.BROKER_ID}.
     *
     * @param receiver proton link to register
     * @throws Exception any non-AMQP failure raised while setting up the receiver
     */
    public void addReplicaTarget(Receiver receiver) throws Exception {
        this.addReceiver(receiver, (context, link) -> {
            AMQPMirrorControllerTarget mirrorTarget =
                new AMQPMirrorControllerTarget(this.sessionSPI, this.connection, this, receiver, this.server);
            // Link properties are Symbol -> Object, so the map is typed accordingly.
            Map<Symbol, Object> brokerIDProperties = new HashMap<>();
            brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, this.server.getNodeID().toString());
            receiver.setProperties(brokerIDProperties);
            return mirrorTarget;
        });
    }

    /**
     * Registers {@code receiver} as a federation control link: creates and starts an
     * {@link AMQPFederationTarget} configured from the link's remote properties, stores it in the
     * connection attachments, and serves the link with an {@link AMQPFederationCommandProcessor}.
     *
     * <p>If the connection already carries a federation instance, or the federation fails to
     * start, the link is closed with the corresponding AMQP error, as described on
     * {@link #addReceiver(Receiver, BiFunction)}.
     *
     * @param receiver proton link to register
     * @throws Exception any non-AMQP failure raised while setting up the receiver
     */
    public void addFederationCommandProcessor(Receiver receiver) throws Exception {
        this.addReceiver(receiver, (context, link) -> {
            Connection protonConnection = receiver.getSession().getConnection();
            Record attachments = protonConnection.attachments();
            try {
                if (attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class) != null) {
                    throw new ActiveMQAMQPIllegalStateException("Unexpected federation instance found on connection when creating control link processor");
                }
                Map<String, Object> federationConfigurationMap = federationConfigurationOf(receiver);
                AMQPFederationConfiguration configuration = new AMQPFederationConfiguration(this.connection, federationConfigurationMap);
                AMQPFederationTarget federation = new AMQPFederationTarget(receiver.getName(), configuration, this, this.server);
                federation.start();
                AMQPFederationCommandProcessor commandProcessor =
                    new AMQPFederationCommandProcessor(federation, this.sessionSPI.getAMQPSessionContext(), receiver);
                attachments.set(FEDERATION_INSTANCE_RECORD, AMQPFederationTarget.class, federation);
                return commandProcessor;
            } catch (ActiveMQException e) {
                // A BiFunction cannot throw checked exceptions, so the AMQP exception travels as
                // the cause of a RuntimeException and is unwrapped again by addReceiver.
                ActiveMQAMQPException cause;
                if (e instanceof ActiveMQAMQPException) {
                    cause = (ActiveMQAMQPException) e;
                } else {
                    // Keep the original exception in the cause chain for diagnostics.
                    cause = new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
                }
                throw new RuntimeException(e.getMessage(), cause);
            }
        });
    }

    /**
     * Registers {@code receiver} with a plain {@link ProtonServerReceiverContext}.
     *
     * @param receiver proton link to register
     * @throws Exception any non-AMQP failure raised while setting up the receiver
     */
    public void addReceiver(Receiver receiver) throws Exception {
        this.addReceiver(receiver, (context, link) -> new ProtonServerReceiverContext(this.sessionSPI, this.connection, this, receiver));
    }

    /**
     * Builds a receiver context with {@code receiverBuilder}, initializes it, registers it,
     * announces the producer to the session callback, and opens the link.
     *
     * <p>An {@link ActiveMQAMQPException}, including one derived from a {@link RuntimeException}
     * thrown by the builder, is not propagated: the link is closed with the exception's AMQP
     * error as its condition and {@code null} is returned.
     *
     * @param receiver        proton link to register
     * @param receiverBuilder factory invoked with this context and {@code receiver}
     * @param <T>             concrete receiver context type
     * @return the registered receiver context, or {@code null} when the link was rejected
     * @throws Exception any non-AMQP failure raised while setting up the receiver
     */
    public <T extends ProtonAbstractReceiver> T addReceiver(Receiver receiver, BiFunction<AMQPSessionContext, Receiver, T> receiverBuilder) throws Exception {
        try {
            final T protonReceiver;
            try {
                protonReceiver = receiverBuilder.apply(this, receiver);
            } catch (RuntimeException e) {
                throw asAmqpException(e);
            }
            protonReceiver.initialize();
            this.receivers.put(receiver, protonReceiver);
            this.sessionSPI.addProducer(receiver.getName(), receiver.getTarget().getAddress());
            receiver.setContext(protonReceiver);
            this.connection.runNow(() -> {
                receiver.open();
                this.connection.flush();
            });
            return protonReceiver;
        } catch (ActiveMQAMQPException e) {
            this.receivers.remove(receiver);
            receiver.setTarget(null);
            receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
            this.connection.runNow(() -> {
                receiver.close();
                this.connection.flush();
            });
            return null;
        }
    }

    /** @return number of receiver contexts currently registered */
    public int getReceiverCount() {
        return this.receivers.size();
    }

    /** @return the live receiver map itself, not a copy */
    public Map<Receiver, ProtonAbstractReceiver> getReceivers() {
        return this.receivers;
    }

    /** @return number of sender contexts currently registered */
    public int getSenderCount() {
        return this.senders.size();
    }

    /**
     * Removes the broker consumer entry of {@code senderContext} from {@link #serverSenders}.
     * A null broker consumer is skipped because ConcurrentHashMap rejects null keys.
     */
    private void untrackBrokerConsumer(ProtonServerSenderContext senderContext) {
        Object brokerConsumer = senderContext.getBrokerConsumer();
        if (brokerConsumer != null) {
            this.serverSenders.remove(brokerConsumer);
        }
    }

    /**
     * Reads the federation configuration from the link's remote properties, or returns an empty
     * map when the link carries none.
     */
    @SuppressWarnings("unchecked") // the property value is supplied by the remote peer as a map
    private static Map<String, Object> federationConfigurationOf(Receiver receiver) {
        Map<Symbol, Object> remoteProperties = receiver.getRemoteProperties();
        if (remoteProperties == null || !remoteProperties.containsKey(AMQPFederationConstants.FEDERATION_CONFIGURATION)) {
            return Collections.emptyMap();
        }
        return (Map<String, Object>) remoteProperties.get(AMQPFederationConstants.FEDERATION_CONFIGURATION);
    }

    /**
     * Converts a builder failure into an AMQP exception: an AMQP cause is returned as is, any
     * other cause or a cause-less failure is wrapped in an internal error.
     */
    private static ActiveMQAMQPException asAmqpException(RuntimeException failure) {
        Throwable cause = failure.getCause();
        if (cause instanceof ActiveMQAMQPException) {
            return (ActiveMQAMQPException) cause;
        }
        if (cause != null) {
            return new ActiveMQAMQPInternalErrorException(cause.getMessage(), cause);
        }
        return new ActiveMQAMQPInternalErrorException(failure.getMessage(), failure);
    }
}

