/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpExceptionHandler;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

public class ReactorExecutor
implements AsyncCloseable {
    private final ClientLogger logger;
    private final AtomicBoolean hasStarted = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    private final Object lock = new Object();
    private final Reactor reactor;
    private final Scheduler scheduler;
    private final Duration timeout;
    private final AmqpExceptionHandler exceptionHandler;
    private final String hostname;

    ReactorExecutor(Reactor reactor, Scheduler scheduler, String connectionId, AmqpExceptionHandler exceptionHandler, Duration timeout, String hostname) {
        this.reactor = Objects.requireNonNull(reactor, "'reactor' cannot be null.");
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");
        this.timeout = Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        this.exceptionHandler = Objects.requireNonNull(exceptionHandler, "'exceptionHandler' cannot be null.");
        this.hostname = Objects.requireNonNull(hostname, "'hostname' cannot be null.");
        this.logger = new ClientLogger(ReactorExecutor.class, AmqpLoggingUtils.createContextWithConnectionId(connectionId));
    }

    public void start() {
        if (this.isDisposed.get()) {
            this.logger.warning("Cannot start reactor when executor has been disposed.");
            return;
        }
        if (this.hasStarted.getAndSet(true)) {
            this.logger.warning("ReactorExecutor has already started.");
            return;
        }
        this.logger.info("Starting reactor.");
        this.reactor.start();
        this.scheduler.schedule(this::run);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void run() {
        if (!this.isDisposed.get() && !this.hasStarted.get()) {
            this.logger.warning("Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked.");
            return;
        }
        boolean rescheduledReactor = false;
        try {
            boolean shouldReschedule;
            Object object = this.lock;
            synchronized (object) {
                shouldReschedule = this.hasStarted.get() && !Thread.interrupted() && this.reactor.process();
            }
            if (shouldReschedule) {
                try {
                    this.scheduler.schedule(this::run);
                    rescheduledReactor = true;
                }
                catch (RejectedExecutionException exception) {
                    this.logger.warning("Scheduling reactor failed because the scheduler has been shut down.", new Object[]{exception});
                    this.reactor.attachments().set(RejectedExecutionException.class, RejectedExecutionException.class, (Object)exception);
                }
            }
        }
        catch (HandlerException handlerException) {
            Throwable cause = handlerException.getCause() == null ? handlerException : handlerException.getCause();
            this.logger.warning("Unhandled exception while processing events in reactor, report this error.", new Object[]{handlerException});
            String message = !CoreUtils.isNullOrEmpty((CharSequence)cause.getMessage()) ? cause.getMessage() : (!CoreUtils.isNullOrEmpty((CharSequence)handlerException.getMessage()) ? handlerException.getMessage() : "Reactor encountered unrecoverable error");
            AmqpErrorContext errorContext = new AmqpErrorContext(this.hostname);
            AmqpException exception = cause instanceof UnresolvedAddressException ? new AmqpException(true, String.format(Locale.US, "%s. This is usually caused by incorrect hostname or network configuration. Check correctness of namespace information. %s", message, StringUtil.getTrackingIdAndTimeToLog()), cause, errorContext) : new AmqpException(true, String.format(Locale.US, "%s, %s", message, StringUtil.getTrackingIdAndTimeToLog()), cause, errorContext);
            this.exceptionHandler.onConnectionError((Throwable)((Object)exception));
        }
        finally {
            if (!rescheduledReactor) {
                if (this.hasStarted.getAndSet(false)) {
                    this.logger.verbose("Scheduling reactor to complete pending tasks.");
                    this.scheduleCompletePendingTasks();
                } else {
                    String reason = "Stopping the reactor because thread was interrupted or the reactor has no more events to process.";
                    this.logger.info("Stopping the reactor because thread was interrupted or the reactor has no more events to process.");
                    this.close("Stopping the reactor because thread was interrupted or the reactor has no more events to process.", true);
                }
            }
        }
    }

    private void scheduleCompletePendingTasks() {
        Runnable work = () -> {
            this.logger.info("Processing all pending tasks and closing old reactor.");
            try {
                if (this.reactor.process()) {
                    this.logger.verbose("Had more tasks to process on reactor but it is shutting down.");
                }
                this.reactor.stop();
            }
            catch (HandlerException e) {
                this.logger.atWarning().log(() -> StringUtil.toStackTraceString(e, "scheduleCompletePendingTasks - exception occurred while  processing events."));
            }
            finally {
                try {
                    this.reactor.free();
                }
                catch (IllegalStateException illegalStateException) {}
                this.close("Finished processing pending tasks.", false);
            }
        };
        try {
            this.scheduler.schedule(work, this.timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            this.logger.warning("Scheduler was already closed. Manually releasing reactor.");
            work.run();
        }
    }

    private void close(String reason, boolean initiatedByClient) {
        this.logger.verbose("Completing close and disposing scheduler. {}", new Object[]{reason});
        this.scheduler.dispose();
        this.isClosedMono.emitEmpty((signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atVerbose(), signalType, emitResult).log("Unable to emit close event on reactor");
            return false;
        });
        this.exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, initiatedByClient, reason));
    }

    public Mono<Void> closeAsync() {
        if (this.isDisposed.getAndSet(true)) {
            return this.isClosedMono.asMono();
        }
        if (this.hasStarted.get()) {
            this.scheduleCompletePendingTasks();
        } else {
            this.close("Closing based on user-invoked close operation.", true);
        }
        return this.isClosedMono.asMono();
    }
}

