/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConsumerConfiguration;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;

public abstract class AMQPFederationConsumer
implements FederationConsumerInternal {
    protected static final Symbol[] OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
    protected static final Modified DEFAULT_OUTCOME = new Modified();
    protected final AMQPFederation federation;
    protected final AMQPFederationConsumerConfiguration configuration;
    protected final FederationConsumerInfo consumerInfo;
    protected final AMQPConnectionContext connection;
    protected final AMQPSessionContext session;
    protected final Predicate<Link> remoteCloseInterceptor = this::remoteLinkClosedInterceptor;
    protected final BiConsumer<FederationConsumerInfo, Message> messageObserver;
    protected final AtomicLong messageCount = new AtomicLong();
    protected Receiver protonReceiver;
    protected boolean started;
    protected volatile boolean closed;
    protected Consumer<FederationConsumerInternal> remoteCloseHandler;

    public AMQPFederationConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration, AMQPSessionContext session, FederationConsumerInfo consumerInfo, BiConsumer<FederationConsumerInfo, Message> messageObserver) {
        this.federation = federation;
        this.consumerInfo = consumerInfo;
        this.connection = session.getAMQPConnectionContext();
        this.session = session;
        this.configuration = configuration;
        this.messageObserver = messageObserver;
    }

    public final long getMessagesReceived() {
        return this.messageCount.get();
    }

    @Override
    public final AMQPFederation getFederation() {
        return this.federation;
    }

    @Override
    public final FederationConsumerInfo getConsumerInfo() {
        return this.consumerInfo;
    }

    @Override
    public final synchronized void start() {
        if (!this.started && !this.closed) {
            this.started = true;
            this.asyncCreateReceiver();
        }
    }

    @Override
    public final synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            if (this.started) {
                this.started = false;
                this.asyncCloseReceiver();
            }
        }
    }

    @Override
    public final AMQPFederationConsumer setRemoteClosedHandler(Consumer<FederationConsumerInternal> handler) {
        if (this.started) {
            throw new IllegalStateException("Cannot set a remote close handler after the consumer is started");
        }
        this.remoteCloseHandler = handler;
        return this;
    }

    protected final boolean remoteLinkClosedInterceptor(Link link) {
        if (link == this.protonReceiver && link.getRemoteCondition() != null && link.getRemoteCondition().getCondition() != null) {
            Symbol errorCondition = link.getRemoteCondition().getCondition();
            if (AmqpSupport.RESOURCE_DELETED.equals(errorCondition)) {
                return true;
            }
            if (AmqpSupport.NOT_FOUND.equals(errorCondition)) {
                return true;
            }
            if (AmqpSupport.DETACH_FORCED.equals(errorCondition)) {
                return true;
            }
        }
        return false;
    }

    protected final void recordFederatedMessageReceived(Message message) {
        this.messageCount.incrementAndGet();
        if (this.messageObserver != null) {
            this.messageObserver.accept(this.consumerInfo, message);
        }
    }

    protected abstract void asyncCreateReceiver();

    protected abstract void asyncCloseReceiver();

    static {
        DEFAULT_OUTCOME.setDeliveryFailed(Boolean.valueOf(true));
    }
}

