/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationEventSupport;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Terminus;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sender controller for a federation events link that also registers itself as a
 * broker plugin in order to observe address and queue-binding additions.
 * <p>
 * Callers register the names of addresses and queues they want to be told about via
 * {@link #addAddressWatch(String)} and {@link #addQueueWatch(String)}. When a watched
 * address or queue is later added on the broker, an event message is encoded and routed
 * to the events address backing this link, and the watch is removed (each watch fires at
 * most once).
 * <p>
 * The watch sets are plain {@link HashSet}s with no synchronization in this class; the
 * plugin callbacks only touch them from tasks handed to the connection context's
 * {@code runLater}. NOTE(review): presumably the {@code add*Watch} methods are also
 * invoked on that same connection thread -- confirm against callers.
 */
public class AMQPFederationEventDispatcher
        implements SenderController, ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private final Sender sender;
    private final AMQPFederation federation;
    private final AMQPSessionCallback session;
    private final ActiveMQServer server;

    private final Set<String> addressWatches = new HashSet<>();
    private final Set<String> queueWatches = new HashSet<>();

    // Assigned in init(); remains null until the link has been initialized.
    private String eventsAddress;

    /**
     * Creates a dispatcher bound to the given federation, session and proton sender.
     *
     * @param federation the federation instance this dispatcher reports to; its server is captured here
     * @param session    the session callback used to create/remove the temporary queue and the sender
     * @param sender     the proton sender link that carries the events
     */
    public AMQPFederationEventDispatcher(AMQPFederation federation, AMQPSessionCallback session, Sender sender) {
        this.session = session;
        this.sender = sender;
        this.federation = federation;
        this.server = federation.getServer();
    }

    /**
     * @return the address that event messages are routed to, or {@code null} if
     *         {@link #init(ProtonServerSenderContext)} has not yet assigned it
     */
    private String getEventsLinkAddress() {
        return eventsAddress;
    }

    /**
     * Stamps the given event with the events link address and routes it through the
     * server's post office.
     *
     * @param event the event message to send, must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     * @throws Exception            if routing the message fails
     */
    public void sendEvent(AMQPMessage event) throws Exception {
        Objects.requireNonNull(event, "Null event message is not expected and constitutes an error condition");

        event.setAddress(getEventsLinkAddress());
        server.getPostOffice().route(event, true);
    }

    /**
     * Initializes the events link: validates that the connection carries a federation
     * instance, configures the sender's settle modes, creates the temporary queue that
     * backs the link and registers this instance as both the federation's event sender
     * and a broker plugin.
     *
     * @param senderContext the sender context whose connection attachments are inspected
     * @return the consumer created by the session for the events address
     * @throws ActiveMQAMQPIllegalStateException   if the connection has no federation instance attached
     * @throws ActiveMQAMQPInternalErrorException if the link is not yet locally active and the remote
     *                                            source is missing or not dynamic
     * @throws Exception                           if the temporary queue or the sender cannot be created
     */
    @Override
    public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
        final Connection protonConnection = senderContext.getSender().getSession().getConnection();
        final Record attachments = protonConnection.attachments();

        // Named differently from the field so it is clear which instance is used below.
        final AMQPFederation connectionFederation = attachments.get("FEDERATION_INSTANCE_RECORD", AMQPFederation.class);
        if (connectionFederation == null) {
            throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
        }

        // Mirror the remote's sender settle mode; receiver settle mode is always FIRST.
        sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());
        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);

        eventsAddress = connectionFederation.prefixEventsLinkQueueName(sender.getName());

        // Only a link that is not yet locally active gets its capabilities offered and
        // its dynamic source address filled in.
        if (sender.getLocalState() != EndpointState.ACTIVE) {
            sender.setOfferedCapabilities(new Symbol[] {AMQPFederationConstants.FEDERATION_EVENT_LINK});

            final Terminus remoteTerminus = (Terminus) sender.getRemoteSource();
            if (remoteTerminus == null || !remoteTerminus.getDynamic()) {
                throw new ActiveMQAMQPInternalErrorException("Remote Terminus did not arrive as dynamic node: " + remoteTerminus);
            }

            remoteTerminus.setAddress(getEventsLinkAddress());
        }

        try {
            // NOTE(review): the trailing 1 is presumably the max-consumers limit -- confirm
            // against AMQPSessionCallback#createTemporaryQueue.
            session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), RoutingType.ANYCAST, 1);
        } catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
        }

        // Register only after the backing queue exists so events always have a destination.
        connectionFederation.registerEventSender(this);
        server.registerBrokerPlugin(this);

        return session.createSender(senderContext, SimpleString.of(getEventsLinkAddress()), null, false);
    }

    /**
     * Unregisters this instance as a broker plugin and makes a best-effort attempt to
     * remove the temporary queue backing the events link. A failure to remove the queue
     * is logged at debug level and otherwise ignored so that close never throws.
     */
    @Override
    public void close() {
        server.unRegisterBrokerPlugin(this);

        try {
            session.removeTemporaryQueue(SimpleString.of(getEventsLinkAddress()));
        } catch (Exception ignored) {
            // Deliberately best effort: report for diagnostics but do not fail the close.
            logger.debug("Ignored error while removing federation events temporary queue {}: {}",
                         getEventsLinkAddress(), ignored.getMessage());
        }
    }

    /**
     * Closes this controller; the supplied error condition is not used.
     *
     * @param error the error condition associated with the close (ignored)
     */
    @Override
    public void close(ErrorCondition error) {
        close();
    }

    /**
     * Registers interest in the addition of the named address.
     *
     * @param addressName the address name to watch for
     */
    public void addAddressWatch(String addressName) {
        addressWatches.add(addressName);
    }

    /**
     * Registers interest in the addition of the named queue.
     *
     * @param queueName the queue name to watch for
     */
    public void addQueueWatch(String queueName) {
        queueWatches.add(queueName);
    }

    /**
     * Broker plugin callback for an added address. Schedules a task on the federation's
     * connection context that, if the address was being watched, removes the watch and
     * sends an address-added event. A failure to send is logged and signalled to the
     * federation as an error.
     *
     * @param addressInfo the address that was added
     * @param reload      not used by this implementation
     */
    @Override
    public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
        final String addressName = addressInfo.getName().toString();

        federation.getConnectionContext().runLater(() -> {
            // remove() doubles as the "was it watched?" check and makes the watch one-shot.
            if (!addressWatches.remove(addressName)) {
                return;
            }

            try {
                sendEvent(AMQPFederationEventSupport.encodeAddressAddedEvent(addressName));
            } catch (Exception e) {
                logger.warn("error on send of address added event: {}", e.getMessage());
                federation.signalError(
                    new ActiveMQAMQPInternalErrorException("Error while processing address added: " + e.getMessage()));
            }
        });
    }

    /**
     * Broker plugin callback for an added binding. Only {@link QueueBinding}s are
     * considered; for those a task is scheduled on the federation's connection context
     * that, if the queue was being watched, removes the watch and sends a queue-added
     * event. A failure to send is logged and signalled to the federation as an error.
     *
     * @param binding the binding that was added
     */
    @Override
    public void afterAddBinding(Binding binding) throws ActiveMQException {
        if (!(binding instanceof QueueBinding)) {
            return;
        }

        final QueueBinding queueBinding = (QueueBinding) binding;
        final String addressName = queueBinding.getAddress().toString();
        final String queueName = queueBinding.getQueue().getName().toString();

        federation.getConnectionContext().runLater(() -> {
            // remove() doubles as the "was it watched?" check and makes the watch one-shot.
            if (!queueWatches.remove(queueName)) {
                return;
            }

            try {
                sendEvent(AMQPFederationEventSupport.encodeQueueAddedEvent(addressName, queueName));
            } catch (Exception e) {
                logger.warn("Error on send of queue added event: {}", e.getMessage());
                federation.signalError(
                    new ActiveMQAMQPInternalErrorException("Error while processing queue added: " + e.getMessage()));
            }
        });
    }
}

