/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import com.azure.messaging.eventhubs.implementation.EventHubReactorSession;
import com.azure.messaging.eventhubs.implementation.EventHubSession;
import com.azure.messaging.eventhubs.implementation.ManagementChannel;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;

/**
 * A {@link ReactorConnection} specialised for Event Hubs. On top of the base connection it exposes a
 * management node and creates send/receive links backed by {@link EventHubReactorSession} sessions.
 */
public class EventHubReactorAmqpConnection extends ReactorConnection implements EventHubAmqpConnection {
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    private static final String MANAGEMENT_ADDRESS = "$management";

    private final ClientLogger logger = new ClientLogger(EventHubReactorAmqpConnection.class);

    // Send links that are disposed and removed in dispose(). No code in this class adds entries.
    private final ConcurrentHashMap<String, AmqpSendLink> sendLinks = new ConcurrentHashMap<>();
    private final Mono<EventHubManagementNode> managementChannelMono;
    private final String connectionId;
    private final ReactorProvider reactorProvider;
    private final ReactorHandlerProvider handlerProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final AmqpRetryOptions retryOptions;
    private final MessageSerializer messageSerializer;

    /**
     * Creates the connection and prepares (but does not subscribe to) the management node publisher.
     *
     * @param connectionId Identifier of this connection; also used in error messages.
     * @param connectionOptions Options for the connection. Its retry options, entity path, token credential
     *     and scheduler are read here.
     * @param reactorProvider Passed to the base class and to every session created by this connection.
     * @param handlerProvider Passed to the base class and to every session created by this connection.
     * @param tokenManagerProvider Passed to the base class, the management channel and created sessions.
     * @param messageSerializer Passed to the base class, the management channel and created sessions.
     * @param product Forwarded to the base class constructor.
     * @param clientVersion Forwarded to the base class constructor.
     */
    public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions connectionOptions,
        ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider,
        TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, String product,
        String clientVersion) {
        super(connectionId, connectionOptions, reactorProvider, handlerProvider, tokenManagerProvider,
            messageSerializer, product, clientVersion);
        this.connectionId = connectionId;
        this.reactorProvider = reactorProvider;
        this.handlerProvider = handlerProvider;
        this.tokenManagerProvider = tokenManagerProvider;
        this.retryOptions = connectionOptions.getRetry();
        this.messageSerializer = messageSerializer;

        // The explicit <EventHubManagementNode> witness is required: the fromCallable(...) call has no
        // target type here (its result only feeds then(...).cache()), so it would otherwise be inferred
        // as Mono<ManagementChannel>, which is not assignable to the Mono<EventHubManagementNode> field.
        // cache() lets every subscriber share the same ManagementChannel instance.
        this.managementChannelMono = this.getReactorConnection()
            .then(Mono.<EventHubManagementNode>fromCallable(() -> new ManagementChannel(
                this.createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME,
                    MANAGEMENT_ADDRESS),
                connectionOptions.getEntityPath(), connectionOptions.getTokenCredential(),
                this.tokenManagerProvider, this.messageSerializer, connectionOptions.getScheduler())))
            .cache();
    }

    /**
     * Gets the management node for this connection.
     *
     * @return A Mono emitting the cached management node, or a Mono that errors with an
     *     {@link IllegalStateException} (also logged as an error) when the connection is already disposed.
     */
    @Override
    public Mono<EventHubManagementNode> getManagementNode() {
        if (this.isDisposed()) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(String.format(
                "connectionId[%s]: Connection is disposed. Cannot get management instance",
                this.connectionId))));
        }
        return this.managementChannelMono;
    }

    /**
     * Creates a send link by obtaining a session via {@code createSession(entityPath)} and asking it for a
     * producer.
     *
     * @param linkName Name of the link.
     * @param entityPath Entity path; used both as the session name and the producer's entity path.
     * @param retryOptions Retry options for this link; supplies the try timeout and the retry policy.
     * @return A Mono emitting the producer cast to {@link AmqpSendLink}.
     */
    @Override
    public Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions) {
        return this.createSession(entityPath).flatMap(session -> {
            this.logger.verbose("Get or create producer for path: '{}'", entityPath);
            AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
            return session.createProducer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy)
                .cast(AmqpSendLink.class);
        });
    }

    /**
     * Creates a receive link by obtaining a session via {@code createSession(entityPath)}, casting it to
     * {@link EventHubSession} and asking it for a consumer. Unlike {@code createSendLink}, the timeout and
     * retry policy come from the connection-level retry options captured in the constructor.
     *
     * @param linkName Name of the link.
     * @param entityPath Entity path; used both as the session name and the consumer's entity path.
     * @param eventPosition Position forwarded to the session's {@code createConsumer}.
     * @param options Receive options forwarded to the session's {@code createConsumer}.
     * @return A Mono emitting the created receive link.
     */
    @Override
    public Mono<AmqpReceiveLink> createReceiveLink(String linkName, String entityPath, EventPosition eventPosition,
        ReceiveOptions options) {
        return this.createSession(entityPath).cast(EventHubSession.class).flatMap(session -> {
            this.logger.verbose("Get or create consumer for path: '{}'", entityPath);
            AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(this.retryOptions);
            return session.createConsumer(linkName, entityPath, this.retryOptions.getTryTimeout(), retryPolicy,
                eventPosition, options);
        });
    }

    /**
     * Disposes every tracked send link, clears the tracking map, and then disposes the base connection.
     */
    @Override
    public void dispose() {
        this.logger.info("Disposing of connection.");
        this.sendLinks.forEach((key, value) -> value.dispose());
        this.sendLinks.clear();
        super.dispose();
    }

    /**
     * Builds the Event Hubs specific session wrapper around a proton-j session.
     *
     * @param sessionName Name of the session.
     * @param session Underlying proton-j session.
     * @param handler Handler associated with the session.
     * @return A new {@link EventHubReactorSession} configured with this connection's providers, claims-based
     *     security node, try timeout and message serializer.
     */
    @Override
    protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) {
        return new EventHubReactorSession(session, handler, sessionName, this.reactorProvider,
            this.handlerProvider, this.getClaimsBasedSecurityNode(), this.tokenManagerProvider,
            this.retryOptions.getTryTimeout(), this.messageSerializer);
    }
}

