/*
 * Decompiled with CFR 0.152.
 */
package com.solace.messaging.publisher;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.publisher.DirectMessagePublisher;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.publisher.PublisherBuffers;
import com.solace.messaging.publisher.PublisherHealthCheck;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.trace.propagation.MessageTracingSupport;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageablePublisher;
import com.solace.messaging.util.PublisherCongestionNotificationDispatcher;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.internal.BiTask;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPInterruptedException;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
import com.solacesystems.jcsmp.statistics.StatType;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
public class DirectMessagePublisherImpl
implements DirectMessagePublisher {
    /** JCSMP producer used by the dispatcher task to send messages; assigned in {@code onStart()}. */
    private volatile XMLMessageProducer producer;
    /** Receives publish failures reported by the publish event handler and by the dispatcher task. */
    private final PublishFailureNotificationDispatcher errorNotificationDispatcher;
    /** Internal view of the owning messaging service (connection state, client session, metrics collector). */
    private final MessagingServiceInternalView serviceInternalView;
    /** Configuration handed to {@code PublisherBuffers.createBuffer(...)} when the buffer is created. */
    private final TypedProperties publisherConfiguration;
    /** Buffer of pending publishables: {@code publish(...)} inserts, the dispatcher task consumes. */
    private final PublisherBuffers.PublisherBuffer<Topic> buffer;
    /** Event handler passed to the session when the producer is created in {@code onStart()}. */
    private final JCSMPStreamingPublishCorrelatingEventHandler publishEventHandler;
    /** Builder used to wrap raw {@code byte[]}/{@code String} payloads into outbound messages. */
    private final OutboundMessageBuilder messageBuilder;
    /** Single-thread executor (threads come from a {@code NamedDaemonThreadFactory}) running the dispatcher task. */
    private final ExecutorService publisherBotExecutorService;
    // Lifecycle states stored in stateHolder.
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTED = 1;
    static final int STATE_TERMINATED = 2;
    /** Current lifecycle state; initial value 0 equals {@link #STATE_NOT_STARTED}. */
    final AtomicInteger stateHolder = new AtomicInteger(0);
    /** Per-instance sequence number drawn from {@link #instanceIdGenerator}. */
    private final long id = instanceIdGenerator.incrementAndGet();
    /** Name used in log messages and in the dispatcher thread name. */
    private final String instanceName = "DirectMessagePublisher@" + this.id;
    /** Set to {@code true} by the pre-termination step of {@code terminateAsync(long)}; cleared once the wait for the buffer to drain returns. */
    private volatile boolean gracefulShutdownInProgress = false;
    /** Source of the values stored in {@link #id}. */
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0L);
    private static final Log logger = LogFactory.getLog(DirectMessagePublisherImpl.class);
    /** Terminates this publisher when the service reports an interruption; registered in {@code onStart()}. */
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener;
    /** Terminates this publisher on a client session state change event; registered in {@code onStart()}. */
    private final ClientSession.ClientSessionStateListener closedSessionListener;
    /** Post-termination step that closes the buffer, records discarded messages and shuts the executor down without throwing for lost messages. */
    private final Task<DirectMessagePublisherImpl> postTerminationClearBufferSilentTask;
    /** Object returned by {@code publisherInfo()}. */
    private final ManageablePublisher.DirectPublisherInfo publisherInfo;
    /** Forwards termination events to the listener set via {@code setTerminationNotificationListener}. */
    private final TerminationNotificationDispatcher terminationNotificationDispatcher;
    /** Installed as the buffer's congestion monitor; backs the readiness listener API. */
    private final PublisherCongestionNotificationDispatcher bufferCongestionNotificationDispatcher;
    /**
     * Post-termination step used by {@code terminateNow()}.
     * <p>
     * Reads the number of buffered messages, closes the buffer (each discarded message is passed to
     * {@code invokePublishFailureHookForDiscardedMessage}), and, when messages were still buffered, stores
     * that count in {@code lostMessages}, bumps the termination-discarded metric and logs a warning.
     * The dispatcher executor is shut down afterwards regardless of an interruption.
     */
    private static final BiTask<DirectMessagePublisherImpl, AtomicInteger> postTerminateNowTask = (publisher, lostMessages) -> {
        try {
            // The size must be read before close(): the count is what gets reported as lost.
            final int pending = publisher.buffer.size();
            publisher.buffer.close(publisher::invokePublishFailureHookForDiscardedMessage);
            if (pending >= 1) {
                lostMessages.set(pending);
                publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, pending);
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)(publisher.instanceName + " not gracefully terminated before all buffered messages were processed."));
                }
            }
        }
        catch (PubSubPlusClientException.RequestInterruptedException interrupted) {
            if (logger.isWarnEnabled()) {
                logger.warn((Object)("Non-graceful termination of " + publisher.instanceName + " was interrupted"));
            }
        }
        final ExecutorService dispatcher = publisher.publisherBotExecutorService;
        if (!dispatcher.isShutdown()) {
            dispatcher.shutdownNow();
        }
    };

    /**
     * Creates a publisher in the not-started state.
     * <p>
     * Wires the dispatcher executor, the publish event handler, the message buffer, the silent
     * post-termination task and the two listeners that terminate this publisher when the service is
     * interrupted or the client session changes state. Nothing is registered with the session here;
     * that happens in {@code onStart()}.
     *
     * @param serviceInternalView    internal view of the owning messaging service
     * @param publisherConfiguration configuration passed to {@code PublisherBuffers.createBuffer(...)}
     * @param messageBuilder         builder for raw payloads; configured via {@code configureMessageBuilder}
     */
    public DirectMessagePublisherImpl(MessagingServiceInternalView serviceInternalView, TypedProperties publisherConfiguration, OutboundMessageBuilder messageBuilder) {
        this.publisherBotExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-message-dispatcher"));
        this.publisherInfo = new DirectPublisherInfoImpl();
        this.serviceInternalView = serviceInternalView;
        this.terminationNotificationDispatcher = new TerminationNotificationDispatcher();
        this.errorNotificationDispatcher = new PublishFailureNotificationDispatcher();
        this.messageBuilder = this.configureMessageBuilder(messageBuilder);
        this.publishEventHandler = new JCSMPStreamingPublishCorrelatingEventHandler(){

            public void responseReceivedEx(Object key) {
            }

            public void handleErrorEx(Object key, JCSMPException cause, long timestamp) {
                Object context;
                // Give the tracing hook a chance first, but only when the key carries a context.
                if (key instanceof MessageTracingSupport && (context = ((MessageTracingSupport)key).getContext()) != null) {
                    DirectMessagePublisherImpl.this.postSendErrorHook(context, cause.toString());
                }
                if (logger.isErrorEnabled()) {
                    logger.error((Object)(DirectMessagePublisherImpl.this.instanceName + " encountered problem during message publishing."), (Throwable)cause);
                }
                DirectMessagePublisherImpl.this.errorNotificationDispatcher.onException((Exception)((Object)cause), timestamp);
            }

            public void handleError(String s, JCSMPException e, long l) {
            }

            public void responseReceived(String s) {
            }
        };
        this.publisherConfiguration = publisherConfiguration;
        this.buffer = PublisherBuffers.createBuffer(this.publisherConfiguration, this.serviceInternalView.getApiMetricsCollector());
        this.postTerminationClearBufferSilentTask = publisher -> {
            try {
                // Read the size before close(): that count is what gets recorded as discarded.
                int lostMessages = publisher.buffer.size();
                boolean bufferEmpty = lostMessages < 1;
                publisher.buffer.close(publisher::invokePublishFailureHookForDiscardedMessage);
                if (!bufferEmpty) {
                    publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, lostMessages);
                    if (logger.isWarnEnabled()) {
                        logger.warn((Object)(publisher.instanceName + " non-gracefully terminated before all buffered messages were processed."));
                    }
                }
            }
            catch (PubSubPlusClientException.RequestInterruptedException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)("Non-graceful termination of " + publisher.instanceName + " was interrupted"));
                }
            }
            // The executor is shut down even when closing the buffer was interrupted.
            if (!publisher.publisherBotExecutorService.isShutdown()) {
                publisher.publisherBotExecutorService.shutdownNow();
            }
        };
        this.serviceInterruptionListener = new MessagingService.ServiceInterruptionListener(){

            @Override
            public void onServiceInterrupted(MessagingService.ServiceEvent e) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)"Shutting down publisher due to Service interruption");
                }
                // getAndSet makes sure only the first transition into state 2 notifies and terminates.
                if (DirectMessagePublisherImpl.this.stateHolder.getAndSet(2) < 2) {
                    DirectMessagePublisherImpl.this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(e.getTimestamp(), e.getMessage(), e.getCause()));
                    DirectMessagePublisherImpl.this.onTerminate(null, DirectMessagePublisherImpl.this.postTerminationClearBufferSilentTask);
                }
                // Single volatile read so the null check and close() act on the same reference.
                XMLMessageProducer activeProducer = DirectMessagePublisherImpl.this.producer;
                if (activeProducer != null) {
                    activeProducer.close();
                }
            }
        };
        this.closedSessionListener = new ClientSession.ClientSessionStateListener(){

            @Override
            public void onClientSessionStateChange(ClientSession.ClientSessionStateChangeEvent event) {
                // Guard matches the level actually used for the call.
                if (logger.isInfoEnabled()) {
                    logger.info((Object)"Shutting down publisher due to service closure");
                }
                if (DirectMessagePublisherImpl.this.stateHolder.getAndSet(2) < 2) {
                    DirectMessagePublisherImpl.this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(event.getTimestamp(), event.getMessage(), event.getCause()));
                    DirectMessagePublisherImpl.this.onTerminate(null, DirectMessagePublisherImpl.this.postTerminationClearBufferSilentTask);
                }
                XMLMessageProducer activeProducer = DirectMessagePublisherImpl.this.producer;
                if (activeProducer != null) {
                    activeProducer.close();
                }
            }
        };
        this.bufferCongestionNotificationDispatcher = new PublisherCongestionNotificationDispatcher(this);
        this.buffer.setBufferCongestionMonitor(this.bufferCongestionNotificationDispatcher, 1);
    }

    /**
     * Hook invoked by the dispatcher task after a message was handed to the producer, when the
     * message carries a non-null tracing context. Does nothing in this class.
     *
     * @param context the context obtained from the message via {@code MessageTracingSupport.getContext()}
     */
    public void publishCompleteHook(Object context) {
    }

    /**
     * Hook invoked by the dispatcher task when a consumed message could not be sent (producer
     * already closed, or the send threw) and the message carries a non-null tracing context.
     * Does nothing in this class.
     *
     * @param context      the context obtained from the message via {@code MessageTracingSupport.getContext()}
     * @param errorMessage description of the failure
     */
    public void publishFailureHook(Object context, String errorMessage) {
    }

    /**
     * Hook invoked from the publish event handler's {@code handleErrorEx} when the correlation key
     * supports tracing and has a non-null context. Does nothing in this class.
     *
     * @param context      the context obtained from the correlation key
     * @param errorMessage string form of the reported exception
     */
    public void postSendErrorHook(Object context, String errorMessage) {
    }

    /**
     * Returns the publisher info object created in the constructor.
     *
     * @return the {@code DirectPublisherInfo} held by this publisher
     */
    @Override
    public ManageablePublisher.DirectPublisherInfo publisherInfo() {
        return this.publisherInfo;
    }

    /**
     * Starts the publisher and blocks until {@link #startAsync()} has completed.
     * <p>
     * Returns immediately when the publisher is already in the started state (1).
     *
     * @return this publisher
     * @throws PubSubPlusClientException if the asynchronous start failed; a failure that is not itself a
     *         {@code PubSubPlusClientException} or {@code IllegalStateException} is wrapped
     * @throws IllegalStateException if the asynchronous start failed with an {@code IllegalStateException}
     * @throws PubSubPlusClientException.RequestInterruptedException if the calling thread was interrupted
     *         or the start future was cancelled
     */
    @Override
    public DirectMessagePublisher start() {
        if (1 == this.stateHolder.get()) {
            return this;
        }
        try {
            this.startAsync().get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException)cause;
            }
            if (cause instanceof IllegalStateException) {
                throw (IllegalStateException)cause;
            }
            if (cause != null) {
                throw new PubSubPlusClientException(cause);
            }
            // No cause to unwrap: still a failed start, so report it instead of returning
            // normally (same handling as terminate(long)).
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " failed to start"), (Throwable)e);
            }
            throw new PubSubPlusClientException(e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e);
        }
        catch (CancellationException e) {
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e);
        }
        return this;
    }

    /**
     * Terminates the publisher, blocking until termination has finished.
     * <p>
     * A grace period of {@code 0} goes straight to {@code terminateNow()}; any other value is handed
     * to {@link #terminateAsync(long)} and the resulting future is awaited.
     *
     * @param gracePeriod time in milliseconds to wait for buffered messages, or {@code 0} for immediate termination
     * @throws PubSubPlusClientException if termination failed; failures of other types (except
     *         {@code IllegalStateException}, which is rethrown as is) are wrapped
     */
    @Override
    public void terminate(long gracePeriod) throws PubSubPlusClientException {
        if (gracePeriod == 0L) {
            this.terminateNow();
            return;
        }
        try {
            this.terminateAsync(gracePeriod).get();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher termination was canceled", interrupted);
        }
        catch (ExecutionException failed) {
            final Throwable cause = failed.getCause();
            if (cause == null) {
                throw new PubSubPlusClientException(failed);
            }
            if (cause instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException)cause;
            }
            if (cause instanceof IllegalStateException) {
                throw (IllegalStateException)cause;
            }
            throw new PubSubPlusClientException(cause);
        }
    }

    /**
     * Terminates the publisher immediately, without waiting for buffered messages.
     * <p>
     * Runs {@code onTerminate} with {@code postTerminateNowTask}, which records the number of
     * discarded messages in a counter. If that counter is positive afterwards, an
     * {@code IncompleteMessageDeliveryException} is thrown, also when termination itself failed;
     * in that case the original failure is attached as a suppressed exception so it is not lost.
     *
     * @throws PubSubPlusClientException.IncompleteMessageDeliveryException if buffered messages were discarded
     * @throws PubSubPlusClientException if termination failed and no messages were discarded
     * @throws IllegalStateException if termination failed with that exception and no messages were discarded
     */
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        final AtomicInteger lostMessages = new AtomicInteger(0);
        try {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " is being non gracefully terminated"));
            }
            this.onTerminate(null, postTerminateNowTask, lostMessages);
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " is terminated"));
            }
        }
        catch (Throwable failure) {
            final int lostOnFailure = lostMessages.get();
            if (lostOnFailure <= 0) {
                throw failure;
            }
            // Lost messages take precedence, but keep the underlying failure reachable.
            PubSubPlusClientException.IncompleteMessageDeliveryException incomplete = new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", lostOnFailure), lostOnFailure);
            incomplete.addSuppressed(failure);
            throw incomplete;
        }
        final int lost = lostMessages.get();
        if (lost > 0) {
            throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", lost), lost);
        }
    }

    /**
     * Tells whether the publisher is in the started state.
     *
     * @return {@code true} when the state holder currently contains 1 (the value of {@code STATE_STARTED})
     */
    @Override
    public boolean isRunning() {
        final int current = this.stateHolder.get();
        return current == 1;
    }

    /**
     * Tells whether the publisher is in the terminated state.
     *
     * @return {@code true} when the state holder currently contains 2 (the value of {@code STATE_TERMINATED})
     */
    @Override
    public boolean isTerminated() {
        final int current = this.stateHolder.get();
        return current == 2;
    }

    /**
     * Tells whether a graceful shutdown is currently in progress.
     *
     * @return the current value of the {@code gracefulShutdownInProgress} flag
     */
    @Override
    public boolean isTerminating() {
        return this.gracefulShutdownInProgress;
    }

    /**
     * Registers the listener with the termination notification dispatcher.
     *
     * @param listener listener handed to the dispatcher
     */
    @Override
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener listener) {
        this.terminationNotificationDispatcher.setTerminationNotificationListener(listener);
    }

    /**
     * Starts the publisher and returns a future describing the outcome.
     * <p>
     * The start work ({@code onStart()}) runs on the calling thread for the caller that wins the
     * transition from state 0 to state 1. A caller that finds the publisher already started gets a
     * completed future; a caller that finds it terminated inside the loop gets a failed future.
     * When {@code onStart()} throws, the publisher is terminated and the future fails with an
     * {@code IllegalStateException} that carries the original exception as its cause.
     *
     * @return future completed with this publisher, or completed exceptionally on failure
     * @throws IllegalStateException if the service is not connected, or the publisher is already
     *         terminated when the method is entered
     */
    @Override
    public <DirectMessagePublisher> CompletableFuture<DirectMessagePublisher> startAsync() throws PubSubPlusClientException {
        ExtendedCompletableFuture<DirectMessagePublisherImpl> onStart;
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Publisher can't be started before it is connected to a messaging service");
        }
        if (this.stateHolder.get() == 2) {
            throw new IllegalStateException("Message publisher is already terminated");
        }
        do {
            onStart = new ExtendedCompletableFuture<DirectMessagePublisherImpl>();
            if (2 == this.stateHolder.get()) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Publisher is already terminated"));
            }
            // Only the caller that wins the 0 -> 1 transition performs the start;
            // everyone else re-checks the state in the loop condition.
            if (!this.stateHolder.compareAndSet(0, 1)) continue;
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " is being started"));
            }
            try {
                this.onStart();
                onStart.complete(this);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " is started"));
                }
            }
            catch (Exception e) {
                logger.error((Object)(this.instanceName + " failed to start and is terminating"), (Throwable)e);
                this.onTerminate(null, this.postTerminationClearBufferSilentTask);
                // Keep the root cause attached so callers can see why the start failed.
                onStart.completeExceptionally(new IllegalStateException("Publisher is already closed due to internal error", e));
            }
            return ExtendedCompletableFuture.onCancellation(onStart, (service, throwable) -> {
                this.stateHolder.set(2);
                this.onTerminate(null, this.postTerminationClearBufferSilentTask);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " async start was canceled"));
                }
            });
        } while (1 != this.stateHolder.get());
        // Somebody else already started the publisher.
        onStart.complete(this);
        return onStart;
    }

    /**
     * Starts the publisher and reports the outcome to the given listener.
     * <p>
     * On failure the listener receives the cause of the completion exception when there is one,
     * otherwise the completion exception itself, so a failure is never reported as
     * {@code (null, null)}. Exceptions thrown by the listener are logged and not propagated.
     *
     * @param startListener listener notified when the start completes; must not be {@code null}
     * @throws IllegalStateException propagated from {@link #startAsync()}
     */
    @Override
    public <DirectMessagePublisher> void startAsync(CompletionListener<DirectMessagePublisher> startListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(startListener, "Start listener can't be null");
        CompletableFuture<DirectMessagePublisher> onceConnected = this.startAsync();
        onceConnected.whenComplete((publisher, throwable) -> {
            Throwable failure = null;
            if (throwable != null) {
                // Unwrap a wrapper exception when possible; fall back to the exception itself
                // when it has no cause, so the listener still sees the failure.
                Throwable cause = throwable.getCause();
                failure = cause != null ? cause : throwable;
            }
            try {
                startListener.onCompletion(publisher, failure);
            }
            catch (Exception e) {
                if (logger.isErrorEnabled()) {
                    logger.error((Object)(this.instanceName + " failed to start"), (Throwable)e);
                }
            }
        });
    }

    /**
     * Registers the listener with the publish failure notification dispatcher.
     *
     * @param listener listener handed to the dispatcher
     */
    @Override
    public void setPublishFailureListener(DirectMessagePublisher.PublishFailureListener listener) {
        this.errorNotificationDispatcher.setPublishFailureListener(listener);
    }

    /**
     * Gracefully terminates the publisher, waiting up to {@code gracePeriod} milliseconds for the
     * buffer to drain, and returns a future describing the outcome.
     * <p>
     * The termination work runs on the calling thread; the returned future is already completed
     * when this method returns. If messages were still buffered when the grace period ran out, the
     * future fails with an {@code IncompleteMessageDeliveryException} carrying their number.
     *
     * @param gracePeriod time to wait for buffered messages, in milliseconds; values below 1 are rejected
     * @return future completed normally, or exceptionally when messages could not be delivered
     */
    @Override
    public CompletableFuture<Void> terminateAsync(long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, gracePeriod, "Grace period < 1");
        ExtendedCompletableFuture<Void> term = new ExtendedCompletableFuture<Void>();
        Task<DirectMessagePublisherImpl> preTerminationTask = publisher -> {
            // Raise the graceful-shutdown flag BEFORE leaving the started state: the dispatcher loop
            // keeps running while "running || gracefulShutdownInProgress", so the opposite order
            // leaves a window in which it sees both false and exits with messages still buffered.
            publisher.gracefulShutdownInProgress = true;
            publisher.stateHolder.set(2);
        };
        AtomicInteger lostMessages = new AtomicInteger(0);
        Task<DirectMessagePublisherImpl> postTerminationTask = publisher -> {
            try {
                boolean publisherBufferEmpty = publisher.buffer.awaitEmpty(gracePeriod, TimeUnit.MILLISECONDS);
                publisher.gracefulShutdownInProgress = false;
                if (!publisherBufferEmpty) {
                    if (logger.isWarnEnabled()) {
                        logger.warn((Object)(publisher.instanceName + " gracefully terminated before all buffered messages were processed, not sufficient grace period of " + gracePeriod));
                    }
                    lostMessages.set(publisher.buffer.size());
                }
                this.onGracefulTerminateEmptyRemainingBuffer(publisher.buffer);
            }
            catch (PubSubPlusClientException.RequestInterruptedException e) {
                // The wait was cut short; do not leave the publisher reporting "terminating" forever.
                publisher.gracefulShutdownInProgress = false;
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)("Graceful termination of " + publisher.instanceName + " was interrupted"));
                }
            }
            if (!publisher.publisherBotExecutorService.isShutdown()) {
                publisher.publisherBotExecutorService.shutdownNow();
            }
        };
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is being terminated"));
        }
        this.onTerminate(preTerminationTask, postTerminationTask);
        int lm = lostMessages.get();
        if (lm > 0) {
            this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, lm);
            term.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to expiration of a grace period", lm), lm));
        } else {
            term.complete(null);
        }
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is terminated"));
        }
        return term;
    }

    /**
     * Gracefully terminates the publisher and reports the outcome to the given listener.
     * <p>
     * On failure the listener receives the cause of the completion exception when there is one,
     * otherwise the completion exception itself, so a failure (for example an incomplete delivery)
     * is never reported as {@code (null, null)}. Exceptions thrown by the listener are logged and
     * not propagated.
     *
     * @param terminationListener listener notified when termination completes; must not be {@code null}
     * @param gracePeriod         time to wait for buffered messages, in milliseconds
     */
    @Override
    public void terminateAsync(CompletionListener<Void> terminationListener, long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(terminationListener, "Termination listener can't be null");
        CompletableFuture<Void> onceDisconnected = this.terminateAsync(gracePeriod);
        onceDisconnected.whenComplete((nothing, throwable) -> {
            Throwable failure = null;
            if (throwable != null) {
                // Unwrap a wrapper exception when possible; fall back to the exception itself
                // when it has no cause, so the listener still sees the failure.
                Throwable cause = throwable.getCause();
                failure = cause != null ? cause : throwable;
            }
            try {
                terminationListener.onCompletion(null, failure);
            }
            catch (Exception e) {
                logger.warn((Object)(this.instanceName + "  encountered problem during termination."), (Throwable)e);
            }
        });
    }

    /**
     * Tells whether the publisher can currently accept another message.
     *
     * @return {@code true} when the publisher is running, the buffer has remaining capacity and no
     *         graceful shutdown is in progress
     */
    @Override
    public boolean isReady() {
        if (!this.isRunning()) {
            return false;
        }
        if (this.buffer.remainingCapacity() <= 0) {
            return false;
        }
        return !this.gracefulShutdownInProgress;
    }

    /**
     * Registers the listener with the buffer congestion notification dispatcher.
     *
     * @param listener listener handed to the dispatcher
     */
    @Override
    public void setPublisherReadinessListener(PublisherHealthCheck.PublisherReadinessListener listener) {
        this.bufferCongestionNotificationDispatcher.setPublisherReadinessListener(listener);
    }

    /**
     * Delegates to {@code notifyWhenReady()} of the buffer congestion notification dispatcher.
     */
    @Override
    public void notifyWhenReady() {
        this.bufferCongestionNotificationDispatcher.notifyWhenReady();
    }

    /**
     * Wraps a raw byte payload into an outbound message and queues it for publishing.
     *
     * @param message     payload; must not be {@code null}
     * @param destination topic to publish to; must not be {@code null}
     * @throws IllegalStateException if the publisher is not started, is terminating or is terminated
     */
    @Override
    public void publish(byte[] message, Topic destination) throws PubSubPlusClientException {
        this.validatePublisher();
        Validation.nullIllegal(message, "Message can't be null");
        Validation.nullIllegal(destination, "Message destination can't be null");
        this.publishInternalMessage(this.messageBuilder.build(message), destination, null);
    }

    /**
     * Wraps a string payload into an outbound message and queues it for publishing.
     *
     * @param message     payload; must not be {@code null}
     * @param destination topic to publish to; must not be {@code null}
     * @throws IllegalStateException if the publisher is not started, is terminating or is terminated
     */
    @Override
    public void publish(String message, Topic destination) throws PubSubPlusClientException {
        this.validatePublisher();
        Validation.nullIllegal(message, "Message can't be null");
        Validation.nullIllegal(destination, "Message destination can't be null");
        this.publishInternalMessage(this.messageBuilder.build(message), destination, null);
    }

    /**
     * Queues an already built message for publishing without additional properties; delegates to
     * the three-argument overload with {@code null} properties.
     *
     * @param message     message to publish
     * @param destination topic to publish to
     */
    @Override
    public void publish(OutboundMessage message, Topic destination) throws PubSubPlusClientException {
        this.publish(message, destination, null);
    }

    /**
     * Queues an already built message for publishing, optionally with additional properties.
     *
     * @param message                     message to publish; must not be {@code null}
     * @param destination                 topic to publish to; must not be {@code null}
     * @param additionalMessageProperties extra properties; may be {@code null} or empty
     * @throws IllegalStateException if the publisher is not started, is terminating or is terminated
     */
    @Override
    public void publish(OutboundMessage message, Topic destination, Properties additionalMessageProperties) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        // State is validated before the arguments, so a stopped publisher reports that first.
        this.validatePublisher();
        Validation.nullIllegal(message, "Message can't be null");
        Validation.nullIllegal(destination, "Message destination can't be null");
        this.publishExternalMessage(message, destination, additionalMessageProperties);
    }

    /**
     * Inserts a caller-supplied message into the buffer.
     * <p>
     * When additional properties are present the message is first deep-copied together with those
     * properties; otherwise the message is inserted as is.
     *
     * @param message                     message supplied by the caller
     * @param destination                 topic to publish to
     * @param additionalMessageProperties extra properties; may be {@code null} or empty
     */
    @Internal
    void publishExternalMessage(OutboundMessage message, Topic destination, Properties additionalMessageProperties) {
        final boolean hasExtraProperties = additionalMessageProperties != null && !additionalMessageProperties.isEmpty();
        OutboundMessage toSend = message;
        if (hasExtraProperties) {
            toSend = OutboundMessageBuilder.deepCopy(message, additionalMessageProperties);
        }
        this.buffer.insert(PublisherBuffers.Publishable.of(toSend, destination));
    }

    /**
     * Inserts a message built by this publisher into the buffer.
     * <p>
     * When additional properties are present they are injected via
     * {@code injectExtendedMessageProperties}; otherwise the message is inserted as is.
     *
     * @param message                     message built by this publisher's message builder
     * @param destination                 topic to publish to
     * @param additionalMessageProperties extra properties; may be {@code null} or empty
     */
    @Internal
    void publishInternalMessage(OutboundMessage message, Topic destination, Properties additionalMessageProperties) {
        final boolean hasExtraProperties = additionalMessageProperties != null && !additionalMessageProperties.isEmpty();
        OutboundMessage toSend = message;
        if (hasExtraProperties) {
            toSend = OutboundMessageBuilder.OutboundMessageBuilderImpl.injectExtendedMessageProperties(message, additionalMessageProperties);
        }
        this.buffer.insert(PublisherBuffers.Publishable.of(toSend, destination));
    }

    /**
     * Verifies that the publisher can accept messages.
     *
     * @throws IllegalStateException if the publisher is terminating or terminated, or has not been started
     */
    void validatePublisher() {
        final boolean shuttingDownOrDone = this.isTerminating() || this.isTerminated();
        if (shuttingDownOrDone) {
            throw new IllegalStateException("Message publisher was terminated");
        }
        if (this.isRunning()) {
            return;
        }
        throw new IllegalStateException("Message publisher not started");
    }

    /**
     * Switches the given builder to direct-publisher mode when it is the built-in implementation.
     *
     * @param messageBuilder builder to configure
     * @return the same builder instance that was passed in
     */
    OutboundMessageBuilder configureMessageBuilder(OutboundMessageBuilder messageBuilder) {
        final boolean builtInImplementation = messageBuilder instanceof OutboundMessageBuilder.OutboundMessageBuilderImpl;
        if (builtInImplementation) {
            ((OutboundMessageBuilder.OutboundMessageBuilderImpl)messageBuilder).forDirectMessagePublisher();
        }
        return messageBuilder;
    }

    /**
     * Performs the actual start: registers the interruption and session-state listeners, creates
     * the message producer and submits the dispatcher task to the single-thread executor.
     * <p>
     * The dispatcher task repeatedly consumes publishables from the buffer and sends them through
     * the producer until the publisher stops running (and no graceful shutdown is in progress) or
     * the buffer is closed.
     *
     * @throws PubSubPlusClientException if the producer cannot be created or the executor rejects the task
     */
    void onStart() throws PubSubPlusClientException {
        this.serviceInternalView.getClientSession().addServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().addClientSessionStateListener(this.closedSessionListener);
        this.producer = this.createMessageProducer((JCSMPStreamingPublishEventHandler)this.publishEventHandler);
        try {
            this.publisherBotExecutorService.submit(() -> {
                // Local copy of the producer reference; re-read from the field after failures.
                XMLMessageProducer prOptimized = this.producer;
                boolean inGracefulShutdown = false;
                // inGracefulShutdown is only refreshed when isRunning() is false (short-circuit ||).
                while ((this.isRunning() || (inGracefulShutdown = this.gracefulShutdownInProgress)) && !this.buffer.isClosed()) {
                    if (prOptimized != null) {
                        if (!prOptimized.isClosed()) {
                            if (Thread.currentThread().isInterrupted()) break;
                            PublisherBuffers.Publishable<Topic> nextMessageTask = this.buffer.consume();
                            if (nextMessageTask != null) {
                                Object context;
                                try {
                                    // Re-check: the producer may have been closed while consume() was waiting.
                                    if (!prOptimized.isClosed()) {
                                        OutboundMessage message = nextMessageTask.getMessage();
                                        prOptimized.send((XMLMessage)OutboundMessageBuilder.OutboundMessageBuilderImpl.OutboundMessageImpl.toByteMessage(message), (Destination)nextMessageTask.getDestination());
                                        context = ((MessageTracingSupport)message).getContext();
                                        if (context == null) continue;
                                        this.publishCompleteHook(context);
                                        continue;
                                    }
                                    // Producer closed after the message was taken from the buffer: count it as discarded.
                                    this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                                    Object context2 = ((MessageTracingSupport)nextMessageTask.getMessage()).getContext();
                                    if (context2 != null) {
                                        this.publishFailureHook(context2, "Producer is closed");
                                    }
                                    if (this.gracefulShutdownInProgress) break;
                                    prOptimized = this.producer;
                                }
                                catch (Exception e) {
                                    this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                                    context = ((MessageTracingSupport)nextMessageTask.getMessage()).getContext();
                                    if (context != null) {
                                        this.publishFailureHook(context, e.getMessage());
                                    }
                                    // Stop quietly when: the facility is closed during a graceful shutdown, or
                                    // the send was interrupted and the buffer is closed / the publisher is terminated.
                                    if (!(e instanceof ClosedFacilityException) ? (e instanceof InterruptedException || e instanceof JCSMPInterruptedException) && (this.buffer.isClosed() || this.isTerminated()) : this.gracefulShutdownInProgress) break;
                                    if (logger.isErrorEnabled()) {
                                        logger.error((Object)(this.instanceName + " could not publish message to a broker. Message:" + nextMessageTask.getMessage() + ", destination: " + nextMessageTask.getDestination()), (Throwable)e);
                                    }
                                    prOptimized = this.producer;
                                    this.errorNotificationDispatcher.onException(nextMessageTask, e);
                                }
                                continue;
                            }
                            // consume() returned nothing: keep polling unless the buffer is closed or the publisher terminated.
                            if (!this.buffer.isClosed() && !this.isTerminated()) continue;
                            break;
                        }
                        // Producer is closed: give up during a graceful shutdown, otherwise wait and re-read it.
                        if (inGracefulShutdown) break;
                        try {
                            Thread.sleep(500L);
                        }
                        catch (InterruptedException e) {
                            // NOTE(review): the interrupt flag is not restored when the loop keeps going - confirm this is intended.
                            if (this.buffer.isClosed() || this.isTerminated()) break;
                        }
                        prOptimized = this.producer;
                        continue;
                    }
                    // No producer reference at all: terminate the publisher and stop dispatching.
                    this.onTerminate(null, this.postTerminationClearBufferSilentTask);
                    if (!logger.isErrorEnabled()) break;
                    logger.error((Object)(this.instanceName + " could not create an internal service to publish messages to a broker."));
                    break;
                }
                return null;
            });
        }
        catch (RejectedExecutionException e) {
            this.onTerminate(null, this.postTerminationClearBufferSilentTask);
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " could not create an internal service to publish messages to a broker."), (Throwable)e);
            }
            throw new PubSubPlusClientException("Internal service to publish messages to a broker could not be created due to rejection by internal executor service", e);
        }
    }

    /**
     * Common termination sequence.
     * <p>
     * Runs the optional pre-termination task, moves the state to 2 (terminated), unregisters the
     * session listeners and detaches and closes the congestion dispatcher. The termination
     * notification dispatcher is closed and the optional post-termination task is run in a
     * {@code finally} block, i.e. also when one of the earlier steps throws.
     *
     * @param preTerminationTask  task run first, or {@code null}
     * @param postTerminationTask task run last, or {@code null}
     */
    void onTerminate(Task<DirectMessagePublisherImpl> preTerminationTask, Task<DirectMessagePublisherImpl> postTerminationTask) {
        try {
            if (preTerminationTask != null) {
                preTerminationTask.run(this);
            }
            this.stateHolder.set(2);
            this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
            this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
            this.buffer.setBufferCongestionMonitor(null, -1);
            this.bufferCongestionNotificationDispatcher.close();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " publisher is shutdown"));
            }
        }
        finally {
            this.terminationNotificationDispatcher.close();
            if (postTerminationTask != null) {
                postTerminationTask.run(this);
            }
        }
    }

    /**
     * Runs the termination sequence for this publisher, passing an extra argument to
     * the post-termination task.
     *
     * <p>Steps, in order: run the optional pre-termination task, update the state
     * holder, detach this publisher's listeners from the client session, reset the
     * buffer congestion monitor, and close the congestion notification dispatcher.
     * The termination notification dispatcher is closed and the optional
     * post-termination task is run from a {@code finally} block, so both happen even
     * when one of the earlier steps throws (in which case the remaining earlier
     * steps, including the state update, are skipped and the exception propagates).
     *
     * <p>Decompiler note (CFR): "WARNING - Removed try catching itself - possible
     * behaviour change." The original bytecode may therefore differ slightly from
     * this reconstruction.
     *
     * @param <C>                 type of the extra argument handed to the post-termination task
     * @param preTerminationTask  task invoked with this publisher before anything else; may be {@code null}
     * @param postTerminationTask task invoked with this publisher and {@code c} as the very last step; may be {@code null}
     * @param c                   value forwarded unchanged to {@code postTerminationTask}
     */
    <C> void onTerminate(Task<DirectMessagePublisherImpl> preTerminationTask, BiTask<DirectMessagePublisherImpl, C> postTerminationTask, C c) {
        try {
            if (preTerminationTask != null) {
                preTerminationTask.run(this);
            }
            // NOTE(review): 2 presumably encodes the "terminated" state - confirm against the state constants.
            this.stateHolder.set(2);
            // Stop receiving session-level events for this publisher instance.
            this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
            this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
            // NOTE(review): (null, -1) presumably unregisters the congestion monitor - verify against PublisherBuffers.
            this.buffer.setBufferCongestionMonitor(null, -1);
            this.bufferCongestionNotificationDispatcher.close();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " publisher is shutdown"));
            }
        }
        finally {
            // Always executed, even if a step above failed.
            this.terminationNotificationDispatcher.close();
            if (postTerminationTask != null) {
                postTerminationTask.run(this, c);
            }
        }
    }

    /**
     * Obtains a direct message producer from the client session of the owning service.
     *
     * @param eventHandler streaming publish event handler handed to the client session
     * @return the producer returned by the client session
     * @throws PubSubPlusClientException wrapping any exception raised while obtaining the producer
     */
    XMLMessageProducer createMessageProducer(JCSMPStreamingPublishEventHandler eventHandler) throws PubSubPlusClientException {
        final XMLMessageProducer messageProducer;
        try {
            messageProducer = this.serviceInternalView.getClientSession().getDirectMessageProducer(eventHandler);
        }
        catch (Exception cause) {
            // Every failure is surfaced as the API's own exception type, keeping the cause attached.
            throw new PubSubPlusClientException("Failed to create message publisher", cause);
        }
        return messageProducer;
    }

    /**
     * Reports a discarded publishable to the publish failure hook, provided its
     * message carries a non-null context.
     *
     * @param publishable the buffered item that was dropped without being sent
     */
    private void invokePublishFailureHookForDiscardedMessage(PublisherBuffers.Publishable<Topic> publishable) {
        final MessageTracingSupport tracedMessage = (MessageTracingSupport)publishable.getMessage();
        final Object context = tracedMessage.getContext();
        if (context == null) {
            // Nothing to report against when the message has no context.
            return;
        }
        this.publishFailureHook(context, "Publisher terminated before message could be sent");
    }

    /**
     * Closes the given buffer, routing every item handed to the close callback to
     * {@link #invokePublishFailureHookForDiscardedMessage}.
     *
     * @param buffer the publisher buffer to close
     */
    void onGracefulTerminateEmptyRemainingBuffer(PublisherBuffers.PublisherBuffer<Topic> buffer) {
        buffer.close((PublisherBuffers.Publishable<Topic> discarded) -> this.invokePublishFailureHookForDiscardedMessage(discarded));
    }

    /**
     * Hands publish failures to the application's
     * {@link DirectMessagePublisher.PublishFailureListener}.
     *
     * <p>Notifications are scheduled on a dedicated single-thread executor. When the
     * executor rejects a notification, it is delivered synchronously on the calling
     * thread instead. Exceptions thrown by the listener are logged at debug level on
     * both paths and never propagate to the caller.
     */
    @Internal
    @ProviderType
    private class PublishFailureNotificationDispatcher {
        private final ExecutorService failureNotificationExecutorService;
        private volatile DirectMessagePublisher.PublishFailureListener publishFailureListener;

        private PublishFailureNotificationDispatcher() {
            this.failureNotificationExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(DirectMessagePublisherImpl.this.instanceName + "-error-dispatcher"));
        }

        /**
         * Reports a failure tied to a specific publishable, time-stamped with the current time.
         * Does nothing when either argument is {@code null} or no listener is registered.
         *
         * @param publishable the item whose publication failed
         * @param e           the failure cause
         */
        void onException(PublisherBuffers.Publishable<Topic> publishable, Exception e) {
            if (e == null || publishable == null) {
                return;
            }
            if (this.publishFailureListener == null) {
                return;
            }
            this.dispatch(new ScheduledFailureNotification(publishable, e, Instant.now().toEpochMilli()));
        }

        /**
         * Reports a failure that is not tied to a publishable, time-stamped with the current time.
         *
         * @param e the failure cause; ignored when {@code null}
         */
        void onException(Exception e) {
            this.onException(e, Instant.now().toEpochMilli());
        }

        /**
         * Reports a failure that is not tied to a publishable.
         * Does nothing when {@code e} is {@code null} or no listener is registered.
         *
         * @param e         the failure cause
         * @param timeStamp epoch-millisecond time stamp carried in the failure event
         */
        void onException(Exception e, long timeStamp) {
            if (e == null) {
                return;
            }
            if (this.publishFailureListener == null) {
                return;
            }
            this.dispatch(new ScheduledFailureNotification(e, timeStamp));
        }

        void setPublishFailureListener(DirectMessagePublisher.PublishFailureListener publishFailureListener) {
            this.publishFailureListener = publishFailureListener;
        }

        /**
         * Schedules the notification on the executor, or delivers it on the calling
         * thread when the executor rejects it.
         */
        private void dispatch(ScheduledFailureNotification notification) {
            try {
                // Deliver through the logging wrapper: a bare submit(notification) would park any
                // listener exception inside the discarded Future, where nobody would ever see it.
                this.failureNotificationExecutorService.submit(() -> {
                    this.deliver(notification);
                    return null;
                });
            }
            catch (RejectedExecutionException ex) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)(DirectMessagePublisherImpl.this.instanceName + " could not schedule publisher failure notification, processing notification on a dispatcher thread"));
                }
                this.deliver(notification);
            }
        }

        /** Invokes the notification and logs, rather than propagates, any exception from the listener. */
        private void deliver(ScheduledFailureNotification notification) {
            try {
                notification.call();
            }
            catch (Exception exc) {
                logger.debug((Object)"Exception by customer callback during publish error notification processing", (Throwable)exc);
            }
        }

        /**
         * A single pending failure notification: the failure cause, its time stamp and
         * the publishable it relates to.
         */
        @Internal
        @ProviderType
        class ScheduledFailureNotification
        implements Callable<Void> {
            final Exception e;
            final long timeStamp;
            final PublisherBuffers.Publishable<Topic> publishable;

            ScheduledFailureNotification(PublisherBuffers.Publishable<Topic> publishable, Exception e, long timeStamp) {
                this.e = e;
                this.timeStamp = timeStamp;
                this.publishable = publishable;
            }

            ScheduledFailureNotification(Exception e, long timeStamp) {
                this.e = e;
                this.timeStamp = timeStamp;
                // No specific publishable is associated with this failure.
                this.publishable = PublisherBuffers.Publishable.none();
            }

            /**
             * Delivers the failure event to the listener registered at the time of the call.
             * The listener is re-read here, so a listener replaced or cleared after
             * scheduling is honoured.
             *
             * @return always {@code null}
             * @throws Exception whatever the listener throws
             */
            @Override
            public Void call() throws Exception {
                DirectMessagePublisher.PublishFailureListener l = PublishFailureNotificationDispatcher.this.publishFailureListener;
                if (l == null) {
                    return null;
                }
                l.onFailedPublish(new DirectMessagePublisher.FailedPublishEvent(this.publishable.getMessage(), this.publishable.getDestination(), this.mapException(this.e), this.timeStamp));
                return null;
            }

            /**
             * Returns {@code e} unchanged when it already is a {@link PubSubPlusClientException},
             * otherwise wraps it in one.
             */
            PubSubPlusClientException mapException(Exception e) {
                if (e instanceof PubSubPlusClientException) {
                    return (PubSubPlusClientException)e;
                }
                return new PubSubPlusClientException(e);
            }
        }
    }

    /**
     * {@link ManageablePublisher.DirectPublisherInfo} view backed by the enclosing
     * publisher's {@code id} and {@code instanceName} fields.
     */
    @Internal
    @ProviderType
    private class DirectPublisherInfoImpl
    implements ManageablePublisher.DirectPublisherInfo {
        private DirectPublisherInfoImpl() {
        }

        /** @return the enclosing publisher's id */
        @Override
        public long getId() {
            final DirectMessagePublisherImpl publisher = DirectMessagePublisherImpl.this;
            return publisher.id;
        }

        /** @return the enclosing publisher's instance name */
        @Override
        public String getInstanceName() {
            final DirectMessagePublisherImpl publisher = DirectMessagePublisherImpl.this;
            return publisher.instanceName;
        }
    }
}

