/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * Wraps a ProtonJ {@link Session} created from a {@link Connection} and tracks it through three states:
 * not yet opened ({@code Resource.EMPTY}), opened (a {@code Resource} holding the session) and
 * disposed ({@code Resource.DISPOSED}).
 *
 * <p>The session is created and opened via the {@link ReactorProvider}'s dispatcher; {@link #open()} must
 * complete before {@link #channel(String, Duration)}, {@link #senderUnsafe(String)} or
 * {@link #receiverUnsafe(String)} can hand out links.</p>
 */
final class ProtonSession {
    private static final String SESSION_NOT_OPENED = "session has not been opened.";
    private static final String NOT_OPENING_DISPOSED_SESSION = "session is already disposed, not opening.";
    private static final String DISPOSED_MESSAGE_FORMAT = "Cannot create %s from a closed session.";
    private static final String REACTOR_CLOSED_MESSAGE_FORMAT = "connectionId:[%s] sessionName:[%s] connection-reactor is disposed.";
    private static final String OBTAIN_CHANNEL_TIMEOUT_MESSAGE_FORMAT = "connectionId:[%s] sessionName:[%s] obtaining channel (%s) timed out.";
    // Starts in the "not opened" state; replaced by a session-holding Resource in open() or by DISPOSED in beginClose().
    private final AtomicReference<Resource> resource = new AtomicReference<>(Resource.EMPTY);
    // Guards open() so that the session is scheduled for opening at most once.
    private final AtomicBoolean opened = new AtomicBoolean(false);
    // Terminates (empty or error) once the open attempt has been resolved; shared by every open() caller.
    private final Sinks.Empty<Void> openAwaiter = Sinks.empty();
    private final Connection connection;
    private final ReactorProvider reactorProvider;
    private final SessionHandler handler;
    private final String id;
    private final ClientLogger logger;

    /**
     * Creates a session wrapper; the underlying ProtonJ session is not created until {@link #open()}.
     *
     * @param connectionId the connection identifier passed to the session handler.
     * @param hostname the host name passed to the session handler.
     * @param connection the ProtonJ connection that will host the session.
     * @param handlerProvider the provider used to create the {@link SessionHandler}.
     * @param reactorProvider the provider of the dispatcher on which session work is scheduled.
     * @param sessionName the session name passed to the session handler.
     * @param openTimeout the open timeout passed to the session handler.
     * @param logger the logger to use.
     * @throws NullPointerException if {@code connection}, {@code reactorProvider}, {@code handlerProvider}
     *     or {@code logger} is null.
     */
    ProtonSession(String connectionId, String hostname, Connection connection, ReactorHandlerProvider handlerProvider, ReactorProvider reactorProvider, String sessionName, Duration openTimeout, ClientLogger logger) {
        this.connection = Objects.requireNonNull(connection, "'connection' cannot be null.");
        this.reactorProvider = Objects.requireNonNull(reactorProvider, "'reactorProvider' cannot be null.");
        Objects.requireNonNull(handlerProvider, "'handlerProvider' cannot be null.");
        this.handler = handlerProvider.createSessionHandler(connectionId, hostname, sessionName, openTimeout);
        this.id = this.handler.getId();
        this.logger = Objects.requireNonNull(logger, "'logger' cannot be null.");
    }

    /**
     * @return the identifier obtained from the session handler at construction time.
     */
    String getId() {
        return this.id;
    }

    /**
     * @return the session name reported by the session handler.
     */
    String getName() {
        return this.handler.getSessionName();
    }

    /**
     * @return the connection identifier reported by the session handler.
     */
    String getConnectionId() {
        return this.handler.getConnectionId();
    }

    /**
     * @return the host name reported by the session handler.
     */
    String getHostname() {
        return this.handler.getHostname();
    }

    /**
     * @return the error context reported by the session handler.
     */
    AmqpErrorContext getErrorContext() {
        return this.handler.getErrorContext();
    }

    /**
     * @return the endpoint state stream exposed by the session handler.
     */
    Flux<EndpointState> getEndpointStates() {
        return this.handler.getEndpointStates();
    }

    /**
     * @return the reactor provider supplied at construction time.
     */
    ReactorProvider getReactorProvider() {
        return this.reactorProvider;
    }

    /**
     * Schedules creation and local open of the ProtonJ session on the reactor dispatcher.
     *
     * <p>Only the first call schedules work; subsequent calls return the same awaiter. The awaiter
     * completes empty once the session is stored, or errors with:</p>
     * <ul>
     *   <li>{@link ProtonSessionClosedException} if {@link #beginClose(ErrorCondition)} ran first,</li>
     *   <li>{@link IllegalStateException} if a session was unexpectedly already stored,</li>
     *   <li>a retriable {@link AmqpException} if the dispatcher rejected the work with an
     *       {@link IOException} or {@link RejectedExecutionException},</li>
     *   <li>the raw exception for any other scheduling failure.</li>
     * </ul>
     *
     * @return a {@link Mono} that terminates when the open attempt has been resolved.
     */
    Mono<Void> open() {
        if (this.opened.getAndSet(true)) {
            return this.openAwaiter.asMono();
        }
        try {
            this.getReactorProvider().getReactorDispatcher().invoke(() -> {
                final Session session = this.connection.session();
                BaseHandler.setHandler(session, this.handler);
                session.open();
                this.logger.atInfo().addKeyValue("sessionName", this.handler.getSessionName()).addKeyValue("sessionId", this.id).log("session local open scheduled.");
                if (this.resource.compareAndSet(Resource.EMPTY, new Resource(session))) {
                    this.openAwaiter.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    // The state moved away from EMPTY while the open was pending; the fresh session is not usable.
                    session.close();
                    if (this.resource.get() == Resource.DISPOSED) {
                        this.openAwaiter.emitError(new ProtonSessionClosedException(NOT_OPENING_DISPOSED_SESSION), Sinks.EmitFailureHandler.FAIL_FAST);
                    } else {
                        this.openAwaiter.emitError(new IllegalStateException("session is already opened."), Sinks.EmitFailureHandler.FAIL_FAST);
                    }
                }
            });
        } catch (Exception e) {
            // Emit exactly one terminal signal: either the mapped retriable error or the raw exception, never both.
            this.openAwaiter.emitError(this.mapDispatchError(e), Sinks.EmitFailureHandler.FAIL_FAST);
        }
        return this.openAwaiter.asMono();
    }

    /**
     * Obtains a channel, i.e. a sender/receiver pair created on this session, named
     * {@code name + ":sender"} and {@code name + ":receiver"}.
     *
     * <p>The returned {@link Mono} errors with {@link NullPointerException} if {@code name} is null,
     * {@link IllegalStateException} if the session has not been opened,
     * {@link ProtonSessionClosedException} if the session is disposed, and a retriable
     * {@link AmqpException} if the dispatcher rejected the work or if no channel was produced within
     * {@code timeout} (the latter with {@link AmqpErrorCondition#TIMEOUT_ERROR}).</p>
     *
     * @param name the channel name used to derive the link names.
     * @param timeout the maximum time to wait for the channel.
     * @return a {@link Mono} emitting the created {@link ProtonChannel}.
     */
    Mono<ProtonChannel> channel(String name, Duration timeout) {
        final Mono<ProtonChannel> channel = Mono.<ProtonChannel>create(sink -> {
            if (name == null) {
                sink.error(new NullPointerException("'name' cannot be null."));
                return;
            }
            try {
                // Fail early on the subscribing thread if the session is not in the opened state.
                this.resource.get().validate(this.logger, "channel");
            } catch (ProtonSessionClosedException | IllegalStateException e) {
                sink.error(e);
                return;
            }
            try {
                this.getReactorProvider().getReactorDispatcher().invoke(() -> {
                    final Session session;
                    try {
                        // Re-validate: the state may have changed between scheduling and execution.
                        session = this.getSession("channel");
                    } catch (ProtonSessionClosedException | IllegalStateException e) {
                        sink.error(e);
                        return;
                    }
                    final String senderName = name + ":sender";
                    final String receiverName = name + ":receiver";
                    sink.success(new ProtonChannel(name, session.sender(senderName), session.receiver(receiverName)));
                });
            } catch (Exception e) {
                // Signal a single error: either the mapped retriable error or the raw exception, never both.
                sink.error(this.mapDispatchError(e));
            }
        });
        return channel.timeout(timeout, Mono.<ProtonChannel>error(() -> {
            final String message = String.format(OBTAIN_CHANNEL_TIMEOUT_MESSAGE_FORMAT, this.getConnectionId(), this.getName(), name);
            return this.retriableAmqpError(AmqpErrorCondition.TIMEOUT_ERROR, message, null);
        }));
    }

    /**
     * Creates a sender link on the session directly on the calling thread, without going through the
     * reactor dispatcher.
     *
     * @param name the sender link name.
     * @return the sender created on the underlying session.
     * @throws IllegalStateException if the session has not been opened.
     * @throws ProtonSessionClosedException if the session is disposed.
     */
    Sender senderUnsafe(String name) {
        final Session session = this.getSession("sender link");
        return session.sender(name);
    }

    /**
     * Creates a receiver link on the session directly on the calling thread, without going through the
     * reactor dispatcher.
     *
     * @param name the receiver link name.
     * @return the receiver created on the underlying session.
     * @throws IllegalStateException if the session has not been opened.
     * @throws ProtonSessionClosedException if the session is disposed.
     */
    Receiver receiverUnsafe(String name) {
        final Session session = this.getSession("receive link");
        return session.receiver(name);
    }

    /**
     * Marks this session as disposed and, if a session had been opened and is not already locally
     * closed, closes it. When {@code errorCondition} is non-null and the session carries no condition,
     * the condition is set on the session.
     *
     * <p>Does nothing beyond the state change if the session was never opened or was already disposed.</p>
     *
     * @param errorCondition the optional condition to attach to the session; may be null.
     */
    void beginClose(ErrorCondition errorCondition) {
        final Resource s = this.resource.getAndSet(Resource.DISPOSED);
        if (s == Resource.EMPTY || s == Resource.DISPOSED) {
            return;
        }
        final Session session = s.value();
        if (session.getLocalState() != EndpointState.CLOSED) {
            session.close();
            if (errorCondition != null && session.getCondition() == null) {
                session.setCondition(errorCondition);
            }
        }
    }

    /**
     * Closes the session handler.
     */
    void endClose() {
        this.handler.close();
    }

    /**
     * Returns the underlying session after validating that it is opened and not disposed.
     *
     * @param endpointType the endpoint description used in the disposed-session error message.
     * @return the underlying session.
     * @throws IllegalStateException if the session has not been opened.
     * @throws ProtonSessionClosedException if the session is disposed.
     */
    private Session getSession(String endpointType) {
        final Resource r = this.resource.get();
        r.validate(this.logger, endpointType);
        return r.value();
    }

    /**
     * Maps a failure thrown while scheduling work on the reactor dispatcher to the error to signal:
     * {@link IOException} and {@link RejectedExecutionException} become a retriable
     * {@link AmqpException} with the "connection-reactor is disposed" message; anything else is
     * returned unchanged.
     *
     * @param e the exception thrown by the dispatcher.
     * @return the error to propagate downstream.
     */
    private Throwable mapDispatchError(Exception e) {
        if (e instanceof IOException || e instanceof RejectedExecutionException) {
            final String message = String.format(REACTOR_CLOSED_MESSAGE_FORMAT, this.getConnectionId(), this.getName());
            return this.retriableAmqpError(null, message, e);
        }
        return e;
    }

    /**
     * Creates an {@link AmqpException} flagged as transient, carrying the handler's error context.
     *
     * @param condition the error condition; may be null.
     * @param message the error message.
     * @param cause the underlying cause; may be null.
     * @return the exception.
     */
    private AmqpException retriableAmqpError(AmqpErrorCondition condition, String message, Throwable cause) {
        return new AmqpException(true, condition, message, cause, this.handler.getErrorContext());
    }

    /**
     * Holder for the session state. {@link #EMPTY} and {@link #DISPOSED} are sentinel instances compared
     * by identity; any other instance holds a non-null session.
     */
    private static final class Resource {
        private static final Resource EMPTY = new Resource();
        private static final Resource DISPOSED = new Resource();
        private final Session session;

        /**
         * Creates a resource holding an opened session.
         *
         * @param session the session to hold.
         * @throws NullPointerException if {@code session} is null.
         */
        Resource(Session session) {
            this.session = Objects.requireNonNull(session, "'session' cannot be null.");
        }

        /** Sentinel constructor: used only for {@link #EMPTY} and {@link #DISPOSED}, which hold no session. */
        private Resource() {
            this.session = null;
        }

        /**
         * @return the held session; null for the sentinel instances.
         */
        Session value() {
            assert (this != EMPTY);
            return this.session;
        }

        /**
         * Verifies that this resource represents an opened session.
         *
         * @param logger the logger through which the thrown exception is logged.
         * @param endpointType the endpoint description used in the disposed-session error message.
         * @throws IllegalStateException if this is {@link #EMPTY} (session not opened).
         * @throws ProtonSessionClosedException if this is {@link #DISPOSED}.
         */
        void validate(ClientLogger logger, String endpointType) {
            if (this == EMPTY) {
                throw logger.logExceptionAsError(new IllegalStateException(ProtonSession.SESSION_NOT_OPENED));
            }
            if (this == DISPOSED) {
                throw logger.logExceptionAsWarning(new ProtonSessionClosedException(String.format(ProtonSession.DISPOSED_MESSAGE_FORMAT, endpointType)));
            }
        }
    }

    /**
     * Error signalling an operation attempted on a disposed session; constructed as a transient
     * {@link AmqpException} without an error context.
     */
    static final class ProtonSessionClosedException
    extends AmqpException {
        private ProtonSessionClosedException(String message) {
            super(true, message, null);
        }
    }

    /**
     * Immutable triple of a channel name and the sender/receiver pair created for it.
     */
    static final class ProtonChannel {
        private final String name;
        private final Sender sender;
        private final Receiver receiver;

        /**
         * @param name the channel name.
         * @param sender the channel's sender link.
         * @param receiver the channel's receiver link.
         */
        ProtonChannel(String name, Sender sender, Receiver receiver) {
            this.name = name;
            this.sender = sender;
            this.receiver = receiver;
        }

        /**
         * @return the channel name.
         */
        String getName() {
            return this.name;
        }

        /**
         * @return the channel's sender link.
         */
        Sender getSender() {
            return this.sender;
        }

        /**
         * @return the channel's receiver link.
         */
        Receiver getReceiver() {
            return this.receiver;
        }
    }
}

