/*
 * Decompiled with CFR 0.152.
 */
package com.solace.messaging.publisher;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.publisher.PublisherBuffers;
import com.solace.messaging.publisher.PublisherHealthCheck;
import com.solace.messaging.publisher.RequestReplyMessagePublisher;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.trace.propagation.MessageTracingSupport;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageablePublisher;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.internal.BiTask;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPInterruptedException;
import com.solacesystems.jcsmp.JCSMPRequestTimeoutException;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.RequestReplyListener;
import com.solacesystems.jcsmp.Requestor;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
public class RequestReplyMessagePublisherImpl
implements RequestReplyMessagePublisher {
    private static final Log logger = LogFactory.getLog(RequestReplyMessagePublisherImpl.class);
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0L);
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTED = 1;
    static final int STATE_TERMINATED = 2;
    private final long id;
    private final String instanceName;
    private final ManageablePublisher.PublisherInfo publisherInfo;
    final AtomicInteger stateHolder = new AtomicInteger(0);
    private volatile boolean gracefulShutdownInProgress = false;
    private final TypedProperties publisherConfiguration;
    private final MessagingServiceInternalView serviceInternalView;
    private final MessageCorrelationKeyProvider messageKeyProvider;
    private volatile XMLMessageProducer sessionDefaultProducer;
    private volatile Requestor requestor;
    private PublisherHealthCheck.PublisherReadinessListener publisherReadinessListener;
    private final TerminationNotificationDispatcher terminationNotificationDispatcher;
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener;
    private final ClientSession.ClientSessionStateListener closedSessionListener;
    private final JCSMPStreamingPublishCorrelatingEventHandler publishEventHandler;
    private final PublishFailureNotificationDispatcher errorNotificationDispatcher;
    private final Task<RequestReplyMessagePublisherImpl> postTerminationClearOutstandingRequestsSilentTask;
    private final OutStandingRequestTracker outstandingRequestsTracker;
    private static final BiTask<RequestReplyMessagePublisherImpl, AtomicInteger> postTerminateNowTask = (publisher, lostMessages) -> {
        boolean hasOutstandingRequests;
        int outstandingRequests = publisher.outstandingRequestsTracker.getOutStandingRequests();
        boolean bl = hasOutstandingRequests = outstandingRequests > 0;
        if (hasOutstandingRequests) {
            lostMessages.set(outstandingRequests);
            publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, outstandingRequests);
            if (logger.isWarnEnabled()) {
                logger.warn((Object)(publisher.instanceName + " not gracefully terminated before all messages were processed."));
            }
        }
    };

    public RequestReplyMessagePublisherImpl(MessagingServiceInternalView serviceInternalView, TypedProperties publisherConfiguration) {
        this.id = instanceIdGenerator.incrementAndGet();
        this.instanceName = "RequestReplyMessagePublisher@" + this.id;
        this.publisherInfo = new RequestReplyPublisherInfoImpl();
        this.serviceInternalView = serviceInternalView;
        this.publisherConfiguration = publisherConfiguration;
        this.messageKeyProvider = new MessageCorrelationKeyProvider();
        this.outstandingRequestsTracker = new OutStandingRequestTracker();
        this.terminationNotificationDispatcher = new TerminationNotificationDispatcher();
        this.postTerminationClearOutstandingRequestsSilentTask = publisher -> {
            int lostMessages = this.outstandingRequestsTracker.getOutStandingRequests();
            if (lostMessages > 0) {
                publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, lostMessages);
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)(publisher.instanceName + " non-gracefully terminated before all messages were processed."));
                }
            }
            this.outstandingRequestsTracker.close();
        };
        this.serviceInterruptionListener = e -> {
            if (logger.isWarnEnabled()) {
                logger.warn((Object)"Shutting down publisher due to Service interruption");
            }
            if (this.stateHolder.getAndSet(2) < 2) {
                this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(e.getTimestamp(), e.getMessage(), e.getCause()));
                this.onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
            }
        };
        this.closedSessionListener = event -> {
            if (logger.isWarnEnabled()) {
                logger.info((Object)"Shutting down publisher due to service closure");
            }
            if (this.stateHolder.getAndSet(2) < 2) {
                this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(event.getTimestamp(), event.getMessage(), event.getCause()));
                this.onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
            }
        };
        this.errorNotificationDispatcher = new PublishFailureNotificationDispatcher();
        this.publishEventHandler = new JCSMPStreamingPublishCorrelatingEventHandler(){

            public void responseReceivedEx(Object key) {
            }

            public void handleErrorEx(Object key, JCSMPException cause, long timestamp) {
                if (logger.isErrorEnabled()) {
                    logger.error((Object)(RequestReplyMessagePublisherImpl.this.instanceName + " encountered problem during message publishing."), (Throwable)cause);
                }
                if (key != null && key instanceof CorrelationContext) {
                    CorrelationContext messageContext = (CorrelationContext)key;
                    Object userContext = messageContext.userContext;
                    RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler = messageContext.replyMessageHandler;
                    RequestReplyMessagePublisherImpl.this.errorNotificationDispatcher.onException((Exception)((Object)cause), userContext, replyMessageHandler);
                }
            }

            public void handleError(String s, JCSMPException e, long l) {
            }

            public void responseReceived(String s) {
            }
        };
    }

    public void requestPublishCompleteHook(Object context) {
    }

    public void requestPublishFailureHook(Object context, String errorMessage) {
    }

    private void invokeRequestPublishHook(OutboundMessage outboundMessage, Exception exception) {
        Object context = ((MessageTracingSupport)outboundMessage).getContext();
        if (context != null) {
            if (exception == null) {
                this.requestPublishCompleteHook(context);
            } else {
                this.requestPublishFailureHook(context, exception.getMessage());
            }
        }
    }

    @Override
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener listener) {
        this.terminationNotificationDispatcher.setTerminationNotificationListener(listener);
    }

    @Override
    public boolean isReady() {
        return this.isRunning() && !this.gracefulShutdownInProgress;
    }

    @Override
    public void setPublisherReadinessListener(PublisherHealthCheck.PublisherReadinessListener listener) {
        this.publisherReadinessListener = listener;
    }

    @Override
    public void notifyWhenReady() {
        if (this.publisherReadinessListener == null) {
            logger.warn((Object)"Skip notification on a PublisherReadinessListener, listener is not set");
            return;
        }
        try {
            if (this.isReady()) {
                this.publisherReadinessListener.ready();
            } else {
                logger.debug((Object)"Skip notification on a PublisherReadinessListener, publisher is not ready");
            }
        }
        catch (Exception e) {
            logger.error((Object)"Client code in PublisherReadinessListener:ready() thrown an exception", (Throwable)e);
        }
    }

    @Override
    public boolean isRunning() {
        return 1 == this.stateHolder.get();
    }

    @Override
    public boolean isTerminated() {
        return 2 == this.stateHolder.get();
    }

    @Override
    public boolean isTerminating() {
        return this.gracefulShutdownInProgress;
    }

    @Override
    public RequestReplyMessagePublisher start() {
        if (1 == this.stateHolder.get()) {
            return this;
        }
        try {
            this.startAsync().get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t != null) {
                if (t instanceof PubSubPlusClientException) {
                    throw (PubSubPlusClientException)t;
                }
                if (t instanceof IllegalStateException) {
                    throw (IllegalStateException)t;
                }
                throw new PubSubPlusClientException(t);
            }
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " failed to start"));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e);
        }
        catch (CancellationException e) {
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e);
        }
        return this;
    }

    @Override
    public void terminate(long gracePeriod) throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        if (gracePeriod == 0L) {
            this.terminateNow();
            return;
        }
        try {
            this.terminateAsync(gracePeriod).get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t != null) {
                if (t instanceof PubSubPlusClientException) {
                    throw (PubSubPlusClientException)t;
                }
                if (t instanceof IllegalStateException) {
                    throw (IllegalStateException)t;
                }
                throw new PubSubPlusClientException(t);
            }
            throw new PubSubPlusClientException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher termination was canceled", e);
        }
    }

    @Override
    public <RequestReplyMessagePublisher> CompletableFuture<RequestReplyMessagePublisher> startAsync() throws PubSubPlusClientException {
        ExtendedCompletableFuture<RequestReplyMessagePublisherImpl> onStart;
        boolean started;
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Publisher can't be started before it is connected to a messaging service");
        }
        int state = this.stateHolder.get();
        if (state == 2) {
            throw new IllegalStateException("Message publisher is already terminated");
        }
        do {
            boolean terminated;
            onStart = new ExtendedCompletableFuture<RequestReplyMessagePublisherImpl>();
            boolean bl = terminated = 2 == this.stateHolder.get();
            if (terminated) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Publisher is already terminated"));
            }
            boolean starting = this.stateHolder.compareAndSet(0, 1);
            if (!starting) continue;
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " is being started"));
            }
            try {
                this.onStart();
                onStart.complete(this);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " is started"));
                }
            }
            catch (Exception e) {
                logger.error((Object)(this.instanceName + " failed to start and is terminating"), (Throwable)e);
                this.onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
                onStart.completeExceptionally(new IllegalStateException("Publisher is already closed due to internal error"));
            }
            return ExtendedCompletableFuture.onCancellation(onStart, (service, throwable) -> {
                this.stateHolder.set(2);
                this.onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
                this.onTerminate(null, null);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " async start was canceled"));
                }
            });
        } while (!(started = 1 == this.stateHolder.get()));
        onStart.complete(this);
        return onStart;
    }

    @Override
    public <RequestReplyMessagePublisher> void startAsync(CompletionListener<RequestReplyMessagePublisher> startListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(startListener, "startListener can't be null");
        CompletableFuture<RequestReplyMessagePublisher> onceConnected = this.startAsync();
        onceConnected.whenComplete((publisher, throwable) -> {
            block2: {
                try {
                    startListener.onCompletion(publisher, throwable == null ? null : throwable.getCause());
                }
                catch (Exception e) {
                    if (!logger.isErrorEnabled()) break block2;
                    logger.error((Object)(this.instanceName + " failed to start"), (Throwable)e);
                }
            }
        });
    }

    @Override
    public CompletableFuture<Void> terminateAsync(long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, gracePeriod, "Grace period < 1");
        ExtendedCompletableFuture<Void> term = new ExtendedCompletableFuture<Void>();
        Task<RequestReplyMessagePublisherImpl> preTerminationTask = publisher -> {
            publisher.stateHolder.set(2);
            publisher.gracefulShutdownInProgress = true;
        };
        AtomicInteger lostMessages = new AtomicInteger(0);
        Task<RequestReplyMessagePublisherImpl> postTerminationTask = publisher -> {
            try {
                this.outstandingRequestsTracker.awaitTermination(gracePeriod);
                int outstandingRequests = this.outstandingRequestsTracker.getOutStandingRequests();
                boolean hasOutstandingRequest = outstandingRequests > 0;
                publisher.gracefulShutdownInProgress = false;
                if (hasOutstandingRequest) {
                    if (logger.isWarnEnabled()) {
                        logger.warn((Object)(publisher.instanceName + " gracefully terminated before all messages were processed, not sufficient grace period of " + gracePeriod));
                    }
                    lostMessages.set(outstandingRequests);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new PubSubPlusClientException.RequestInterruptedException("Graceful termination of " + publisher.instanceName + " was interrupted", e);
            }
        };
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is being terminated"));
        }
        this.onTerminate(preTerminationTask, postTerminationTask);
        int lm = lostMessages.get();
        if (lm > 0) {
            this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, lm);
            term.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to expiration of a grace period", lm), lm));
        } else {
            term.complete(null);
        }
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is terminated"));
        }
        return term;
    }

    @Override
    public void terminateAsync(CompletionListener<Void> terminationListener, long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(terminationListener, "Termination listener can't be null");
        CompletableFuture<Void> onceDisconnected = this.terminateAsync(gracePeriod);
        onceDisconnected.whenComplete((nothing, throwable) -> {
            try {
                terminationListener.onCompletion(null, throwable == null ? null : throwable.getCause());
            }
            catch (Exception e) {
                logger.warn((Object)(this.instanceName + "  encountered problem during termination."), (Throwable)e);
            }
        });
    }

    @Override
    public void publish(OutboundMessage requestMessage, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Topic requestDestination, long replyTimeout) {
        this.publish(requestMessage, replyMessageHandler, null, requestDestination, replyTimeout);
    }

    @Override
    public void publish(OutboundMessage requestMessage, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Object userContext, Topic requestDestination, long replyTimeout) {
        this.publish(requestMessage, null, replyMessageHandler, userContext, requestDestination, replyTimeout);
    }

    @Override
    public void publish(OutboundMessage requestMessage, Properties additionalMessageProperties, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Object userContext, Topic requestDestination, long replyTimeout) {
        Validation.nullIllegal(requestMessage, "requestMessage can't be null");
        Validation.nullIllegal(replyMessageHandler, "replyMessageHandler can't be null");
        Validation.nullIllegal(requestDestination, "requestDestination can't be null");
        Validation.smallerThanNumbersIllegal(1L, replyTimeout, "replyTimeout < 1");
        this.validatePublisher();
        this.publishExternalMessage(requestMessage, additionalMessageProperties, replyMessageHandler, userContext, requestDestination, replyTimeout);
    }

    @Override
    public InboundMessage publishAwaitResponse(OutboundMessage requestMessage, Topic requestDestination, long replyTimeout) throws PubSubPlusClientException.TimeoutException, PubSubPlusClientException.MessageRejectedByBrokerException, PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException, InterruptedException, IllegalArgumentException {
        return this.publishAwaitResponse(requestMessage, null, requestDestination, replyTimeout);
    }

    @Override
    public InboundMessage publishAwaitResponse(OutboundMessage requestMessage, Properties additionalMessageProperties, Topic requestDestination, long replyTimeout) throws PubSubPlusClientException.TimeoutException, PubSubPlusClientException.MessageRejectedByBrokerException, PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException, InterruptedException, IllegalArgumentException {
        Validation.nullIllegal(requestMessage, "requestMessage can't be null");
        Validation.nullIllegal(requestDestination, "requestDestination can't be null");
        Validation.smallerThanNumbersIllegal(1L, replyTimeout, "replyTimeout < 1");
        OutboundMessage outboundMessage = additionalMessageProperties == null || additionalMessageProperties.isEmpty() ? requestMessage : OutboundMessageBuilder.deepCopy(requestMessage, additionalMessageProperties);
        BytesXMLMessage solaceMessage = OutboundMessageBuilder.OutboundMessageBuilderImpl.OutboundMessageImpl.toByteMessage(outboundMessage);
        try {
            this.outstandingRequestsTracker.incrementOutstandingRequestCount();
            this.validatePublisher();
            BytesXMLMessage response = this.requestor.request((XMLMessage)solaceMessage, replyTimeout, (Destination)requestDestination);
            InboundMessage inboundMessage = MessageReceiver.InboundMessageImpl.toInboundMessage(response);
            return inboundMessage;
        }
        catch (Exception e) {
            if (e instanceof ClosedFacilityException) {
                throw new PubSubPlusClientException(this.instanceName + " publisher is down.", e);
            }
            if (e instanceof JCSMPInterruptedException) {
                throw new PubSubPlusClientException.RequestInterruptedException(e.getMessage(), e);
            }
            if (e instanceof JCSMPRequestTimeoutException) {
                throw new PubSubPlusClientException.TimeoutException(e.getMessage());
            }
            throw new PubSubPlusClientException(e);
        }
        finally {
            this.outstandingRequestsTracker.decrementOutstandingRequestCount();
        }
    }

    @Override
    public ManageablePublisher.PublisherInfo publisherInfo() {
        return this.publisherInfo;
    }

    @Internal
    void publishExternalMessage(OutboundMessage requestMessage, Properties additionalMessageProperties, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Object userContextObject, Topic requestDestination, long replyTimeout) {
        OutboundMessage outboundMessage = additionalMessageProperties == null || additionalMessageProperties.isEmpty() ? requestMessage : OutboundMessageBuilder.deepCopy(requestMessage, additionalMessageProperties);
        CorrelationContext messageContext = new CorrelationContext(this.messageKeyProvider.nextLongKey(), userContextObject, outboundMessage, replyMessageHandler);
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectCorrelationKey(outboundMessage, messageContext);
        BytesXMLMessage solaceMessage = OutboundMessageBuilder.OutboundMessageBuilderImpl.OutboundMessageImpl.toByteMessage(outboundMessage);
        try {
            this.outstandingRequestsTracker.incrementOutstandingRequestCount();
            this.validatePublisher();
            RequestReplyListenerImpl callback = new RequestReplyListenerImpl(replyMessageHandler, this);
            this.requestor.request((XMLMessage)solaceMessage, (RequestReplyListener)callback, replyTimeout, (Destination)requestDestination, (Object)messageContext);
            this.invokeRequestPublishHook(outboundMessage, null);
        }
        catch (IllegalArgumentException ex) {
            this.outstandingRequestsTracker.decrementOutstandingRequestCount();
            this.invokeRequestPublishHook(outboundMessage, ex);
            throw ex;
        }
        catch (Exception e) {
            this.outstandingRequestsTracker.decrementOutstandingRequestCount();
            this.invokeRequestPublishHook(outboundMessage, e);
            this.errorNotificationDispatcher.onException(PublisherBuffers.Publishable.of(outboundMessage, requestDestination), e, userContextObject, replyMessageHandler);
        }
    }

    @Internal
    XMLMessageProducer createSessionDefaultProducer(JCSMPStreamingPublishEventHandler eventHandler) throws PubSubPlusClientException {
        try {
            ClientSession clientSession = this.serviceInternalView.getClientSession();
            if (clientSession.getDefaultProducer() == null) {
                clientSession.getDirectMessageProducer(eventHandler);
            }
            return clientSession.getDefaultProducer();
        }
        catch (Exception e) {
            throw new PubSubPlusClientException("Failed to create message publisher", e);
        }
    }

    @Internal
    Requestor createRequester() throws PubSubPlusClientException {
        try {
            return this.serviceInternalView.getClientSession().createRequestor();
        }
        catch (Exception e) {
            throw new PubSubPlusClientException("Failed to create message publisher", e);
        }
    }

    @Internal
    void validatePublisher() {
        if (this.isTerminating() || this.isTerminated()) {
            throw new IllegalStateException("Message publisher was terminated");
        }
        if (!this.isRunning()) {
            throw new IllegalStateException("Message publisher not started");
        }
    }

    @Internal
    void onStart() throws PubSubPlusClientException {
        this.serviceInternalView.getClientSession().addServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().addClientSessionStateListener(this.closedSessionListener);
        this.sessionDefaultProducer = this.createSessionDefaultProducer((JCSMPStreamingPublishEventHandler)this.publishEventHandler);
        this.requestor = this.createRequester();
        this.serviceInternalView.enableConsumerAPI();
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Internal
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        int lm;
        this.stateHolder.set(2);
        AtomicInteger lostMessages = new AtomicInteger(0);
        try {
            logger.debug((Object)(this.instanceName + " is being non gracefully terminated"));
            this.onTerminate(null, postTerminateNowTask, lostMessages);
            logger.debug((Object)(this.instanceName + " is terminated"));
            lm = lostMessages.get();
            if (lm <= 0) return;
        }
        catch (Throwable throwable) {
            int lm2 = lostMessages.get();
            if (lm2 <= 0) throw throwable;
            throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", lm2), lm2);
        }
        throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", lm), lm);
    }

    @Internal
    void onTerminate(Task<RequestReplyMessagePublisherImpl> preTerminationTask, Task<RequestReplyMessagePublisherImpl> postTerminationTask) {
        try {
            if (preTerminationTask != null) {
                preTerminationTask.run(this);
            }
            this.stateHolder.set(2);
            this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
            this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " publisher is shutdown"));
            }
        }
        finally {
            this.terminationNotificationDispatcher.close();
            this.errorNotificationDispatcher.close();
            if (postTerminationTask != null) {
                postTerminationTask.run(this);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Internal
    <C> void onTerminate(Task<RequestReplyMessagePublisherImpl> preTerminationTask, BiTask<RequestReplyMessagePublisherImpl, C> postTerminationTask, C c) {
        try {
            if (preTerminationTask != null) {
                preTerminationTask.run(this);
            }
            this.stateHolder.set(2);
            this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
            this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " publisher is shutdown"));
            }
        }
        finally {
            this.terminationNotificationDispatcher.close();
            this.errorNotificationDispatcher.close();
            if (postTerminationTask != null) {
                postTerminationTask.run(this, c);
            }
        }
    }

    @Internal
    @ProviderType
    public class OutStandingRequestTracker {
        private final AtomicInteger outstandingRequests = new AtomicInteger(0);
        private final Lock outstandingRequestsLock = new ReentrantLock();
        private final Condition zeroOutstandingRequestsCondition = this.outstandingRequestsLock.newCondition();

        public void incrementOutstandingRequestCount() {
            this.outstandingRequests.incrementAndGet();
        }

        public void decrementOutstandingRequestCount() {
            this.outstandingRequests.decrementAndGet();
            if (RequestReplyMessagePublisherImpl.this.gracefulShutdownInProgress) {
                this.outstandingRequestsLock.lock();
                try {
                    this.zeroOutstandingRequestsCondition.signalAll();
                }
                finally {
                    this.outstandingRequestsLock.unlock();
                }
            }
        }

        public void awaitTermination(long gracePeriod) throws InterruptedException {
            if (this.outstandingRequests.get() < 1) {
                return;
            }
            long remainingGracePeriod = gracePeriod;
            long endTime = System.currentTimeMillis() + gracePeriod;
            this.outstandingRequestsLock.lockInterruptibly();
            try {
                while (true) {
                    if (this.outstandingRequests.get() < 1 || remainingGracePeriod <= 0L) {
                        return;
                    }
                    boolean returnedBeforeTimeLapsed = this.zeroOutstandingRequestsCondition.await(remainingGracePeriod, TimeUnit.MILLISECONDS);
                    if (returnedBeforeTimeLapsed && this.outstandingRequests.get() < 1) {
                        return;
                    }
                    remainingGracePeriod = endTime - System.currentTimeMillis();
                }
            }
            finally {
                this.outstandingRequestsLock.unlock();
            }
        }

        public int getOutStandingRequests() {
            return this.outstandingRequests.get();
        }

        public void close() {
            this.outstandingRequests.set(0);
            try {
                this.zeroOutstandingRequestsCondition.signalAll();
            }
            finally {
                this.outstandingRequestsLock.unlock();
            }
        }
    }

    @Internal
    @ProviderType
    private class RequestReplyPublisherInfoImpl
    implements ManageablePublisher.PublisherInfo {
        private RequestReplyPublisherInfoImpl() {
        }

        @Override
        public String getInstanceName() {
            return RequestReplyMessagePublisherImpl.this.instanceName;
        }

        @Override
        public long getId() {
            return RequestReplyMessagePublisherImpl.this.id;
        }
    }

    @Internal
    @ProviderType
    private static class RequestReplyListenerImpl
    implements RequestReplyListener {
        final RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler;
        final RequestReplyMessagePublisherImpl parentPublisher;

        RequestReplyListenerImpl(RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, RequestReplyMessagePublisherImpl parentPublisher) {
            this.replyMessageHandler = replyMessageHandler;
            this.parentPublisher = parentPublisher;
        }

        public void onException(JCSMPException e, Object correlationKey) {
            this.invokeCallback(null, correlationKey, new PubSubPlusClientException(e));
        }

        public void onReply(BytesXMLMessage reply, Object correlationKey) {
            this.invokeCallback(reply, correlationKey, null);
        }

        public void onTimeout(Object correlationKey) {
            this.invokeCallback(null, correlationKey, new PubSubPlusClientException.TimeoutException("Request Timeout"));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void invokeCallback(BytesXMLMessage reply, Object internalSolaceCorrelationKey, PubSubPlusClientException internalSolaceException) {
            try {
                PubSubPlusClientException apiException = null;
                if (internalSolaceCorrelationKey == null) {
                    apiException = new PubSubPlusClientException("correlation key can't be null");
                } else if (!(internalSolaceCorrelationKey instanceof CorrelationContext)) {
                    apiException = new PubSubPlusClientException("Unknown correlation key type: " + internalSolaceCorrelationKey.getClass().getCanonicalName());
                }
                PubSubPlusClientException exception = internalSolaceException != null ? internalSolaceException : apiException;
                CorrelationContext messageContext = (CorrelationContext)internalSolaceCorrelationKey;
                Object userContext = messageContext != null ? messageContext.userContext : null;
                InboundMessage replyMsg = reply != null ? MessageReceiver.InboundMessageImpl.toInboundMessage(reply) : null;
                try {
                    this.replyMessageHandler.onMessage(replyMsg, userContext, exception);
                }
                catch (Exception appException) {
                    if (logger.isWarnEnabled()) {
                        logger.warn((Object)"Application code threw an unhandled exception while processing reply in ReplyMessageHandler", (Throwable)appException);
                    }
                }
            }
            finally {
                this.parentPublisher.outstandingRequestsTracker.decrementOutstandingRequestCount();
            }
        }
    }

    @Internal
    @ProviderType
    static class CorrelationContext
    implements Serializable {
        private static final long serialVersionUID = -6070196827721125521L;
        private final Long correlationKey;
        private volatile Object userContext;
        private volatile PubSubPlusClientException exception;
        private volatile OutboundMessage linkedMessage;
        private transient RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler;

        CorrelationContext(Long correlationKey, Object userContext, OutboundMessage message, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
            this.correlationKey = correlationKey;
            this.userContext = userContext;
            this.linkedMessage = message;
            this.replyMessageHandler = replyMessageHandler;
        }

        public Long getCorrelationKey() {
            return this.correlationKey;
        }

        public Object getUserContext() {
            return this.userContext;
        }

        public PubSubPlusClientException getException() {
            return this.exception;
        }

        public RequestReplyMessagePublisher.ReplyMessageHandler getReplyMessageHandler() {
            return this.replyMessageHandler;
        }

        public void setException(PubSubPlusClientException exception) {
            this.exception = exception;
        }

        public OutboundMessage getLinkedMessage() {
            return this.linkedMessage;
        }

        public void clear() {
            this.linkedMessage = null;
            this.exception = null;
            this.userContext = null;
            this.replyMessageHandler = null;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            CorrelationContext context = (CorrelationContext)o;
            return Objects.equals(this.correlationKey, context.correlationKey);
        }

        public int hashCode() {
            return this.correlationKey != null ? this.correlationKey.hashCode() : 0;
        }
    }

    @Internal
    @ProviderType
    static class MessageCorrelationKeyProvider
    implements Serializable {
        private static final long serialVersionUID = -5420202750378500494L;
        final AtomicLong messageKeyProvider = new AtomicLong(ThreadLocalRandom.current().nextLong(1L, 1000L));

        MessageCorrelationKeyProvider() {
        }

        Long nextLongKey() {
            return this.messageKeyProvider.incrementAndGet();
        }
    }

    @Internal
    @ProviderType
    private class PublishFailureNotificationDispatcher {
        private final ExecutorService failureNotificationExecutorService;
        private final ReentrantLock closeLock;
        private final AtomicBoolean closed;

        private PublishFailureNotificationDispatcher() {
            this.failureNotificationExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(RequestReplyMessagePublisherImpl.this.instanceName + "-error-dispatcher"));
            this.closeLock = new ReentrantLock();
            this.closed = new AtomicBoolean(false);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void onException(PublisherBuffers.Publishable<Topic> publishable, Exception e, Object userContextObject, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
            if (e == null || publishable == null || replyMessageHandler == null) {
                return;
            }
            ScheduledFailureNotification notification = new ScheduledFailureNotification(publishable, e, userContextObject, replyMessageHandler);
            ReentrantLock cLock = this.closeLock;
            cLock.lock();
            try {
                if (this.closed.get()) {
                    return;
                }
                this.failureNotificationExecutorService.submit(notification);
            }
            catch (RejectedExecutionException ex) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)(RequestReplyMessagePublisherImpl.this.instanceName + " could not schedule publisher failure notification, processing notification on a dispatcher thread"));
                }
                try {
                    notification.call();
                }
                catch (Exception exc) {
                    logger.debug((Object)"Exception by customer callback during publish error notification processing", (Throwable)exc);
                }
            }
            finally {
                cLock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void onException(Exception e, Object userContextObject, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
            if (e == null || replyMessageHandler == null) {
                return;
            }
            ScheduledFailureNotification notification = new ScheduledFailureNotification(e, userContextObject, replyMessageHandler);
            ReentrantLock cLock = this.closeLock;
            cLock.lock();
            try {
                if (this.closed.get()) {
                    return;
                }
                this.failureNotificationExecutorService.submit(notification);
            }
            catch (RejectedExecutionException ex) {
                logger.warn((Object)(RequestReplyMessagePublisherImpl.this.instanceName + " could not schedule publisher failure notification, processing notification on a dispatcher thread"));
                try {
                    notification.call();
                }
                catch (Exception exc) {
                    logger.debug((Object)"Exception by customer callback during publish error notification processing", (Throwable)exc);
                }
            }
            finally {
                cLock.unlock();
            }
        }

        void close() {
            ReentrantLock cLock = this.closeLock;
            cLock.lock();
            try {
                try {
                    if (this.closed.compareAndSet(false, true) && !this.failureNotificationExecutorService.isShutdown()) {
                        this.failureNotificationExecutorService.shutdown();
                    }
                }
                catch (Exception e) {
                    logger.warn((Object)"Problem with closing internal failure notification dispatcher", (Throwable)e);
                }
            }
            finally {
                cLock.unlock();
            }
        }

        @Internal
        @ProviderType
        class ScheduledFailureNotification
        implements Callable<Void> {
            final Exception e;
            final PublisherBuffers.Publishable<Topic> publishable;
            final Object userContextObject;
            final RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler;

            ScheduledFailureNotification(PublisherBuffers.Publishable<Topic> publishable, Exception e, Object userContextObject, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
                this.publishable = publishable;
                this.e = e;
                this.userContextObject = userContextObject;
                this.replyMessageHandler = replyMessageHandler;
            }

            ScheduledFailureNotification(Exception e, Object userContextObject, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
                this.e = e;
                this.userContextObject = userContextObject;
                this.replyMessageHandler = replyMessageHandler;
                this.publishable = PublisherBuffers.Publishable.none();
            }

            @Override
            public Void call() throws Exception {
                RequestReplyMessagePublisher.ReplyMessageHandler l = this.replyMessageHandler;
                if (l == null) {
                    return null;
                }
                l.onMessage(null, this.userContextObject, this.mapException(this.e));
                return null;
            }

            PubSubPlusClientException mapException(Exception e) {
                if (e instanceof PubSubPlusClientException) {
                    return (PubSubPlusClientException)e;
                }
                return new PubSubPlusClientException(e);
            }
        }
    }
}

