/*
 * Decompiled with CFR 0.152.
 */
package com.solace.messaging.receiver;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.receiver.AsyncReceiverSubscriptions;
import com.solace.messaging.receiver.DirectMessageReceiver;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.ReceiverBuffers;
import com.solace.messaging.receiver.TraceableReceiver;
import com.solace.messaging.resources.ShareName;
import com.solace.messaging.resources.TopicSubscription;
import com.solace.messaging.trace.propagation.MessageTracingSupport;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageableReceiver;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExecutableMessageHandler;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessageReceiptFailureNotificationDispatcher;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.Subscription;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageListener;
import com.solacesystems.jcsmp.statistics.StatType;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
public class DirectMessageReceiverImpl
implements DirectMessageReceiver,
TraceableReceiver {
    // Lifecycle states. They are stored as the integer stamp of {@link #stateHolder}.
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTING = 1;
    static final int STATE_STARTED = 2;
    static final int STATE_TERMINATING = 3;
    static final int STATE_TERMINATED = 4;
    /**
     * Pairs the current lifecycle state (the stamp) with the future of the lifecycle
     * operation in progress (the reference), so both can be swapped in one CAS.
     * The diamond operator keeps the constructed type identical to the declared one.
     */
    final AtomicStampedReference<CompletableFuture> stateHolder = new AtomicStampedReference<>(null, STATE_NOT_STARTED);
    private final MessagingServiceInternalView serviceInternalView;
    private final TypedProperties receiverConfiguration;
    private final ShareName shareName;
    private final ReceiverBuffers.ReceiverBuffer buffer;
    // Listeners registered with the service in onStart() and removed again in onTerminate().
    private final MessagingService.ReconnectionListener reconnectionListener;
    private final MessagingService.ReconnectionAttemptListener reconnectionAttemptListener;
    private final ReceiverBuffers.ReceiverBuffer.DiscardMessageHandler discardMessageHandler;
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener;
    private final ClientSession.ClientSessionStateListener closedSessionListener;
    // Tasks queued by the constructor; the list is cleared in onTerminate().
    private final List<Task<DirectMessageReceiverImpl>> postStartAsyncActions = new CopyOnWriteArrayList<Task<DirectMessageReceiverImpl>>();
    // Subscriptions whose consumers are closed and removed in onTerminate().
    private final List<AppliedTopicSubscription> appliedTopicSubscriptions = new CopyOnWriteArrayList<AppliedTopicSubscription>();
    private final ExecutorService defaultReceiverExecutorService;
    private final ExecutorService asyncSubscriptionsExecutorService;
    private static final Task<DirectMessageReceiverImpl> NO_OP_TASK = receiver -> {};
    private final MessageReceiptFailureNotificationDispatcher messageReceiptFailureNotificationDispatcher;
    // Holds the single handler installed through receiveAsync(); stays null until then.
    private final AtomicReference<ExecutableMessageHandler> messageHandlerRef = new AtomicReference<>();
    private final ReentrantLock messageHandlersLock = new ReentrantLock();
    private final ReentrantLock subscriptionsLock = new ReentrantLock();
    // Signalled under messageHandlersLock once a message handler has been installed.
    private final Condition notEmpty = this.messageHandlersLock.newCondition();
    private final AsyncConsumerMessageDispatchTask asyncConsumerMessageDispatchTask;
    private final ManageableReceiver.DirectReceiverInfo receiverInfo;
    private final TerminationNotificationDispatcher terminationNotificationDispatcher;
    // Per-instance sequence number; used to build the name that appears in logs and thread names.
    private final long id = instanceIdGenerator.incrementAndGet();
    private final String instanceName = "DirectMessageReceiver@" + this.id;
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0L);
    private static final Log logger = LogFactory.getLog(DirectMessageReceiverImpl.class);

    /**
     * Creates a receiver without a share name by delegating to the four-argument
     * constructor with {@code ShareName.ShareNameImpl.noOp()}.
     *
     * @param serviceInternalView internal view of the owning messaging service
     * @param receiverConfiguration receiver configuration properties
     * @param topicSubscriptions subscriptions to queue for application after start
     */
    public DirectMessageReceiverImpl(
            MessagingServiceInternalView serviceInternalView,
            TypedProperties receiverConfiguration,
            List<TopicSubscription> topicSubscriptions) {
        this(serviceInternalView, receiverConfiguration, topicSubscriptions, ShareName.ShareNameImpl.noOp());
    }

    /**
     * Creates a receiver: builds its executors, buffer and dispatchers, creates the
     * service listeners (they are registered later, in {@code onStart()}), and queues
     * the initial subscriptions as a post-start action.
     *
     * @param serviceInternalView internal view of the owning messaging service
     * @param receiverConfiguration receiver configuration properties
     * @param topicSubscriptions subscriptions to queue; copied into a new list
     * @param shareName share name passed along with the initial subscriptions
     */
    public DirectMessageReceiverImpl(final MessagingServiceInternalView serviceInternalView, TypedProperties receiverConfiguration, List<TopicSubscription> topicSubscriptions, ShareName shareName) {
        // Two single-thread executors, each with threads named after this instance.
        this.defaultReceiverExecutorService = Executors.newSingleThreadExecutor(
                new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-message-dispatcher"));
        this.asyncSubscriptionsExecutorService = Executors.newSingleThreadExecutor(
                new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-subscription-dispatcher"));
        this.receiverInfo = new DirectReceiverInfoImpl();
        this.serviceInternalView = serviceInternalView;
        this.terminationNotificationDispatcher = new TerminationNotificationDispatcher();
        this.receiverConfiguration = receiverConfiguration;
        this.shareName = shareName;
        this.buffer = ReceiverBuffers.createDirectReceiverBuffer(this.receiverConfiguration, this.serviceInternalView.getApiMetricsCollector());
        this.messageReceiptFailureNotificationDispatcher = new MessageReceiptFailureNotificationDispatcher(this.receiverInfo);

        // Counts (and optionally logs) every message the buffer reports as discarded.
        this.discardMessageHandler = new ReceiverBuffers.ReceiverBuffer.DiscardMessageHandler(){
            final MessagingServiceInternalView service = DirectMessageReceiverImpl.this.serviceInternalView;

            @Override
            public void onDiscardedMessage(ReceiverBuffers.Receivable receivable) {
                if (receivable == null) {
                    return;
                }
                this.service.getApiMetricsCollector().incrementMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_BACKPRESSURE_DISCARDED);
                if (logger.isDebugEnabled()) {
                    logger.debug(DirectMessageReceiverImpl.this.instanceName + " discarded a message on back-pressure." + receivable.getMessage());
                }
            }
        };

        // On service interruption: mark terminated, notify the termination listener, then clean up.
        this.serviceInterruptionListener = new MessagingService.ServiceInterruptionListener(){

            @Override
            public void onServiceInterrupted(MessagingService.ServiceEvent e) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Shutting down receiver due to service interruption");
                }
                DirectMessageReceiverImpl.this.stateHolder.set(null, STATE_TERMINATED);
                TerminationEventImpl terminationEvent = new TerminationEventImpl(e.getTimestamp(), e.getMessage(), e.getCause());
                DirectMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(terminationEvent);
                DirectMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
            }
        };

        // Same reaction when the client session reports a state change.
        this.closedSessionListener = new ClientSession.ClientSessionStateListener(){

            @Override
            public void onClientSessionStateChange(ClientSession.ClientSessionStateChangeEvent event) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Shutting down receiver due to service closure");
                }
                DirectMessageReceiverImpl.this.stateHolder.set(null, STATE_TERMINATED);
                TerminationEventImpl terminationEvent = new TerminationEventImpl(event.getTimestamp(), event.getMessage(), event.getCause());
                DirectMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(terminationEvent);
                DirectMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
            }
        };
        this.buffer.addDiscardedHandler(this.discardMessageHandler);

        // The reconnection listeners only emit debug logs.
        this.reconnectionListener = new MessagingService.ReconnectionListener(){

            @Override
            public void onReconnected(MessagingService.ServiceEvent e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(DirectMessageReceiverImpl.this.instanceName + " is reconnected.");
                }
            }
        };
        this.reconnectionAttemptListener = new MessagingService.ReconnectionAttemptListener(){

            @Override
            public void onReconnecting(MessagingService.ServiceEvent e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(DirectMessageReceiverImpl.this.instanceName + " is reconnecting.");
                }
            }
        };

        // Work on a private copy of the caller's subscription list.
        this.postStartAsyncActions.add(this.addSubscriptions(new ArrayList<TopicSubscription>(topicSubscriptions), shareName));
        this.asyncConsumerMessageDispatchTask = new AsyncConsumerMessageDispatchTask(this.buffer, this, this.messageHandlersLock, this.notEmpty);
    }

    /**
     * Starts this receiver and blocks until the asynchronous start has finished.
     *
     * @return this receiver
     * @throws PubSubPlusClientException if the start failed; a failure cause that is
     *         neither a {@code PubSubPlusClientException} nor an
     *         {@code IllegalStateException} is wrapped, and interruption or
     *         cancellation is reported as {@code RequestInterruptedException}
     * @throws IllegalStateException if thrown by, or reported as the cause of, the asynchronous start
     */
    @Override
    public DirectMessageReceiver start() throws PubSubPlusClientException, IllegalStateException {
        try {
            this.startAsync().get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException)cause;
            }
            if (cause instanceof IllegalStateException) {
                throw (IllegalStateException)cause;
            }
            if (cause != null) {
                throw new PubSubPlusClientException(cause);
            }
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " failed to start"), (Throwable)e);
            }
            // A failed start must not return normally just because the cause is missing;
            // report it the same way terminate(long) reports a cause-less failure.
            throw new PubSubPlusClientException(e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for the caller before reporting the interruption.
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        }
        catch (CancellationException e) {
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        }
        return this;
    }

    /**
     * Terminates this receiver. A grace period of {@code 0} terminates immediately via
     * {@code terminateNow()}; any other value waits for {@code terminateAsync(gracePeriod)}.
     *
     * @param gracePeriod grace period passed to {@code terminateAsync}; {@code 0} means immediate
     * @throws PubSubPlusClientException if asynchronous termination failed or the wait was interrupted
     */
    @Override
    public void terminate(long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        if (gracePeriod == 0L) {
            this.terminateNow();
            return;
        }
        try {
            this.terminateAsync(gracePeriod).get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause == null) {
                throw new PubSubPlusClientException(e);
            }
            if (cause instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException)cause;
            }
            throw new PubSubPlusClientException(cause);
        }
        catch (InterruptedException e) {
            // Keep the interrupt flag set for the caller.
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver termination was interrupted", e);
        }
        catch (Exception e) {
            // Any other problem is logged only, not rethrown.
            if (logger.isWarnEnabled()) {
                logger.warn(this.instanceName + " encountered problem during termination.", e);
            }
        }
    }

    /**
     * Terminates this receiver immediately, without a grace period.
     *
     * Marks the receiver terminated, runs {@code onTerminate} with a post-termination
     * task that clears the buffer, and reports any messages that were still buffered.
     *
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     * The try/catch-Throwable shape below is decompiler output (it reads like an inlined
     * {@code finally}); do not restructure it without checking against the bytecode.
     *
     * @throws PubSubPlusClientException.IncompleteMessageDeliveryException if the buffer
     *         was non-empty when it was cleared; carries the number of lost messages
     */
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        block5: {
            int m;
            // Mark terminated up front; the state future is dropped.
            this.stateHolder.set(null, 4);
            // Written by the post-termination task, read afterwards on this thread.
            AtomicInteger lostMessages = new AtomicInteger(0);
            Task<DirectMessageReceiverImpl> postTerminationTask = receiver -> {
                block4: {
                    try {
                        // Capture the size before clearing so the loss can be reported.
                        int bufferSize = receiver.buffer.size();
                        boolean bufferEmpty = bufferSize < 1;
                        receiver.buffer.clear(receiver.createTerminationDiscardConsumer());
                        if (!bufferEmpty) {
                            lostMessages.set(bufferSize);
                            if (logger.isWarnEnabled()) {
                                logger.warn((Object)(receiver.instanceName + " non-gracefully terminated before all buffered messages were processed."));
                            }
                        }
                    }
                    catch (PubSubPlusClientException.RequestInterruptedException e) {
                        // Interruption while clearing is only logged.
                        if (!logger.isWarnEnabled()) break block4;
                        logger.warn((Object)("Non-graceful termination of " + receiver.instanceName + " was interrupted"));
                    }
                }
            };
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " is being non gracefully terminated"));
                }
                this.onTerminate(null, postTerminationTask);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " is terminated"));
                }
                m = lostMessages.get();
                // Nothing was lost: leave the method normally.
                if (lostMessages.get() <= 0) break block5;
                this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, m);
            }
            catch (Throwable throwable) {
                // Failure path: if messages were lost, the loss is reported instead of
                // the original throwable; otherwise the throwable is rethrown unchanged.
                int m2 = lostMessages.get();
                if (lostMessages.get() > 0) {
                    this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, m2);
                    throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to non graceful termination", m2), m2);
                }
                throw throwable;
            }
            // Reached only when termination completed and messages were lost (m > 0).
            throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to non graceful termination", m), m);
        }
    }

    /**
     * Terminates this receiver after a service interruption. Marks it terminated and
     * runs {@code onTerminate} with a post-termination task that clears the buffer and
     * records any still-buffered messages in the termination-discarded metric.
     * Unlike {@code terminateNow()}, this method itself does not throw for lost messages.
     */
    void terminateOnUnsolicitedInterruption() throws PubSubPlusClientException {
        this.stateHolder.set(null, STATE_TERMINATED);
        Task<DirectMessageReceiverImpl> discardBufferedMessages = target -> {
            try {
                // Read the size first: after clear() the count is gone.
                int pending = target.buffer.size();
                target.buffer.clear(target.createTerminationDiscardConsumer());
                if (pending >= 1) {
                    this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, pending);
                    if (logger.isWarnEnabled()) {
                        logger.warn(target.instanceName + " non-gracefully terminated before all buffered messages were processed.");
                    }
                }
            }
            catch (PubSubPlusClientException.RequestInterruptedException e) {
                // Interruption while clearing is only logged.
                if (logger.isWarnEnabled()) {
                    logger.warn("Non-graceful termination of " + target.instanceName + " was interrupted");
                }
            }
        };
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is being non gracefully terminated due to service interruption");
        }
        this.onTerminate(null, discardBufferedMessages);
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is terminated");
        }
    }

    /** @return {@code true} while the state stamp equals {@code STATE_STARTED} */
    @Override
    public boolean isRunning() {
        return this.stateHolder.getStamp() == STATE_STARTED;
    }

    /** @return {@code true} once the state stamp equals {@code STATE_TERMINATED} */
    @Override
    public boolean isTerminated() {
        return this.stateHolder.getStamp() == STATE_TERMINATED;
    }

    /** @return {@code true} while the state stamp equals {@code STATE_TERMINATING} */
    @Override
    public boolean isTerminating() {
        return this.stateHolder.getStamp() == STATE_TERMINATING;
    }

    /**
     * Hands the listener to the termination notification dispatcher.
     *
     * @param listener listener to pass to the dispatcher
     */
    @Override
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener listener) {
        terminationNotificationDispatcher.setTerminationNotificationListener(listener);
    }

    /**
     * Starts this receiver asynchronously, driving the state machine held in
     * {@code stateHolder} (0 = not started, 1 = starting, 2 = started,
     * 3 = terminating, 4 = terminated).
     *
     * Decompiler notes (CFR): "Enabled force condition propagation", "Lifted jumps to
     * return sites". The duplicated {@code return ExtendedCompletableFuture.onCancellation(...)}
     * statements below come from that lifting; every copy wraps the same {@code starting}
     * future with the same cancellation callback.
     *
     * @return the future of the start operation: a new one when this call performs the
     *         start, the already stored one when a start is in progress or done, or a
     *         failed future when the service is disconnected or the state is 3 or 4
     *         by the time the loop reads it
     * @throws IllegalStateException if, on entry, the receiver is terminating or
     *         terminated, or the service is not connected
     */
    @Override
    public <DirectMessageReceiver> CompletableFuture<DirectMessageReceiver> startAsync() throws PubSubPlusClientException, IllegalStateException {
        // Fail fast on entry: these conditions are reported by throwing, not via a future.
        int state = this.stateHolder.getStamp();
        if (3 == state || state == 4) {
            throw new IllegalStateException("Message receiver is already terminated");
        }
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Message receiver can't be started when service is not connected");
        }
        // Retry loop: re-entered only when the 0 -> 1 CAS below loses a race.
        block6: while (true) {
            if (!this.serviceInternalView.isConnected()) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver can't be started when service is not connected"));
            }
            // Read the future and its state stamp together.
            int[] currentStateHolder = new int[1];
            CompletableFuture currentFuture = this.stateHolder.get(currentStateHolder);
            int currentState = currentStateHolder[0];
            switch (currentState) {
                case 0: {
                    ExtendedCompletableFuture<DirectMessageReceiverImpl> starting;
                    boolean stateChanged;
                    // Claim the start: NOT_STARTED -> STARTING, installing the new future.
                    if (!(stateChanged = this.stateHolder.compareAndSet(null, starting = new ExtendedCompletableFuture<DirectMessageReceiverImpl>(), 0, 1))) continue block6;
                    if (logger.isDebugEnabled()) {
                        logger.debug((Object)(this.instanceName + " is being started"));
                    }
                    try {
                        // STARTING -> STARTED happens before onStart()/onPostStart() run.
                        boolean startingToStarted = this.stateHolder.compareAndSet(starting, starting, 1, 2);
                        if (!startingToStarted) {
                            // The state was changed concurrently; if termination has begun, abort the start.
                            int newCurrentState = this.stateHolder.getStamp();
                            if (newCurrentState >= 3) {
                                this.onTerminate(null, null);
                                starting.completeExceptionally(new CancellationException("Starting of message receiver was interrupted"));
                                return starting;
                            }
                        } else {
                            this.onStart();
                            this.onPostStart();
                        }
                        starting.complete(this);
                        // Both returns below are identical; they differ only in whether the debug line is logged first.
                        if (!logger.isDebugEnabled()) return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate(null, null);
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                        logger.debug((Object)(this.instanceName + " is started"));
                        return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate(null, null);
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                    }
                    catch (Exception e) {
                        // Start failed: mark terminated, clean up, and fail the future with the wrapped exception.
                        this.stateHolder.set(null, 4);
                        this.onTerminate(null, null);
                        starting.completeExceptionally(PubSubPlusClientException.of(e));
                        if (!logger.isErrorEnabled()) return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate(null, null);
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                        logger.error((Object)(this.instanceName + " failed to start and is terminating"), (Throwable)e);
                    }
                    // Reached after the error has been logged in the catch block above.
                    return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                        this.stateHolder.set(null, 4);
                        this.onTerminate(null, null);
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)(this.instanceName + " async start was canceled"));
                        }
                    });
                }
                case 1: 
                case 2: {
                    // Start already in progress or done: hand back the stored future.
                    return currentFuture;
                }
            }
            // Any other state (3 or 4) leaves the loop and yields a failed future.
            break;
        }
        return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver is already terminated"));
    }

    /**
     * Hands the listener to the message receipt failure notification dispatcher.
     *
     * @param receiveFailureListener listener to pass to the dispatcher
     */
    @Override
    public void setReceiveFailureListener(MessageReceiver.ReceiveFailureListener receiveFailureListener) {
        messageReceiptFailureNotificationDispatcher.setReceiveFailureListener(receiveFailureListener);
    }

    /**
     * Starts this receiver asynchronously and reports the outcome to the listener.
     *
     * The listener receives the failure's cause when it has one (the previous
     * behaviour), otherwise the failure itself. Without that fallback a start that
     * failed with a cause-less exception was reported as {@code (null, null)}.
     *
     * @param startListener callback invoked once the start future completes; must not be null
     * @throws IllegalStateException if thrown by {@code startAsync()}
     */
    @Override
    public <DirectMessageReceiver> void startAsync(CompletionListener<DirectMessageReceiver> startListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(startListener, "Start listener can't be null");
        CompletableFuture<DirectMessageReceiver> onceStarted = this.startAsync();
        onceStarted.whenComplete((receiver, throwable) -> {
            Throwable failure = null;
            if (throwable != null) {
                // Prefer the cause, but never drop the failure when there is no cause.
                Throwable cause = throwable.getCause();
                failure = cause != null ? cause : throwable;
            }
            try {
                startListener.onCompletion(receiver, failure);
            }
            catch (Exception e) {
                // Exceptions thrown by the application's listener are logged, not propagated.
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)"Application code throw an unhandled exception by processing async start completion notification", (Throwable)e);
                }
            }
        });
    }

    /**
     * Start hook: enables the consumer API on the service and registers this receiver's
     * reconnection, service-interruption and client-session listeners.
     */
    void onStart() throws PubSubPlusClientException {
        serviceInternalView.enableConsumerAPI();

        // Listeners on the messaging service.
        serviceInternalView.addReconnectionListener(reconnectionListener);
        serviceInternalView.addReconnectionAttemptListener(reconnectionAttemptListener);

        // Listeners on the client session.
        serviceInternalView.getClientSession().addServiceInterruptionListener(serviceInterruptionListener);
        serviceInternalView.getClientSession().addClientSessionStateListener(closedSessionListener);
    }

    /**
     * Post-start hook: submits the message dispatch task to the receiver's executor,
     * then calls {@code addInitialSubscriptions()}.
     *
     * @throws PubSubPlusClientException if the executor rejects the dispatch task
     */
    void onPostStart() {
        try {
            defaultReceiverExecutorService.submit(asyncConsumerMessageDispatchTask);
        }
        catch (RejectedExecutionException rejected) {
            if (logger.isErrorEnabled()) {
                logger.error(instanceName + " could not schedule consumer dispatcher task");
            }
            throw new PubSubPlusClientException("Internal Executor service rejected consumer dispatcher task", rejected);
        }
        addInitialSubscriptions();
    }

    /**
     * Termination hook: closes and removes all applied subscriptions, clears pending
     * post-start actions, unregisters the listeners, clears the buffer's discard
     * handler, and shuts down the dispatch executor.
     *
     * @param preTerminationTask run first, before any cleanup; may be null
     * @param postTerminationTask run last from a {@code finally} block, so it also runs
     *        when cleanup throws; may be null
     */
    void onTerminate(Task<DirectMessageReceiverImpl> preTerminationTask, Task<DirectMessageReceiverImpl> postTerminationTask) {
        try {
            if (preTerminationTask != null) {
                preTerminationTask.run(this);
            }
            // Close every subscription consumer; failures are logged and the entry is removed anyway.
            this.appliedTopicSubscriptions.removeIf(applied -> {
                try {
                    applied.consumer.close();
                }
                catch (Exception closeFailure) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(this.instanceName + "  encountered problem closing subscriptions during termination.", closeFailure);
                    }
                }
                return true;
            });
            this.postStartAsyncActions.clear();

            // Unregister the listeners added in onStart().
            this.serviceInternalView.removeReconnectionListener(this.reconnectionListener);
            this.serviceInternalView.removeReconnectionAttemptListener(this.reconnectionAttemptListener);
            this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
            this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " receiver is shutdown");
            }
            this.buffer.clearDiscardedHandler();
            if (!this.defaultReceiverExecutorService.isShutdown()) {
                this.defaultReceiverExecutorService.shutdown();
            }
        }
        finally {
            if (postTerminationTask != null) {
                postTerminationTask.run(this);
            }
        }
    }

    /**
     * Takes the next message from the buffer via {@code buffer.consume()}.
     *
     * @return the message of the consumed buffer entry
     * @throws IllegalStateException if the receiver is neither running nor terminating
     */
    @Override
    public InboundMessage receiveMessage() throws PubSubPlusClientException.RequestInterruptedException, PubSubPlusClientException {
        if (!this.isRunning() && !this.isTerminating()) {
            throw new IllegalStateException("Message receiver is not started");
        }
        return this.buffer.consume().getMessage();
    }

    /**
     * Takes the next message from the buffer via the timed {@code buffer.consume}.
     *
     * @param timeOut timeout in milliseconds passed to the buffer
     * @return the message, or {@code null} if the buffer returned no entry
     * @throws IllegalStateException if the receiver is neither running nor terminating
     */
    @Override
    public InboundMessage receiveMessage(long timeOut) throws PubSubPlusClientException, PubSubPlusClientException.RequestInterruptedException {
        if (!this.isRunning() && !this.isTerminating()) {
            throw new IllegalStateException("Message receiver is not started");
        }
        ReceiverBuffers.Receivable next = this.buffer.consume(timeOut, TimeUnit.MILLISECONDS);
        if (next == null) {
            return null;
        }
        return next.getMessage();
    }

    /**
     * Returns the message from {@code buffer.consumeOrNull()} if there is one, otherwise
     * the supplier's alternative.
     *
     * @param supplierOfAlternativeResponse source of the fallback message; must not be null
     * @return the buffered message, or the supplier's result when the buffer returned null
     * @throws IllegalStateException if the receiver is neither running nor terminating
     *         (checked before the supplier is validated)
     */
    @Override
    public InboundMessage receiveOrElse(MessageReceiver.InboundMessageSupplier supplierOfAlternativeResponse) {
        if (!this.isRunning() && !this.isTerminating()) {
            throw new IllegalStateException("Message receiver is not started");
        }
        Validation.nullIllegal(supplierOfAlternativeResponse, "Response supplier can't be null");
        ReceiverBuffers.Receivable next = this.buffer.consumeOrNull();
        if (next != null) {
            return next.getMessage();
        }
        return (InboundMessage)supplierOfAlternativeResponse.get();
    }

    /**
     * Installs the message handler for asynchronous delivery, passing {@code null} as
     * the handler's executor. Only one handler can ever be installed per receiver.
     *
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param messageHandler handler to install; must not be null
     * @throws IllegalStateException if a handler has already been installed
     */
    @Override
    public void receiveAsync(MessageReceiver.MessageHandler messageHandler) throws PubSubPlusClientException {
        Validation.nullIllegal(messageHandler, "Message handler can't be null");
        this.messageHandlersLock.lock();
        try {
            ExecutableMessageHandler candidate = new ExecutableMessageHandler(messageHandler, null, logger, this.instanceName);
            if (!this.messageHandlerRef.compareAndSet(null, candidate)) {
                throw new IllegalStateException("receiveAsync can be called once only for the given receiver instance");
            }
            // Signal waiters on the condition that a handler is now available.
            this.notEmpty.signalAll();
        }
        finally {
            this.messageHandlersLock.unlock();
        }
    }

    /**
     * Installs the message handler for asynchronous delivery together with the given
     * executor service. Only one handler can ever be installed per receiver.
     *
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param messageHandler handler to install; must not be null
     * @param executorService executor handed to the handler wrapper; must not be null
     * @throws IllegalStateException if a handler has already been installed
     */
    @Override
    public void receiveAsync(MessageReceiver.MessageHandler messageHandler, ExecutorService executorService) throws PubSubPlusClientException {
        Validation.nullIllegal(messageHandler, "Message handler can't be null");
        Validation.nullIllegal(executorService, "Executor service can't be null");
        this.messageHandlersLock.lock();
        try {
            ExecutableMessageHandler candidate = new ExecutableMessageHandler(messageHandler, executorService, logger, this.instanceName);
            if (!this.messageHandlerRef.compareAndSet(null, candidate)) {
                throw new IllegalStateException("receiveAsync can be called once only for the given receiver instance");
            }
            // Signal waiters on the condition that a handler is now available.
            this.notEmpty.signalAll();
        }
        finally {
            this.messageHandlersLock.unlock();
        }
    }

    /**
     * Validates the subscription and delegates to {@code addSubscription0} with this
     * receiver's share name.
     *
     * @param anotherSubscription subscription to add; must be non-null with a non-empty name
     */
    @Override
    public void addSubscription(TopicSubscription anotherSubscription) throws PubSubPlusClientException, InterruptedException {
        Validation.nullIllegal(anotherSubscription, "Topic subscription can't be null");
        String subscriptionName = anotherSubscription.getName();
        Validation.nullOrEmptyIllegal(subscriptionName, "Topic subscription name can't be null or empty");
        addSubscription0(anotherSubscription, shareName, this);
    }

    /**
     * Validates the subscription and delegates to {@code removeSubscription0} with this
     * receiver's share name.
     *
     * @param topicSubscription subscription to remove; must be non-null with a non-empty name
     */
    @Override
    public void removeSubscription(TopicSubscription topicSubscription) throws PubSubPlusClientException, InterruptedException {
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        String subscriptionName = topicSubscription.getName();
        Validation.nullOrEmptyIllegal(subscriptionName, "Topic subscription name can't be null or empty");
        removeSubscription0(topicSubscription, shareName, this);
    }

    /**
     * Validates the arguments and delegates to {@code addSubscriptionAsync0} with this
     * receiver's share name.
     *
     * @param topicSubscription subscription to add; must be non-null with a non-empty name
     * @param listener listener passed on to {@code addSubscriptionAsync0}; must not be null
     */
    @Override
    public void addSubscriptionAsync(TopicSubscription topicSubscription, AsyncReceiverSubscriptions.SubscriptionChangeListener listener) throws PubSubPlusClientException {
        // Validation order: subscription, listener, then subscription name.
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullIllegal(listener, "Listener can't be null");
        String subscriptionName = topicSubscription.getName();
        Validation.nullOrEmptyIllegal(subscriptionName, "Topic subscription name can't be null or empty");
        addSubscriptionAsync0(topicSubscription, shareName, listener, this);
    }

    /**
     * Validates the arguments and delegates to {@code removeSubscriptionAsync0} with
     * this receiver's share name.
     *
     * @param topicSubscription subscription to remove; must be non-null with a non-empty name
     * @param listener listener passed on to {@code removeSubscriptionAsync0}; must not be null
     */
    @Override
    public void removeSubscriptionAsync(TopicSubscription topicSubscription, AsyncReceiverSubscriptions.SubscriptionChangeListener listener) throws PubSubPlusClientException {
        // Validation order: subscription, listener, then subscription name.
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullIllegal(listener, "Listener can't be null");
        String subscriptionName = topicSubscription.getName();
        Validation.nullOrEmptyIllegal(subscriptionName, "Topic subscription name can't be null or empty");
        removeSubscriptionAsync0(topicSubscription, shareName, listener, this);
    }

    /**
     * Terminates this receiver, waiting up to {@code gracePeriod} milliseconds for the
     * buffer to drain, and returns a future describing the outcome.
     *
     * <p>The method loops over the current (future, state) pair held by {@code stateHolder}
     * and acts per state value:
     * <ul>
     *   <li>state 0: CAS to state 4, run {@code onTerminate(null, null)}, return a completed future;
     *       a lost CAS restarts the loop;</li>
     *   <li>state 1: set state 4, cancel the future currently held, return a completed future;</li>
     *   <li>state 2: CAS to state 3 with a fresh {@code terminating} future, run the
     *       post-termination task, always finish in state 4, then complete {@code terminating}
     *       normally or with an {@code IncompleteMessageDeliveryException} when messages were lost;</li>
     *   <li>any other state: return a completed future.</li>
     * </ul>
     *
     * NOTE(review): this is CFR decompiler output and is NOT valid Java as written: local
     * declarations are missing, {@code ** break;} is a decompiler placeholder and the
     * {@code LambdaMetafactory} expression stands in for a lambda bound to
     * {@code lambda$terminateAsync$6}. Original decompiler warning:
     * "Removed try catching itself - possible behaviour change. Unable to fully structure code".
     * NOTE(review): the meaning of the numeric states (0..4) is not visible here — confirm
     * against the state constants of the enclosing class.
     *
     * @param gracePeriod grace period passed to the buffer wait as milliseconds; checked by
     *                    {@code Validation.smallerThanNumbersIllegal} (presumably rejects values &lt; 1 — confirm)
     * @return a future for the termination outcome
     */
    @Override
    public CompletableFuture<Void> terminateAsync(long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, gracePeriod, "Grace period < 1");
        // Counter filled in by the post-termination task with the number of undelivered messages.
        lostMessages = new AtomicInteger(0);
        block8: while (true) {
            // Read the current future and state stamp together (one-element array receives the stamp).
            currentStateHolder = new int[1];
            currentFuture = this.stateHolder.get(currentStateHolder);
            currentState = currentStateHolder[0];
            switch (currentState) {
                case 0: {
                    // Retry from the top if another thread changed the state in the meantime.
                    stateChanged = this.stateHolder.compareAndSet(null, null, 0, 4);
                    if (!stateChanged) continue block8;
                    this.onTerminate(null, null);
                    return CompletableFuture.completedFuture(null);
                }
                case 1: {
                    // Unconditional transition to state 4; the future held so far is cancelled.
                    this.stateHolder.set(null, 4);
                    currentFuture.cancel(true);
                    return CompletableFuture.completedFuture(null);
                }
                case 2: {
                    terminating = new ExtendedCompletableFuture<Void>();
                    stateChanged = this.stateHolder.compareAndSet(currentFuture, terminating, 2, 3);
                    // Decompiler artifact: on CAS success execution proceeds below, on failure the loop restarts.
                    if (stateChanged) ** break;
                    continue block8;
                    // Decompiler artifact: lambda capturing (this, gracePeriod, lostMessages) -> lambda$terminateAsync$6.
                    postTerminationTask = (Task<DirectMessageReceiverImpl>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$terminateAsync$6(long java.util.concurrent.atomic.AtomicInteger com.solace.messaging.receiver.DirectMessageReceiverImpl ), (Lcom/solace/messaging/receiver/DirectMessageReceiverImpl;)V)((DirectMessageReceiverImpl)this, (long)gracePeriod, (AtomicInteger)lostMessages);
                    try {
                        this.onTerminate(null, postTerminationTask);
                    }
                    finally {
                        // State 4 is set even when onTerminate throws.
                        this.stateHolder.set(null, 4);
                    }
                    lm = lostMessages.get();
                    if ((long)lm > 0L) {
                        // Record the discarded messages in the API metrics and fail the returned future.
                        this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, lm);
                        terminating.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to expiration of a grace period", new Object[]{lm}), lm));
                    } else {
                        terminating.complete(null);
                    }
                    return terminating;
                }
            }
            // Any other state: nothing to do here, fall out of the loop.
            break;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Callback flavour of {@link #terminateAsync(long)}: starts termination and notifies
     * {@code terminationListener} once the returned future completes.
     *
     * <p>On failure the listener receives the cause of the completion throwable when one
     * exists (as before), otherwise the throwable itself. Previously a failure without a
     * cause - e.g. the {@code IncompleteMessageDeliveryException} used to complete the
     * termination future exceptionally - was delivered as {@code null}, which is
     * indistinguishable from success.
     *
     * <p>Exceptions thrown by the listener are caught and logged at warn level.
     *
     * @param terminationListener callback to notify; validated to be non-null
     * @param gracePeriod         grace period forwarded to {@link #terminateAsync(long)}
     */
    @Override
    public void terminateAsync(CompletionListener<Void> terminationListener, long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(terminationListener, "Termination listener can't be null");
        CompletableFuture<Void> disconnecting = this.terminateAsync(gracePeriod);
        disconnecting.whenComplete((nothing, throwable) -> {
            Throwable failure = null;
            if (throwable != null) {
                // whenComplete on the source future hands over the raw exception, which may
                // have no cause; never translate an actual failure into "no error".
                Throwable cause = throwable.getCause();
                failure = cause != null ? cause : throwable;
            }
            try {
                terminationListener.onCompletion(null, failure);
            }
            catch (Exception e) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)"Application code throw an unhandled exception by processing termination completion notification", (Throwable)e);
                }
            }
        });
    }

    /**
     * Exposes the receiver info object held by this receiver.
     *
     * @return the {@code receiverInfo} field of this instance
     */
    @Override
    public ManageableReceiver.DirectReceiverInfo receiverInfo() {
        return receiverInfo;
    }

    /**
     * Builds a deferred task that applies the given subscriptions to a receiver.
     *
     * <p>The task does nothing when the receiver is not running. Otherwise it walks the
     * list in order and, per element, fails with {@link IllegalStateException} when the
     * session is not connected, stops when the current thread is interrupted, and skips the
     * element when it or {@code shareName} is {@code null}.
     *
     * @param topicSubscriptions subscriptions to apply; an empty list yields {@code NO_OP_TASK}
     * @param shareName          share name to apply the subscriptions with
     * @return the task to run against a receiver
     */
    Task<DirectMessageReceiverImpl> addSubscriptions(List<TopicSubscription> topicSubscriptions, ShareName shareName) {
        if (topicSubscriptions.isEmpty()) {
            return NO_OP_TASK;
        }
        return target -> {
            if (!target.isRunning()) {
                return;
            }
            for (TopicSubscription candidate : topicSubscriptions) {
                // Connectivity is re-checked before every single subscription.
                if (!target.serviceInternalView.getClientSession().isConnected()) {
                    throw new IllegalStateException("Subscriptions can't be added while service is not connected");
                }
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }
                if (candidate != null && shareName != null) {
                    this.addSubscription0(candidate, shareName, (DirectMessageReceiverImpl)target);
                }
            }
        };
    }

    /**
     * Applies a single subscription to the session on behalf of {@code receiver}.
     *
     * <p>Under {@code subscriptionsLock} the method returns without doing anything when an
     * equal (subscription, share name) pair is already recorded; otherwise it registers a
     * message listener that feeds the receiver's buffer, starts the resulting consumer and
     * records the applied subscription.
     *
     * NOTE(review): decompiler output ("Removed try catching itself - possible behaviour
     * change"); the interrupt flag is not restored when waiting for the lock is interrupted.
     *
     * @param subscription subscription to apply
     * @param shareName    share name used to build the topic
     * @param receiver     receiver whose session, buffer and subscription set are used
     * @throws IllegalStateException     if the receiver is not running or the session is not connected
     * @throws PubSubPlusClientException if the session rejects the subscription, or (as
     *                                   {@code RequestInterruptedException}) if acquiring the lock is interrupted
     */
    void addSubscription0(TopicSubscription subscription, ShareName shareName, final DirectMessageReceiverImpl receiver) throws PubSubPlusClientException {
        final boolean usable = receiver.isRunning() && receiver.serviceInternalView.getClientSession().isConnected();
        if (!usable) {
            throw new IllegalStateException("Subscription can't be added when receiver is not started or is not connected");
        }
        final ReentrantLock guard = this.subscriptionsLock;
        try {
            guard.lockInterruptibly();
            try {
                final AppliedTopicSubscription probe = AppliedTopicSubscription.comparable(subscription, shareName);
                if (this.appliedTopicSubscriptions.contains(probe)) {
                    // Already applied; the finally block below still releases the lock.
                    return;
                }
                final InboundMessage.SolaceMessageListener dispatcher = new InboundMessage.SolaceMessageListener(subscription.getName()){

                    @Override
                    public void onException(PubSubPlusClientException problem) {
                        final boolean facilityClosed = problem.getCause() instanceof ClosedFacilityException;
                        if (facilityClosed && logger.isErrorEnabled()) {
                            logger.error((Object)(DirectMessageReceiverImpl.this.instanceName + " receiver is down ."), (Throwable)problem);
                        }
                        DirectMessageReceiverImpl.this.messageReceiptFailureNotificationDispatcher.onException(problem);
                        if (logger.isErrorEnabled()) {
                            logger.error((Object)(DirectMessageReceiverImpl.this.instanceName + " encountered problem during message reception."), (Throwable)problem);
                        }
                    }

                    @Override
                    public void onReceive(InboundMessage message) {
                        try {
                            receiver.buffer.insert(ReceiverBuffers.Receivable.of(message));
                        }
                        catch (Exception problem) {
                            // The message could not be buffered: count it as discarded and notify.
                            DirectMessageReceiverImpl.this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                            DirectMessageReceiverImpl.this.messageReceiptFailureNotificationDispatcher.onException(problem);
                            if (logger.isErrorEnabled()) {
                                logger.error((Object)(DirectMessageReceiverImpl.this.instanceName + " encountered problem during message reception."), (Throwable)problem);
                            }
                        }
                    }
                };
                final Topic target = receiver.createTopic(subscription, shareName);
                try {
                    final com.solacesystems.jcsmp.Consumer startedConsumer = receiver.serviceInternalView.getClientSession().addSubscription((Subscription)target, dispatcher, null);
                    startedConsumer.start();
                    receiver.appliedTopicSubscriptions.add(new AppliedTopicSubscription(subscription, shareName, startedConsumer, dispatcher));
                }
                catch (JCSMPException failure) {
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)(this.instanceName + " failed to apply subscription: " + target), (Throwable)failure);
                    }
                    throw new PubSubPlusClientException("Subscription could not be applied", failure);
                }
            }
            finally {
                guard.unlock();
            }
        }
        catch (InterruptedException interruption) {
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " failed to apply subscription due to interruption"), (Throwable)interruption);
            }
            throw new PubSubPlusClientException.RequestInterruptedException("Failed to apply subscription due to interruption", interruption);
        }
    }

    /**
     * Submits a task to {@code asyncSubscriptionsExecutorService} that applies the
     * subscription via {@code addSubscription0} and reports the outcome to {@code listener}
     * through an {@code AddSubscriptionListenerAdapter}.
     *
     * @param subscription subscription to apply
     * @param shareName    share name to apply it with
     * @param listener     callback wrapped by the adapter
     * @param receiver     receiver the subscription is applied for
     * @throws IllegalStateException     if the receiver is not running or the session is not connected
     * @throws PubSubPlusClientException if the executor rejects the task
     */
    void addSubscriptionAsync0(TopicSubscription subscription, ShareName shareName, AsyncReceiverSubscriptions.SubscriptionChangeListener listener, DirectMessageReceiverImpl receiver) throws PubSubPlusClientException {
        final boolean usable = receiver.isRunning() && receiver.serviceInternalView.getClientSession().isConnected();
        if (!usable) {
            throw new IllegalStateException("Subscription can't be added when receiver is not started or is not connected");
        }
        try {
            this.asyncSubscriptionsExecutorService.submit(() -> {
                final AsyncReceiverSubscriptions.AddSubscriptionListenerAdapter outcome = new AsyncReceiverSubscriptions.AddSubscriptionListenerAdapter(listener, subscription);
                try {
                    this.addSubscription0(subscription, shareName, receiver);
                    outcome.handleSuccess(subscription);
                }
                catch (Exception failure) {
                    // Also reached when handleSuccess itself throws.
                    outcome.handleError(subscription, failure);
                }
                return null;
            });
        }
        catch (RejectedExecutionException rejection) {
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " could not schedule adding of subscription: " + subscription));
            }
            throw new PubSubPlusClientException("Adding subscription task was rejected by internal executor service", rejection);
        }
    }

    /**
     * Removes every recorded subscription that matches the given share name and
     * subscription, closing its consumer first.
     *
     * <p>A failure to close the consumer is logged and rethrown as
     * {@link PubSubPlusClientException} from inside the removal predicate.
     *
     * @param subscription subscription to remove
     * @param shareName    share name the subscription was applied with
     * @param receiver     not used by this method
     * @throws PubSubPlusClientException if closing the consumer fails
     */
    void removeSubscription0(TopicSubscription subscription, ShareName shareName, DirectMessageReceiverImpl receiver) throws PubSubPlusClientException {
        this.appliedTopicSubscriptions.removeIf(candidate -> {
            final AppliedTopicSubscription applied = (AppliedTopicSubscription)candidate;
            final boolean matches = shareName.equals(applied.shareName) && subscription.equals(applied.subscription);
            if (!matches) {
                return false;
            }
            try {
                applied.consumer.close();
            }
            catch (Exception failure) {
                if (logger.isErrorEnabled()) {
                    logger.error((Object)(this.instanceName + " failed to remove subscription: " + subscription), (Throwable)failure);
                }
                throw new PubSubPlusClientException("Un-subscription could not be performed", failure);
            }
            return true;
        });
    }

    /**
     * Submits a task to {@code asyncSubscriptionsExecutorService} that removes every
     * recorded subscription matching the given share name and subscription, closing its
     * consumer, and reports the outcome to {@code listener} through a
     * {@code RemoveSubscriptionListenerAdapter}.
     *
     * <p>Outcome reporting:
     * <ul>
     *   <li>consumer closed: {@code handleSuccess}; an exception thrown by it is logged at warn level;</li>
     *   <li>close failed: {@code handleError} with the close failure; an exception thrown by
     *       the handler is logged at warn level. The entry is removed from the set regardless;</li>
     *   <li>no matching subscription: {@code handleSuccess}.</li>
     * </ul>
     *
     * @param subscription subscription to remove
     * @param shareName    share name the subscription was applied with
     * @param listener     callback wrapped by the adapter
     * @param receiver     not used by this method
     * @throws PubSubPlusClientException if the executor rejects the task
     */
    void removeSubscriptionAsync0(TopicSubscription subscription, ShareName shareName, AsyncReceiverSubscriptions.SubscriptionChangeListener listener, DirectMessageReceiverImpl receiver) throws PubSubPlusClientException {
        try {
            this.asyncSubscriptionsExecutorService.submit(() -> {
                AtomicBoolean noSuchSubscription = new AtomicBoolean(true);
                AsyncReceiverSubscriptions.RemoveSubscriptionListenerAdapter subscriptionListenerAdapter = new AsyncReceiverSubscriptions.RemoveSubscriptionListenerAdapter(listener, subscription);
                this.appliedTopicSubscriptions.removeIf(appliedSubscription -> {
                    if (shareName.equals(((AppliedTopicSubscription)appliedSubscription).shareName) && subscription.equals(((AppliedTopicSubscription)appliedSubscription).subscription)) {
                        noSuchSubscription.set(false);
                        try {
                            ((AppliedTopicSubscription)appliedSubscription).consumer.close();
                            try {
                                subscriptionListenerAdapter.handleSuccess(subscription);
                            }
                            catch (Exception e) {
                                if (logger.isWarnEnabled()) {
                                    // Message corrected: this is the removal path, not the adding path.
                                    logger.warn((Object)("Application code throw an unhandled exception by processing removal of subscription: " + subscription), (Throwable)e);
                                }
                            }
                        }
                        catch (Exception e) {
                            try {
                                subscriptionListenerAdapter.handleError(subscription, e);
                            }
                            catch (Exception ex) {
                                if (logger.isWarnEnabled()) {
                                    // Log the listener's own exception (ex), not the close failure (e)
                                    // that was already handed to the listener.
                                    logger.warn((Object)("Application code throw an unhandled exception by processing removal of subscription: " + subscription), (Throwable)ex);
                                }
                            }
                        }
                        return true;
                    }
                    return false;
                });
                if (noSuchSubscription.get()) {
                    // Nothing matched: removal of an absent subscription is reported as success.
                    subscriptionListenerAdapter.handleSuccess(subscription);
                }
                return null;
            });
        }
        catch (RejectedExecutionException e) {
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " could not schedule removing of subscription: " + subscription));
            }
            throw new PubSubPlusClientException("Subscription removal task was rejected by internal executor service", e);
        }
    }

    /**
     * Creates the JCSMP topic for a subscription.
     *
     * <p>When {@code shareName} equals the no-op share name the topic is the subscription
     * name itself; otherwise it is {@code "#share/" + shareName + "/" + subscriptionName}.
     *
     * @param subscription subscription whose name forms the topic expression
     * @param shareName    share name, or the no-op share name for a plain subscription
     * @return topic created through {@code JCSMPFactory}
     */
    Topic createTopic(TopicSubscription subscription, ShareName shareName) {
        final boolean shared = !ShareName.ShareNameImpl.noOp().equals(shareName);
        if (shared) {
            final String sharedExpression = "#share/" + shareName.getName() + "/" + subscription.getName();
            return JCSMPFactory.onlyInstance().createTopic(sharedExpression);
        }
        return JCSMPFactory.onlyInstance().createTopic(subscription.getName());
    }

    /**
     * Clears the given buffer, handing it the termination discard consumer.
     *
     * @param buffer buffer to clear
     */
    void onGracefulTerminateEmptyRemainingBuffer(ReceiverBuffers.ReceiverBuffer buffer) {
        final Consumer<ReceiverBuffers.Receivable> discardHook = this.createTerminationDiscardConsumer();
        buffer.clear(discardHook);
    }

    /**
     * Creates the consumer applied to receivables discarded at termination: when the
     * receivable carries a message with a non-null tracing context, the context is passed
     * to {@code terminationDiscardHook}.
     *
     * @return consumer for discarded receivables
     */
    private Consumer<ReceiverBuffers.Receivable> createTerminationDiscardConsumer() {
        return discarded -> {
            final InboundMessage message = discarded.getMessage();
            if (message == null) {
                return;
            }
            final Object context = ((MessageTracingSupport)message).getContext();
            if (context == null) {
                return;
            }
            this.terminationDiscardHook(context, "Direct message receiver terminated with message still in buffer");
        };
    }

    /**
     * Runs every task queued in {@code postStartAsyncActions} against this receiver and
     * removes each task from the collection once it has run.
     */
    void addInitialSubscriptions() {
        this.postStartAsyncActions.removeIf(pendingAction -> {
            // Side effect inside the predicate is intentional: run, then drop the task.
            pendingAction.run(this);
            return true;
        });
    }

    /**
     * Post-termination step used by {@code terminateAsync(long)}: waits up to
     * {@code gracePeriod} milliseconds for the buffer to become empty. If it does not,
     * the number of remaining messages is stored in {@code lostMessages} and the
     * remaining buffer content is discarded.
     *
     * @param gracePeriod  maximum wait in milliseconds
     * @param lostMessages receives the count of messages still buffered after the wait
     * @param receiver     receiver whose buffer is measured and emptied
     */
    private /* synthetic */ void lambda$terminateAsync$6(long gracePeriod, AtomicInteger lostMessages, DirectMessageReceiverImpl receiver) {
        if (this.buffer.awaitEmpty(gracePeriod, TimeUnit.MILLISECONDS)) {
            return;
        }
        if (logger.isWarnEnabled()) {
            logger.warn((Object)(receiver.instanceName + " shutdown gracefully before all buffered messages were processed, grace period was not sufficient: " + gracePeriod));
        }
        final int remaining = receiver.buffer.size();
        lostMessages.set(remaining);
        this.onGracefulTerminateEmptyRemainingBuffer(receiver.buffer);
    }

    /**
     * View over the enclosing receiver's identity: its id and instance name.
     */
    @Internal
    @ProviderType
    private class DirectReceiverInfoImpl
    implements ManageableReceiver.DirectReceiverInfo {

        /** @return the {@code id} of the enclosing receiver */
        @Override
        public long getId() {
            return DirectMessageReceiverImpl.this.id;
        }

        /** @return the {@code instanceName} of the enclosing receiver */
        @Override
        public String getInstanceName() {
            return DirectMessageReceiverImpl.this.instanceName;
        }
    }

    /**
     * Callable that keeps pulling messages from a receiver buffer and handing them to the
     * message handler currently registered on the receiver, for as long as the receiver is
     * running or terminating.
     *
     * NOTE(review): decompiler (CFR) output; the {@code continue; finally} sequence inside
     * {@link #call()} is not valid Java and the original try/catch/finally shape cannot be
     * recovered from this listing.
     */
    @Internal
    @ProviderType
    private static class AsyncConsumerMessageDispatchTask
    implements Callable<Void> {
        // Buffer the messages are consumed from.
        private final ReceiverBuffers.ReceiverBuffer buffer;
        // Receiver whose state, handler reference and instance name are consulted.
        private final DirectMessageReceiverImpl receiver;
        // Lock taken before waiting on notEmpty.
        private final ReentrantLock messageHandlersLock;
        // Condition awaited while no message handler is set.
        // NOTE(review): presumably created from messageHandlersLock — confirm at the construction site.
        private final Condition notEmpty;

        private AsyncConsumerMessageDispatchTask(ReceiverBuffers.ReceiverBuffer buffer, DirectMessageReceiverImpl receiver, ReentrantLock messageHandlersLock, Condition notEmpty) {
            this.buffer = buffer;
            this.receiver = receiver;
            this.messageHandlersLock = messageHandlersLock;
            this.notEmpty = notEmpty;
        }

        /*
         * Dispatch loop. Per iteration:
         *  - with a handler set: consume one receivable and pass its message to the handler;
         *    exceptions thrown by the handler are logged and do not stop the loop;
         *  - without a handler: take the lock and await the notEmpty condition, unless a
         *    handler was set in the meantime.
         * Always returns null once the receiver is neither running nor terminating.
         *
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            while (this.receiver.isRunning() || this.receiver.isTerminating()) {
                try {
                    MessageReceiver.MessageHandler messageHandler = (MessageReceiver.MessageHandler)this.receiver.messageHandlerRef.get();
                    if (messageHandler != null) {
                        ReceiverBuffers.Receivable nextMessageTask = this.buffer.consume();
                        if (nextMessageTask != null) {
                            InboundMessage message = nextMessageTask.getMessage();
                            try {
                                messageHandler.onMessage(message);
                            }
                            catch (Exception t) {
                                // Application handler failure: log it and keep dispatching.
                                if (!logger.isErrorEnabled()) continue;
                                logger.error((Object)("Application code throw an unhandled exception by message processing: " + message), (Throwable)t);
                            }
                            continue;
                        }
                        if (!logger.isInfoEnabled()) continue;
                        logger.info((Object)(this.receiver.instanceName + " buffer returned no message"));
                        continue;
                    }
                    // No handler registered: wait until signalled.
                    ReentrantLock theLock = this.messageHandlersLock;
                    try {
                        theLock.lockInterruptibly();
                        // Re-check under the lock; skip the wait if a handler has appeared.
                        if (this.receiver.messageHandlerRef.get() != null) continue;
                        this.notEmpty.await();
                    }
                    // Interruption while locking/waiting is ignored; the loop condition is re-evaluated.
                    catch (InterruptedException e) {}
                    continue;
                    finally {
                        theLock.unlock();
                    }
                }
                catch (Exception e) {
                    // Thread.interrupted() also clears the interrupt flag of the current thread.
                    boolean threadInterrupted = Thread.interrupted();
                    if (threadInterrupted) {
                        if (!logger.isInfoEnabled()) continue;
                        logger.info((Object)(this.receiver.instanceName + " waiting for a message was interrupted"), (Throwable)e);
                        continue;
                    }
                    if (!logger.isErrorEnabled()) continue;
                    logger.error((Object)(this.receiver.instanceName + " Problem during a message reception"), (Throwable)e);
                }
            }
            return null;
        }
    }

    /**
     * Record of a subscription that has been applied: the subscription, its share name,
     * the consumer created for it and the listener attached to that consumer.
     *
     * <p>Equality and hash code consider only {@code subscription} and {@code shareName},
     * so an instance built by {@link #comparable} (null consumer and listener) equals the
     * fully populated entry for the same pair.
     */
    @Internal
    @ProviderType
    static class AppliedTopicSubscription {
        private final TopicSubscription subscription;
        private final ShareName shareName;
        private final com.solacesystems.jcsmp.Consumer consumer;
        private final XMLMessageListener messageListener;

        private AppliedTopicSubscription(TopicSubscription subscription, ShareName shareName, com.solacesystems.jcsmp.Consumer consumer, XMLMessageListener messageListener) {
            this.subscription = subscription;
            this.shareName = shareName;
            this.consumer = consumer;
            this.messageListener = messageListener;
        }

        /** Builds a lookup-only instance carrying just the identity fields. */
        private static AppliedTopicSubscription comparable(TopicSubscription subscription, ShareName shareName) {
            return new AppliedTopicSubscription(subscription, shareName, null, null);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            final AppliedTopicSubscription other = (AppliedTopicSubscription)o;
            return Objects.equals(this.subscription, other.subscription)
                && Objects.equals(this.shareName, other.shareName);
        }

        @Override
        public int hashCode() {
            // Objects.hashCode yields 0 for null, matching the explicit null checks it replaces.
            return 31 * Objects.hashCode(this.subscription) + Objects.hashCode(this.shareName);
        }
    }
}

