/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import com.azure.messaging.eventhubs.implementation.EventHubReactorSession;
import com.azure.messaging.eventhubs.implementation.EventHubSession;
import com.azure.messaging.eventhubs.implementation.ManagementChannel;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.util.HashMap;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * A {@link ReactorConnection} specialised for Event Hubs.
 *
 * <p>On top of the base connection it creates {@link EventHubReactorSession} sessions, opens producer and
 * consumer links through those sessions, and lazily creates a single {@link ManagementChannel} that is
 * closed when the connection is disposed.</p>
 */
public class EventHubReactorAmqpConnection
extends ReactorConnection
implements EventHubAmqpConnection {
    /** Session name passed to {@code createRequestResponseChannel} for the management channel. */
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    /** Link name passed to {@code createRequestResponseChannel} for the management channel. */
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    /** Entity path passed to {@code createRequestResponseChannel} for the management channel. */
    private static final String MANAGEMENT_ADDRESS = "$management";

    private final ClientLogger logger;
    private final TokenCredential tokenCredential;
    private final String connectionId;
    private final ReactorProvider reactorProvider;
    private final ReactorHandlerProvider handlerProvider;
    private final AmqpLinkProvider linkProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final AmqpRetryOptions retryOptions;
    private final MessageSerializer messageSerializer;
    private final Scheduler scheduler;
    private final String eventHubName;
    /** Lazily created by {@link #getOrCreateManagementChannel()}; {@code null} until first requested. */
    private volatile ManagementChannel managementChannel;

    /**
     * Creates the connection and captures the collaborators needed later to build sessions and the
     * management channel.
     *
     * @param connectionId Identifier of this connection; also attached to every log entry as
     *     {@code connectionId}.
     * @param connectionOptions Source of the retry options, token credential and scheduler.
     * @param eventHubName Name of the Event Hub, handed to the management channel.
     * @param reactorProvider Forwarded to the base connection and to created sessions.
     * @param handlerProvider Forwarded to the base connection and to created sessions.
     * @param linkProvider Forwarded to the base connection and to created sessions.
     * @param tokenManagerProvider Forwarded to the base connection, sessions and the management channel.
     * @param messageSerializer Forwarded to the base connection, sessions and the management channel.
     */
    public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions connectionOptions, String eventHubName, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer) {
        // NOTE(review): the trailing boolean is passed positionally; its meaning is defined by
        // ReactorConnection and is not visible here - confirm against the base class.
        super(connectionId, connectionOptions, reactorProvider, handlerProvider, linkProvider, tokenManagerProvider, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, true);
        this.connectionId = connectionId;
        this.reactorProvider = reactorProvider;
        this.handlerProvider = handlerProvider;
        this.linkProvider = linkProvider;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.eventHubName = eventHubName;
        this.retryOptions = connectionOptions.getRetry();
        this.tokenCredential = connectionOptions.getTokenCredential();
        this.scheduler = connectionOptions.getScheduler();

        // ClientLogger's context parameter is Map<String, Object>; a HashMap<String, String> is not
        // assignable to it because generics are invariant, so the value type must be Object.
        HashMap<String, Object> loggingContext = new HashMap<>(1);
        loggingContext.put("connectionId", connectionId);
        this.logger = new ClientLogger(EventHubReactorAmqpConnection.class, loggingContext);
    }

    /**
     * Gets the management node for this connection.
     *
     * @return A {@link Mono} that, once the underlying reactor connection has completed, emits the shared
     *     management channel (creating it on first use), or an {@link IllegalStateException} error if this
     *     connection was already disposed when the method was called.
     */
    @Override
    public Mono<EventHubManagementNode> getManagementNode() {
        if (this.isDisposed()) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(String.format("connectionId[%s]: Connection is disposed. Cannot get management instance", this.connectionId))));
        }
        return this.getReactorConnection().then(Mono.fromCallable(this::getOrCreateManagementChannel));
    }

    /**
     * Creates a send link to {@code entityPath}.
     *
     * @param linkName Name of the link.
     * @param entityPath Entity to send to; also used as the name of the session that owns the link.
     * @param retryOptions Options from which the retry policy and the try-timeout of the link are taken.
     * @param clientIdentifier Identifier of the client, forwarded to the session.
     * @return A {@link Mono} emitting the producer link returned by the session.
     */
    @Override
    public Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions, String clientIdentifier) {
        return this.createSession(entityPath).cast(EventHubSession.class).flatMap(session -> {
            this.logger.atVerbose().addKeyValue("entityPath", entityPath).addKeyValue("clientIdentifier", clientIdentifier).log("Get or create producer.");
            // Uses the caller-supplied retry options (the parameter), not the connection-level field.
            AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
            return session.createProducer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy, clientIdentifier);
        });
    }

    /**
     * Creates a receive link to {@code entityPath}.
     *
     * @param linkName Name of the link.
     * @param entityPath Entity to receive from; also used as the name of the session that owns the link.
     * @param eventPosition Position forwarded to the session when creating the consumer.
     * @param options Receive options forwarded to the session.
     * @param clientIdentifier Identifier of the client, forwarded to the session.
     * @return A {@link Mono} emitting the consumer link returned by the session.
     */
    @Override
    public Mono<AmqpReceiveLink> createReceiveLink(String linkName, String entityPath, EventPosition eventPosition, ReceiveOptions options, String clientIdentifier) {
        return this.createSession(entityPath).cast(EventHubSession.class).flatMap(session -> {
            this.logger.atVerbose().addKeyValue("entityPath", entityPath).addKeyValue("clientIdentifier", clientIdentifier).log("Get or create consumer.");
            // Receive links always use the retry options this connection was constructed with.
            AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(this.retryOptions);
            return session.createConsumer(linkName, entityPath, this.retryOptions.getTryTimeout(), retryPolicy, eventPosition, options, clientIdentifier);
        });
    }

    /**
     * Closes the management channel, if one was created, and then disposes the base connection.
     * Does nothing when the connection is already disposed.
     */
    @Override
    public void dispose() {
        if (this.isDisposed()) {
            return;
        }
        // Read the volatile field once so the null check and the close() act on the same instance.
        ManagementChannel channel = this.managementChannel;
        if (channel != null) {
            channel.close();
        }
        super.dispose();
    }

    /**
     * Builds the Event Hubs specific session wrapper for a proton-j session.
     *
     * @param sessionName Name of the session.
     * @param session Underlying proton-j session.
     * @param handler Handler associated with the session.
     * @return A new {@link EventHubReactorSession} bound to this connection.
     */
    @Override
    protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) {
        return new EventHubReactorSession(this, session, handler, sessionName, this.reactorProvider, this.handlerProvider, this.linkProvider, this.getClaimsBasedSecurityNode(), this.tokenManagerProvider, this.retryOptions, this.messageSerializer);
    }

    /**
     * Returns the management channel, creating it on the first call. The method is {@code synchronized}
     * so concurrent callers cannot create more than one instance.
     *
     * @return The single management channel of this connection.
     */
    private synchronized ManagementChannel getOrCreateManagementChannel() {
        if (this.managementChannel == null) {
            this.managementChannel = new ManagementChannel(this.createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS), this.eventHubName, this.tokenCredential, this.tokenManagerProvider, this.messageSerializer, this.scheduler);
        }
        return this.managementChannel;
    }
}

