/*
 * Decompiled with CFR 0.152.
 */
package com.solace.messaging.receiver;

import com.solace.messaging.DirectMessagePublisherBuilder;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.publisher.DirectMessagePublisher;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.receiver.DirectMessageReceiverImpl;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.RequestReplyMessageReceiver;
import com.solace.messaging.resources.ReceiverInfo;
import com.solace.messaging.resources.ShareName;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.resources.TopicSubscription;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.SolaceMessageUtil;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicStampedReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
public class RequestReplyMessageReceiverImpl
implements RequestReplyMessageReceiver {
    /** Logger shared by all instances of this receiver. */
    private static final Log logger = LogFactory.getLog(RequestReplyMessageReceiverImpl.class);

    // Lifecycle states. The current state is kept as the integer stamp of stateHolder.
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTING = 1;
    static final int STATE_STARTED = 2;
    static final int STATE_TERMINATING = 3;
    static final int STATE_TERMINATED = 4;

    /** Source of the per-instance numeric id; incremented once for every constructed receiver. */
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0L);

    /**
     * Pairs the lifecycle state (stamp) with the future associated with that state (reference).
     * Starts out as "no future, not started". The diamond is required here: the decompiled
     * {@code new AtomicStampedReference<Object>(...)} is not assignable to the declared field type.
     */
    final AtomicStampedReference<CompletableFuture> stateHolder = new AtomicStampedReference<>(null, STATE_NOT_STARTED);

    /** Numeric id of this instance, taken from instanceIdGenerator. */
    private final long id = instanceIdGenerator.incrementAndGet();
    /** Instance name used as a prefix in log messages and exposed through receiverInfo(). */
    private final String instanceName = "RequestReplyMessageReceiver@" + this.id;

    private final MessagingServiceInternalView serviceInternalView;
    /** Receiver that delivers the inbound request messages. */
    private final DirectMessageReceiverImpl delegateReceiver;
    /** Publisher used to send the replies. */
    private final DirectMessagePublisher delegatePublisher;
    /** Generates the keys stored in the CorrelationContext attached to each reply. */
    private final MessageCorrelationKeyProvider messageKeyProvider;
    private final ReceiverInfo receiverInfo = new RequestReplyReceiverInfoImpl();
    /** Optional application listener; invoked from the internal termination listener installed by the constructor. */
    private LifecycleControl.TerminationNotificationListener applicationTerminationNotificationListener;

    /**
     * Creates a receiver without an explicit share name.
     *
     * <p>Delegates to the five-argument constructor, passing {@code ShareName.ShareNameImpl.noOp()}
     * as the share name.
     *
     * @param serviceInternalView   internal view of the messaging service
     * @param receiverConfiguration configuration handed to the delegate receiver
     * @param replierBuilder        builder used to create the publisher that sends replies
     * @param topicSubscriptions    subscriptions handed to the delegate receiver
     */
    public RequestReplyMessageReceiverImpl(
            MessagingServiceInternalView serviceInternalView,
            TypedProperties receiverConfiguration,
            DirectMessagePublisherBuilder replierBuilder,
            List<TopicSubscription> topicSubscriptions) {
        this(
                serviceInternalView,
                receiverConfiguration,
                replierBuilder,
                topicSubscriptions,
                ShareName.ShareNameImpl.noOp());
    }

    /**
     * Creates a receiver backed by a {@link DirectMessageReceiverImpl} for inbound requests and a
     * publisher built from {@code replierBuilder} for outbound replies.
     *
     * <p>An internal termination listener is installed on the delegate receiver. When it fires it
     * first notifies the application listener (if one was set) and then marks this receiver as
     * terminated via {@link #terminateOnUnsolicitedInterruption()}.
     *
     * @param serviceInternalView   internal view of the messaging service
     * @param receiverConfiguration configuration handed to the delegate receiver
     * @param replierBuilder        builder used to create the publisher that sends replies
     * @param topicSubscriptions    subscriptions handed to the delegate receiver
     * @param shareName             share name handed to the delegate receiver
     */
    public RequestReplyMessageReceiverImpl(MessagingServiceInternalView serviceInternalView, TypedProperties receiverConfiguration, DirectMessagePublisherBuilder replierBuilder, List<TopicSubscription> topicSubscriptions, ShareName shareName) {
        this.serviceInternalView = serviceInternalView;
        this.delegateReceiver = new DirectMessageReceiverImpl(serviceInternalView, receiverConfiguration, topicSubscriptions, shareName);
        this.delegatePublisher = replierBuilder.build();
        this.messageKeyProvider = new MessageCorrelationKeyProvider();
        LifecycleControl.TerminationNotificationListener internalTerminationNotificationListener = terminationEvent -> {
            // Read the field once so a concurrent setter call cannot null it between check and use.
            LifecycleControl.TerminationNotificationListener applicationListener = this.applicationTerminationNotificationListener;
            if (applicationListener != null) {
                try {
                    applicationListener.onTermination(terminationEvent);
                }
                catch (Exception e) {
                    // Application failures must not prevent the state update below, but they
                    // should be visible; the other listener call sites in this class log them too.
                    if (logger.isWarnEnabled()) {
                        logger.warn((Object)"Application code threw an unhandled exception while processing termination notification in TerminationNotificationListener", (Throwable)e);
                    }
                }
            }
            this.terminateOnUnsolicitedInterruption();
        };
        this.delegateReceiver.setTerminationNotificationListener(internalTerminationNotificationListener);
    }

    /**
     * Registers the receive-failure listener by forwarding it to the delegate receiver.
     *
     * @param receiveFailureListener listener passed through unchanged
     */
    @Override
    public void setReceiveFailureListener(MessageReceiver.ReceiveFailureListener receiveFailureListener) {
        delegateReceiver.setReceiveFailureListener(receiveFailureListener);
    }

    /**
     * Stores the application's termination listener. The internal listener installed by the
     * constructor invokes it when the delegate receiver reports termination.
     *
     * @param listener application listener to store
     */
    @Override
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener listener) {
        applicationTerminationNotificationListener = listener;
    }

    /**
     * Installs a listener for replies that could not be published.
     *
     * <p>A {@code null} listener clears the publish-failure listener on the delegate publisher.
     * Otherwise every failed publish event is converted into a {@code FailedReplyEvent}. The
     * original request is recovered from the reply's correlation key when that key is a
     * {@link CorrelationContext} whose user context is an {@link InboundMessage}; in every other
     * case the event carries a {@code null} request. Exceptions thrown by the listener are logged
     * at warn level and not propagated.
     *
     * @param listener application listener, or {@code null} to remove the current one
     */
    @Override
    public void setReplyFailureListener(RequestReplyMessageReceiver.ReplyFailureListener listener) {
        if (listener != null) {
            this.delegatePublisher.setPublishFailureListener(failedPublishEvent -> {
                InboundMessage originalRequest = null;
                final OutboundMessage failedReply = failedPublishEvent.getMessage();
                if (failedReply != null) {
                    // instanceof is false for null, so no separate null checks are needed
                    final Object key = failedReply.getCorrelationKey();
                    if (key instanceof CorrelationContext) {
                        final Object context = ((CorrelationContext)key).getUserContext();
                        if (context instanceof InboundMessage) {
                            originalRequest = (InboundMessage)context;
                        }
                    }
                }
                final RequestReplyMessageReceiver.FailedReplyEvent event = new RequestReplyMessageReceiver.FailedReplyEvent(originalRequest, failedPublishEvent.getMessage(), failedPublishEvent.getDestination(), failedPublishEvent.getException(), failedPublishEvent.getTimeStamp());
                try {
                    listener.onFailedReply(event);
                }
                catch (Exception e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn((Object)"Application code threw an unhandled exception while processing FailedReplyEvent in ReplyFailureListener", (Throwable)e);
                    }
                }
            });
            return;
        }
        this.delegatePublisher.setPublishFailureListener(null);
    }

    /**
     * @return {@code true} when the current state stamp is {@code STATE_STARTED}
     */
    @Override
    public boolean isRunning() {
        return stateHolder.getStamp() == STATE_STARTED;
    }

    /**
     * @return {@code true} when the current state stamp is {@code STATE_TERMINATED}
     */
    @Override
    public boolean isTerminated() {
        return stateHolder.getStamp() == STATE_TERMINATED;
    }

    /**
     * @return {@code true} when the current state stamp is {@code STATE_TERMINATING}
     */
    @Override
    public boolean isTerminating() {
        return stateHolder.getStamp() == STATE_TERMINATING;
    }

    /**
     * Starts the receiver and blocks until the asynchronous start has finished.
     *
     * <p>Failures reported through the start future are unwrapped: a
     * {@link PubSubPlusClientException} or {@link IllegalStateException} cause is rethrown as is,
     * any other cause is wrapped in a {@link PubSubPlusClientException}. An
     * {@link ExecutionException} without a cause is logged and also wrapped, so a failed start is
     * never reported as success (previously that case fell through and returned {@code this}).
     *
     * @return this receiver, once started
     * @throws PubSubPlusClientException when the start fails, is interrupted or is cancelled
     * @throws IllegalStateException     when {@link #startAsync()} rejects the current state
     */
    @Override
    public RequestReplyMessageReceiver start() throws PubSubPlusClientException {
        try {
            this.startAsync().get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException)t;
            }
            if (t instanceof IllegalStateException) {
                throw (IllegalStateException)t;
            }
            if (t != null) {
                throw new PubSubPlusClientException(t);
            }
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " failed to start"), (Throwable)e);
            }
            // No cause to unwrap: surface the failure itself, as terminate(long) does.
            throw new PubSubPlusClientException(e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        }
        catch (CancellationException e) {
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        }
        return this;
    }

    /**
     * Starts the receiver without blocking and returns a future describing the outcome.
     *
     * <p>Throws {@link IllegalStateException} synchronously when the state stamp is 3
     * (STATE_TERMINATING) or 4 (STATE_TERMINATED), or when the service is not connected.
     * Otherwise it loops over the state holder:
     * <ul>
     *   <li>state 0 (STATE_NOT_STARTED): moves to 1 (STATE_STARTING) with a fresh future, then to
     *       2 (STATE_STARTED), starts the delegate publisher and receiver and completes the
     *       future with this instance;</li>
     *   <li>state 1 or 2: returns the future currently stored in the state holder;</li>
     *   <li>any other state: returns a failed future carrying an IllegalStateException.</li>
     * </ul>
     *
     * <p>Decompiler note: CFR produced this body with "force condition propagation" and "lifted
     * jumps to return sites", which is why the same cancellation callback appears four times.
     * NOTE(review): the method's type parameter is named like the receiver interface and hides
     * it inside this method - presumably a decompiler naming artifact; confirm against the API.
     */
    @Override
    public <RequestReplyMessageReceiver> CompletableFuture<RequestReplyMessageReceiver> startAsync() {
        // Fast rejection before entering the retry loop.
        int state = this.stateHolder.getStamp();
        if (3 == state || 4 == state) {
            throw new IllegalStateException("Message receiver is already terminated");
        }
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Message receiver can't be started when service is not connected");
        }
        // Retry loop: re-entered only when the 0 -> 1 compare-and-set below fails.
        block6: while (true) {
            // Inside the loop a disconnected service is reported through a failed future instead of a throw.
            if (!this.serviceInternalView.isConnected()) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver can't be started when service is not connected"));
            }
            // Read reference and stamp together; the stamp lands in currentStateHolder[0].
            int[] currentStateHolder = new int[1];
            CompletableFuture currentFuture = this.stateHolder.get(currentStateHolder);
            int currentState = currentStateHolder[0];
            switch (currentState) {
                case 0: {
                    ExtendedCompletableFuture<RequestReplyMessageReceiverImpl> starting;
                    boolean stateChanged;
                    // NOT_STARTED -> STARTING; on failure re-read the state and try again.
                    if (!(stateChanged = this.stateHolder.compareAndSet(null, starting = new ExtendedCompletableFuture<RequestReplyMessageReceiverImpl>(), 0, 1))) continue block6;
                    if (logger.isDebugEnabled()) {
                        logger.debug((Object)(this.instanceName + " is being started"));
                    }
                    try {
                        // STARTING -> STARTED, keeping the same future as the reference.
                        boolean startingToStarted = this.stateHolder.compareAndSet(starting, starting, 1, 2);
                        if (!startingToStarted) {
                            int newCurrentState = this.stateHolder.getStamp();
                            if (newCurrentState >= 3) {
                                // State moved to terminating/terminated meanwhile: shut the delegates down
                                // and report cancellation. This path returns the raw future, not one
                                // passed through onCancellation.
                                this.onTerminate();
                                starting.completeExceptionally(new CancellationException("Starting of message receiver was interrupted"));
                                return starting;
                            }
                            // NOTE(review): when the CAS fails but the stamp is below 3, control reaches
                            // complete(this) without the delegates having been started here - confirm intent.
                        } else {
                            this.delegatePublisher.start();
                            this.delegateReceiver.start();
                        }
                        starting.complete(this);
                        // Both branches return the same wrapped future; they differ only in the debug log line.
                        // The callback handed to onCancellation sets the stamp to 4 and terminates the delegates
                        // (presumably it runs when the returned future is cancelled - confirm in ExtendedCompletableFuture).
                        if (!logger.isDebugEnabled()) return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate();
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                        logger.debug((Object)(this.instanceName + " is started"));
                        return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate();
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                    }
                    catch (Exception e) {
                        // Start failed: mark terminated, shut the delegates down and fail the future.
                        this.stateHolder.set(null, 4);
                        this.onTerminate();
                        starting.completeExceptionally(PubSubPlusClientException.of(e));
                        if (!logger.isErrorEnabled()) return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate();
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                        logger.error((Object)(this.instanceName + " failed to start and is terminating"), (Throwable)e);
                    }
                    // Reached only from the catch block after the error has been logged.
                    return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                        this.stateHolder.set(null, 4);
                        this.onTerminate();
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)(this.instanceName + " async start was canceled"));
                        }
                    });
                }
                // Already starting or started: hand back the future stored for that state.
                case 1: 
                case 2: {
                    return currentFuture;
                }
            }
            // Any other stamp (3 or 4) leaves the loop.
            break;
        }
        return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver is already terminated"));
    }

    /**
     * Starts the receiver asynchronously and reports the outcome to a callback.
     *
     * <p>The listener receives the started receiver and, on failure, the cause of the throwable
     * the start future completed with. Exceptions thrown by the listener are logged at warn
     * level and not propagated.
     *
     * @param startListener callback to notify; rejected by the validation helper when {@code null}
     */
    @Override
    public <RequestReplyMessageReceiver> void startAsync(CompletionListener<RequestReplyMessageReceiver> startListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(startListener, "Start listener can't be null");
        final CompletableFuture<RequestReplyMessageReceiver> startFuture = this.startAsync();
        startFuture.whenComplete((startedReceiver, failure) -> {
            try {
                if (failure == null) {
                    startListener.onCompletion(startedReceiver, null);
                } else {
                    startListener.onCompletion(startedReceiver, failure.getCause());
                }
            }
            catch (Exception e) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)"Application code throw an unhandled exception by processing async start completion notification", (Throwable)e);
                }
            }
        });
    }

    /**
     * Terminates the receiver and blocks until termination has finished.
     *
     * <p>A grace period of {@code 0} terminates immediately through {@link #terminateNow()}.
     * Otherwise the call waits on {@link #terminateAsync(long)}: a failure reported by the future
     * is rethrown (wrapped in a {@link PubSubPlusClientException} unless it already is one), an
     * interrupt restores the thread's interrupt flag and raises a
     * {@code RequestInterruptedException}, and any other exception is logged at warn level and
     * swallowed.
     *
     * @param gracePeriod grace period handed to {@link #terminateAsync(long)}; {@code 0} means immediate
     */
    @Override
    public void terminate(long gracePeriod) throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        if (gracePeriod == 0L) {
            this.terminateNow();
            return;
        }
        try {
            this.terminateAsync(gracePeriod).get();
        }
        catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause == null) {
                throw new PubSubPlusClientException(e);
            }
            if (cause instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException)cause;
            }
            throw new PubSubPlusClientException(cause);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver termination was interrupted", e);
        }
        catch (Exception e) {
            if (logger.isWarnEnabled()) {
                logger.warn((Object)(this.instanceName + " encountered problem during termination."), (Throwable)e);
            }
        }
    }

    /**
     * Terminates the receiver immediately, without a grace period.
     *
     * <p>The state is set to {@code STATE_TERMINATED} first; then the delegate receiver and the
     * delegate publisher are terminated in that order. An
     * {@code IncompleteMessageDeliveryException} from either delegate does not stop the other
     * from being terminated; the reported message counts are summed and a single exception with
     * the total is thrown at the end.
     *
     * @throws PubSubPlusClientException.IncompleteMessageDeliveryException when either delegate
     *         reported undelivered messages
     */
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        this.stateHolder.set(null, STATE_TERMINATED);
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is being non gracefully terminated"));
        }
        int undelivered = 0;
        boolean deliveryIncomplete = false;
        try {
            this.delegateReceiver.terminateNow();
        }
        catch (PubSubPlusClientException.IncompleteMessageDeliveryException receiverFailure) {
            undelivered += receiverFailure.getMessageCount();
            deliveryIncomplete = true;
        }
        try {
            this.delegatePublisher.terminate(0L);
        }
        catch (PubSubPlusClientException.IncompleteMessageDeliveryException publisherFailure) {
            undelivered += publisherFailure.getMessageCount();
            deliveryIncomplete = true;
        }
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is terminated"));
        }
        if (!deliveryIncomplete) {
            return;
        }
        throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %s messages could not be completed due to non graceful termination", undelivered), undelivered);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    @Override
    public CompletableFuture<Void> terminateAsync(long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, gracePeriod, "Grace period < 1");
        block8: while (true) {
            currentStateHolder = new int[1];
            currentFuture = this.stateHolder.get(currentStateHolder);
            currentState = currentStateHolder[0];
            switch (currentState) {
                case 0: {
                    stateChanged = this.stateHolder.compareAndSet(null, null, 0, 4);
                    if (!stateChanged) continue block8;
                    this.onTerminate();
                    return CompletableFuture.completedFuture(null);
                }
                case 1: {
                    this.stateHolder.set(null, 4);
                    currentFuture.cancel(true);
                    return CompletableFuture.completedFuture(null);
                }
                case 2: {
                    terminating = new ExtendedCompletableFuture<T>();
                    stateChanged = this.stateHolder.compareAndSet(currentFuture, terminating, 2, 3);
                    if (stateChanged) ** break;
                    continue block8;
                    try {
                        var8_8 = CompletableFuture.allOf(new CompletableFuture[]{this.delegateReceiver.terminateAsync(gracePeriod), this.delegatePublisher.terminateAsync(gracePeriod)});
                        return var8_8;
                    }
                    finally {
                        this.stateHolder.set(null, 4);
                    }
                }
            }
            break;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Terminates the receiver asynchronously and reports the outcome to a callback.
     *
     * <p>The listener always receives {@code null} as the result and, on failure, the cause of
     * the throwable the termination future completed with. Exceptions thrown by the listener are
     * logged at warn level and not propagated.
     *
     * @param terminationListener callback to notify; rejected by the validation helper when {@code null}
     * @param gracePeriod         grace period handed to {@link #terminateAsync(long)}
     */
    @Override
    public void terminateAsync(CompletionListener<Void> terminationListener, long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(terminationListener, "Termination listener can't be null");
        final CompletableFuture<Void> terminationFuture = this.terminateAsync(gracePeriod);
        terminationFuture.whenComplete((ignoredResult, failure) -> {
            try {
                if (failure == null) {
                    terminationListener.onCompletion(null, null);
                } else {
                    terminationListener.onCompletion(null, failure.getCause());
                }
            }
            catch (Exception e) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)"Application code throw an unhandled exception by processing termination completion notification", (Throwable)e);
                }
            }
        });
    }

    /**
     * Receives one request from the delegate receiver and, when a message was returned, passes
     * it to the handler through {@link #doRequestReply}.
     *
     * @param messageHandler handler that processes the request
     */
    @Override
    public void receiveMessage(RequestReplyMessageReceiver.RequestMessageHandler messageHandler) throws PubSubPlusClientException {
        final InboundMessage request = this.delegateReceiver.receiveMessage();
        if (request == null) {
            return;
        }
        this.doRequestReply(messageHandler, request);
    }

    /**
     * Receives one request from the delegate receiver, passing {@code timeOut} through, and when
     * a message was returned hands it to the handler through {@link #doRequestReply}.
     *
     * @param messageHandler handler that processes the request
     * @param timeOut        timeout forwarded unchanged to the delegate receiver
     */
    @Override
    public void receiveMessage(RequestReplyMessageReceiver.RequestMessageHandler messageHandler, long timeOut) throws PubSubPlusClientException {
        final InboundMessage request = this.delegateReceiver.receiveMessage(timeOut);
        if (request == null) {
            return;
        }
        this.doRequestReply(messageHandler, request);
    }

    /**
     * Registers an asynchronous handler on the delegate receiver; each delivered message is
     * routed through {@link #doRequestReply}.
     *
     * @param messageHandler handler that processes the requests
     */
    @Override
    public void receiveAsync(RequestReplyMessageReceiver.RequestMessageHandler messageHandler) throws PubSubPlusClientException {
        this.delegateReceiver.receiveAsync((InboundMessage request) -> {
            this.doRequestReply(messageHandler, request);
        });
    }

    /**
     * Registers an asynchronous handler on the delegate receiver together with an executor; each
     * delivered message is routed through {@link #doRequestReply}.
     *
     * @param messageHandler  handler that processes the requests
     * @param executorService executor forwarded unchanged to the delegate receiver
     */
    @Override
    public void receiveAsync(RequestReplyMessageReceiver.RequestMessageHandler messageHandler, ExecutorService executorService) throws PubSubPlusClientException {
        this.delegateReceiver.receiveAsync((InboundMessage request) -> {
            this.doRequestReply(messageHandler, request);
        }, executorService);
    }

    /**
     * @return the info object created together with this receiver
     */
    @Override
    public ReceiverInfo receiverInfo() {
        return receiverInfo;
    }

    /**
     * Hands a request to the application handler together with a replier.
     *
     * <p>A replier is created only when the underlying message carries a reply-to destination
     * and a non-empty correlation id; otherwise the handler receives {@code null} as the replier.
     * A {@code null} request is ignored. Exceptions thrown by the handler are logged at warn
     * level and not propagated.
     *
     * @param messageHandler application handler to invoke
     * @param requestMessage request to process; may be {@code null}
     */
    @Internal
    void doRequestReply(RequestReplyMessageReceiver.RequestMessageHandler messageHandler, InboundMessage requestMessage) {
        if (requestMessage == null) {
            return;
        }
        final BytesXMLMessage rawRequest = SolaceMessageUtil.getSolaceMessage(requestMessage);
        Topic replyTo = null;
        if (rawRequest.getReplyTo() != null) {
            replyTo = Topic.of(rawRequest.getReplyTo().getName());
        }
        final String correlationId = rawRequest.getCorrelationId();
        final boolean canReply = replyTo != null && correlationId != null && !correlationId.isEmpty();
        final ReplierImpl replier = canReply ? new ReplierImpl(this.serviceInternalView, requestMessage, replyTo, correlationId) : null;
        try {
            messageHandler.onMessage(requestMessage, replier);
        }
        catch (Exception e) {
            if (logger.isWarnEnabled()) {
                logger.warn((Object)"Application code threw an unhandled exception while processing request message in RequestMessageHandler", (Throwable)e);
            }
        }
    }

    /**
     * Marks this receiver as terminated by clearing the stored future and setting the state to
     * {@code STATE_TERMINATED}. Called by the internal termination listener installed in the
     * constructor.
     */
    @Internal
    void terminateOnUnsolicitedInterruption() throws PubSubPlusClientException {
        stateHolder.set(null, STATE_TERMINATED);
    }

    /**
     * Immediately terminates both delegates: the receiver first, then the publisher.
     *
     * <p>The publisher is terminated even when terminating the receiver throws, so a failure on
     * the receiver side cannot leave the publisher running. The first failure is rethrown after
     * both attempts; a later failure is attached to it as a suppressed exception.
     */
    @Internal
    void onTerminate() {
        RuntimeException failure = null;
        try {
            this.delegateReceiver.terminateNow();
        }
        catch (RuntimeException e) {
            failure = e;
        }
        try {
            this.delegatePublisher.terminate(0L);
        }
        catch (RuntimeException e) {
            if (failure == null) {
                failure = e;
            } else if (failure != e) {
                // addSuppressed rejects self-suppression, hence the identity check
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * {@link ReceiverInfo} view over the enclosing receiver; exposes its instance name and id.
     */
    @Internal
    @ProviderType
    private class RequestReplyReceiverInfoImpl
    implements ReceiverInfo {
        private RequestReplyReceiverInfoImpl() {
        }

        /** @return the instance name of the enclosing receiver */
        @Override
        public String getInstanceName() {
            return instanceName;
        }

        /** @return the numeric id of the enclosing receiver */
        @Override
        public long getId() {
            return id;
        }
    }

    /**
     * Correlation key attached to a reply message. It links the reply to a user context (the
     * original request, as set by {@code ReplierImpl}) and to the outbound message itself.
     *
     * <p>Equality and hash code are based on the correlation key only.
     */
    @Internal
    @ProviderType
    static class CorrelationContext
    implements Serializable {
        private static final long serialVersionUID = 426395239935681793L;
        private final Long correlationKey;
        private volatile Object userContext;
        private volatile OutboundMessage linkedMessage;

        /**
         * @param correlationKey key that identifies this context
         * @param userContext    context object to carry along
         * @param message        outbound message this context is linked to
         */
        CorrelationContext(Long correlationKey, Object userContext, OutboundMessage message) {
            this.correlationKey = correlationKey;
            this.userContext = userContext;
            this.linkedMessage = message;
        }

        /** @return the key that identifies this context */
        public Long getCorrelationKey() {
            return correlationKey;
        }

        /** @return the carried context object, or {@code null} after {@link #clear()} */
        public Object getUserContext() {
            return userContext;
        }

        /** @return the linked outbound message, or {@code null} after {@link #clear()} */
        public OutboundMessage getLinkedMessage() {
            return linkedMessage;
        }

        /** Drops the references to the linked message and the user context; the key is kept. */
        public void clear() {
            linkedMessage = null;
            userContext = null;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            return Objects.equals(correlationKey, ((CorrelationContext)o).correlationKey);
        }

        @Override
        public int hashCode() {
            // null-safe: yields 0 for a null key
            return Objects.hashCode(correlationKey);
        }
    }

    /**
     * Generates increasing {@code Long} keys. The counter is seeded with a random value in the
     * range {@code [1, 1000)}.
     */
    @Internal
    @ProviderType
    static class MessageCorrelationKeyProvider
    implements Serializable {
        final AtomicLong messageKeyProvider = new AtomicLong(ThreadLocalRandom.current().nextLong(1L, 1000L));

        MessageCorrelationKeyProvider() {
        }

        /** @return the next key: the counter value after an atomic increment */
        Long nextLongKey() {
            return messageKeyProvider.incrementAndGet();
        }
    }

    /**
     * Replier bound to a single request. It publishes a response to the request's reply-to topic
     * through the enclosing receiver's delegate publisher.
     */
    @Internal
    @ProviderType
    class ReplierImpl
    implements RequestReplyMessageReceiver.Replier {
        private final MessagingServiceInternalView serviceInternalView;
        private final InboundMessage inboundMessage;
        private final Topic replyTo;
        private final String correlationId;

        /**
         * @param serviceInternalView internal view of the messaging service
         * @param inboundMessage      request this replier answers
         * @param replyTo             topic the reply is published to
         * @param correlationId       correlation id copied onto the reply
         */
        ReplierImpl(MessagingServiceInternalView serviceInternalView, InboundMessage inboundMessage, Topic replyTo, String correlationId) {
            this.serviceInternalView = serviceInternalView;
            this.inboundMessage = inboundMessage;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }

        /**
         * Publishes the response without additional message properties.
         *
         * @param responseMessage reply to publish
         */
        @Override
        public void reply(OutboundMessage responseMessage) throws PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException {
            reply(responseMessage, null);
        }

        /**
         * Marks the response as a reply, copies the request's correlation id onto it, attaches a
         * fresh {@link CorrelationContext} that references the request, and publishes it to the
         * reply-to topic.
         *
         * <p>When publishing throws {@link IllegalStateException} and the delegate publisher is
         * terminated, the {@code PUBLISH_MESSAGES_TERMINATION_DISCARDED} metric is incremented
         * before the exception is rethrown.
         *
         * @param responseMessage             reply to publish
         * @param additionalMessageProperties extra properties forwarded to the publisher; may be {@code null}
         */
        @Override
        public void reply(OutboundMessage responseMessage, Properties additionalMessageProperties) throws PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException {
            final BytesXMLMessage rawReply = SolaceMessageUtil.getSolaceMessage(responseMessage);
            rawReply.setAsReplyMessage(true);
            if (correlationId != null) {
                rawReply.setCorrelationId(correlationId);
            }
            final Long contextKey = RequestReplyMessageReceiverImpl.this.messageKeyProvider.nextLongKey();
            final CorrelationContext replyContext = new CorrelationContext(contextKey, inboundMessage, responseMessage);
            rawReply.setCorrelationKey((Object)replyContext);
            try {
                RequestReplyMessageReceiverImpl.this.delegatePublisher.publish(responseMessage, replyTo, additionalMessageProperties);
            }
            catch (IllegalStateException publishRejected) {
                final boolean publisherGone = RequestReplyMessageReceiverImpl.this.delegatePublisher.isTerminated();
                if (publisherGone) {
                    RequestReplyMessageReceiverImpl.this.serviceInternalView.getApiMetricsCollector().incrementMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED);
                }
                throw publishRejected;
            }
        }
    }
}

