/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.implementation.handler.DispatchHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.Selectable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public final class ReactorDispatcher {
    private final ClientLogger logger = new ClientLogger(ReactorDispatcher.class);
    private final CloseHandler onClose;
    private final String connectionId;
    private final Reactor reactor;
    private final Pipe ioSignal;
    private final ConcurrentLinkedQueue<Work> workQueue;
    private final WorkScheduler workScheduler;
    private final AtomicInteger wip = new AtomicInteger();
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final Sinks.One<AmqpShutdownSignal> shutdownSignal = Sinks.one();

    public ReactorDispatcher(String connectionId, Reactor reactor, Pipe ioSignal) {
        this.connectionId = connectionId;
        this.reactor = reactor;
        this.ioSignal = ioSignal;
        this.workQueue = new ConcurrentLinkedQueue();
        this.onClose = new CloseHandler();
        this.workScheduler = new WorkScheduler();
        Selectable schedulerSelectable = this.reactor.selectable();
        schedulerSelectable.setChannel((SelectableChannel)this.ioSignal.source());
        schedulerSelectable.onReadable((Selectable.Callback)this.workScheduler);
        schedulerSelectable.onFree((Selectable.Callback)this.onClose);
        schedulerSelectable.setReading(true);
        this.reactor.update(schedulerSelectable);
    }

    public Mono<AmqpShutdownSignal> getShutdownSignal() {
        return this.shutdownSignal.asMono();
    }

    public void invoke(Runnable work) throws IOException {
        this.throwIfSchedulerError();
        this.workQueue.offer(new Work(work));
        this.signalWorkQueue();
    }

    public void invoke(Runnable work, Duration delay) throws IOException {
        this.throwIfSchedulerError();
        this.workQueue.offer(new Work(work, delay));
        this.signalWorkQueue();
    }

    private void throwIfSchedulerError() {
        RejectedExecutionException rejectedException = (RejectedExecutionException)this.reactor.attachments().get(RejectedExecutionException.class, RejectedExecutionException.class);
        if (rejectedException != null) {
            throw this.logger.logExceptionAsError((RuntimeException)new RejectedExecutionException(rejectedException.getMessage(), rejectedException));
        }
        if (!this.ioSignal.sink().isOpen()) {
            throw this.logger.logExceptionAsError((RuntimeException)new RejectedExecutionException("ReactorDispatcher instance is closed."));
        }
    }

    private void signalWorkQueue() throws IOException {
        try {
            ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
            while (this.ioSignal.sink().write(oneByteBuffer) == 0) {
                oneByteBuffer = ByteBuffer.allocate(1);
            }
        }
        catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {
            if (!this.isClosed.get()) {
                this.logger.warning("connectionId[{}] signalWorkQueue failed before reactor closed.", new Object[]{this.connectionId, ignorePipeClosedDuringReactorShutdown});
                this.shutdownSignal.emitError((Throwable)new RuntimeException(String.format("connectionId[%s] IO Sink was interrupted before reactor closed.", this.connectionId), ignorePipeClosedDuringReactorShutdown), Sinks.EmitFailureHandler.FAIL_FAST);
            }
            this.logger.verbose("connectionId[{}] signalWorkQueue failed with an error after closed. Can be ignored.", new Object[]{this.connectionId, ignorePipeClosedDuringReactorShutdown});
        }
    }

    private static final class Work {
        private final DispatchHandler dispatchHandler;
        private final Duration delay;

        Work(Runnable work) {
            this(work, null);
        }

        Work(Runnable work, Duration delay) {
            this.dispatchHandler = new DispatchHandler(work);
            this.delay = delay;
        }
    }

    private final class CloseHandler
    implements Selectable.Callback {
        private CloseHandler() {
        }

        public void run(Selectable selectable) {
            if (ReactorDispatcher.this.isClosed.getAndSet(true)) {
                return;
            }
            ReactorDispatcher.this.logger.info("connectionId[{}] Reactor selectable is being disposed.", new Object[]{ReactorDispatcher.this.connectionId});
            ReactorDispatcher.this.shutdownSignal.emitValue((Object)new AmqpShutdownSignal(false, false, String.format("connectionId[%s] Reactor selectable is disposed.", ReactorDispatcher.this.connectionId)), Sinks.EmitFailureHandler.FAIL_FAST);
            try {
                if (ReactorDispatcher.this.ioSignal.sink().isOpen()) {
                    ReactorDispatcher.this.ioSignal.sink().close();
                }
            }
            catch (IOException ioException) {
                ReactorDispatcher.this.logger.error("connectionId[{}] CloseHandler.sink().close() failed with an error.", new Object[]{ReactorDispatcher.this.connectionId, ioException});
            }
            ReactorDispatcher.this.workScheduler.run(null);
            try {
                if (ReactorDispatcher.this.ioSignal.source().isOpen()) {
                    ReactorDispatcher.this.ioSignal.source().close();
                }
            }
            catch (IOException ioException) {
                ReactorDispatcher.this.logger.error("connectionId[{}] CloseHandler.source().close() failed with an error.", new Object[]{ReactorDispatcher.this.connectionId, ioException});
            }
        }
    }

    private final class WorkScheduler
    implements Selectable.Callback {
        private WorkScheduler() {
        }

        public void run(Selectable selectable) {
            if (ReactorDispatcher.this.wip.getAndIncrement() != 0) {
                return;
            }
            int missed = 1;
            while (missed != 0) {
                Work topWork;
                try {
                    ByteBuffer oneKbByteBuffer = ByteBuffer.allocate(1024);
                    while (ReactorDispatcher.this.ioSignal.source().read(oneKbByteBuffer) > 0) {
                        oneKbByteBuffer = ByteBuffer.allocate(1024);
                    }
                }
                catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {
                    if (!ReactorDispatcher.this.isClosed.get()) {
                        ReactorDispatcher.this.logger.warning("connectionId[{}] WorkScheduler.run() failed before reactor was closed.", new Object[]{ReactorDispatcher.this.connectionId, ignorePipeClosedDuringReactorShutdown});
                        ReactorDispatcher.this.shutdownSignal.emitError((Throwable)new RuntimeException(String.format("connectionId[%s] IO Source was interrupted before reactor closed.", ReactorDispatcher.this.connectionId), ignorePipeClosedDuringReactorShutdown), Sinks.EmitFailureHandler.FAIL_FAST);
                        break;
                    }
                    ReactorDispatcher.this.logger.verbose("connectionId[{}] WorkScheduler.run() failed with an error. Can be ignored.", new Object[]{ReactorDispatcher.this.connectionId, ignorePipeClosedDuringReactorShutdown});
                    break;
                }
                catch (IOException ioException) {
                    ReactorDispatcher.this.shutdownSignal.emitError((Throwable)ReactorDispatcher.this.logger.logExceptionAsError(new RuntimeException(String.format("connectionId[%s] WorkScheduler.run() failed with an error.", ReactorDispatcher.this.connectionId), ioException)), Sinks.EmitFailureHandler.FAIL_FAST);
                    break;
                }
                while ((topWork = (Work)ReactorDispatcher.this.workQueue.poll()) != null) {
                    if (topWork.delay != null) {
                        ReactorDispatcher.this.reactor.schedule((int)topWork.delay.toMillis(), (Handler)topWork.dispatchHandler);
                        continue;
                    }
                    topWork.dispatchHandler.onTimerTask(null);
                }
                missed = ReactorDispatcher.this.wip.addAndGet(-missed);
            }
        }
    }
}

