/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;

/**
 * Request/response channel built from one AMQP send link and one AMQP receive link that are created
 * on the same {@link Session}.
 *
 * <p>Each request sent through {@link #sendWithAck(Message)} is stamped with a numeric message id and
 * this channel's reply-to address. The sink of the returned {@link Mono} is kept in
 * {@code unconfirmedSends} until a delivery whose correlation id equals that message id arrives on
 * the receive link, at which point the sink is completed or failed based on the response's
 * {@code status-code} application property.</p>
 */
public class RequestResponseChannel implements Disposable {
    private static final String STATUS_CODE = "status-code";
    private static final String STATUS_DESCRIPTION = "status-description";
    private static final String DISPOSED_SEND_MESSAGE =
        "Cannot send a message when request response channel is disposed.";

    /** Requests that have been handed to the dispatcher and are still waiting for a response, keyed by message id. */
    private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends =
        new ConcurrentSkipListMap<>();
    private final ClientLogger logger = new ClientLogger(RequestResponseChannel.class);
    private final ReplayProcessor<AmqpEndpointState> endpointStates =
        ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
    private final FluxSink<AmqpEndpointState> endpointStatesSink =
        this.endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
    private final Sender sendLink;
    private final Receiver receiveLink;
    private final String replyTo;
    private final MessageSerializer messageSerializer;
    private final ReactorProvider provider;
    private final Duration operationTimeout;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicBoolean hasOpened = new AtomicBoolean();
    private final AtomicLong requestId = new AtomicLong(0L);
    private final SendLinkHandler sendLinkHandler;
    private final ReceiveLinkHandler receiveLinkHandler;
    private final Disposable.Composite subscriptions;
    private final AmqpRetryPolicy retryPolicy;

    /**
     * Creates the send and receive links on {@code session}, attaches their handlers and subscribes to
     * the handlers' deliveries, endpoint states and errors. The links are not opened here; that happens
     * on the first call to {@link #sendWithAck(Message)}.
     *
     * @param connectionId Passed to the handler provider when creating the link handlers.
     * @param fullyQualifiedNamespace Passed to the handler provider when creating the link handlers.
     * @param linkName Base link name; the links are named {@code linkName + ":sender"} and
     *     {@code linkName + ":receiver"}.
     * @param entityPath Address used as the sender's target and the receiver's source. With every
     *     {@code "$"} removed and {@code "-client-reply-to"} appended it also forms the reply-to address.
     * @param session Session the two links are created on.
     * @param retryOptions Supplies the try timeout and retry policy used while waiting for the links to
     *     become active.
     * @param handlerProvider Factory for the send and receive link handlers.
     * @param provider Supplies the reactor dispatcher on which link operations are scheduled.
     * @param messageSerializer Used to size the buffer a request is encoded into.
     */
    RequestResponseChannel(String connectionId, String fullyQualifiedNamespace, String linkName, String entityPath,
        Session session, AmqpRetryOptions retryOptions, ReactorHandlerProvider handlerProvider,
        ReactorProvider provider, MessageSerializer messageSerializer) {
        this.provider = provider;
        this.operationTimeout = retryOptions.getTryTimeout();
        this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
        this.replyTo = entityPath.replace("$", "") + "-client-reply-to";
        this.messageSerializer = messageSerializer;

        // Send link: requests go to entityPath and are sent pre-settled.
        this.sendLink = session.sender(linkName + ":sender");
        Target senderTarget = new Target();
        senderTarget.setAddress(entityPath);
        this.sendLink.setTarget(senderTarget);
        this.sendLink.setSource(new Source());
        this.sendLink.setSenderSettleMode(SenderSettleMode.SETTLED);
        this.sendLinkHandler =
            handlerProvider.createSendLinkHandler(connectionId, fullyQualifiedNamespace, linkName, entityPath);
        BaseHandler.setHandler(this.sendLink, this.sendLinkHandler);

        // Receive link: responses come from entityPath and are addressed to the reply-to address.
        this.receiveLink = session.receiver(linkName + ":receiver");
        Source receiverSource = new Source();
        receiverSource.setAddress(entityPath);
        this.receiveLink.setSource(receiverSource);
        Target receiverTarget = new Target();
        receiverTarget.setAddress(this.replyTo);
        this.receiveLink.setTarget(receiverTarget);
        this.receiveLink.setSenderSettleMode(SenderSettleMode.SETTLED);
        this.receiveLink.setReceiverSettleMode(ReceiverSettleMode.SECOND);
        this.receiveLinkHandler =
            handlerProvider.createReceiveLinkHandler(connectionId, fullyQualifiedNamespace, linkName, entityPath);
        BaseHandler.setHandler(this.receiveLink, this.receiveLinkHandler);

        this.subscriptions = Disposables.composite(
            this.receiveLinkHandler.getDeliveredMessages()
                .map(this::decodeDelivery)
                .subscribe(message -> {
                    this.logger.verbose("Settling message: {}", message.getCorrelationId());
                    this.settleMessage(message);
                }, this::handleException),
            this.receiveLinkHandler.getEndpointStates().subscribe(
                state -> this.endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)),
                this::handleLinkError,
                this::dispose),
            this.receiveLinkHandler.getErrors().subscribe(this::handleLinkError),
            this.sendLinkHandler.getEndpointStates().subscribe(
                state -> this.endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)),
                this::handleLinkError,
                this::dispose),
            this.sendLinkHandler.getErrors().subscribe(this::handleLinkError));
    }

    /**
     * Gets the endpoint states reported by the send and receive link handlers.
     *
     * @return A flux of endpoint states in which each distinct state is emitted at most once.
     */
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates.distinct();
    }

    /**
     * Disposes the handler subscriptions, fails every request that is still waiting for a response and
     * closes both links. Calls after the first one do nothing.
     */
    @Override
    public void dispose() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.subscriptions.dispose();
        // With the delivery subscription gone no response can be matched any more, so waiting callers
        // are failed here instead of being left pending.
        if (!this.unconfirmedSends.isEmpty()) {
            this.failPendingSends(
                new IllegalStateException("Cannot complete request. Request response channel is disposed."));
        }
        // NOTE(review): the links are opened on the reactor dispatcher (see sendWithAck) but closed on
        // the calling thread here - confirm that is safe for the proton-j engine.
        this.sendLink.close();
        this.receiveLink.close();
    }

    /**
     * Gets whether {@link #dispose()} has been called.
     *
     * @return {@code true} once the channel has been disposed.
     */
    @Override
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    /**
     * Sends {@code message} and returns a Mono that completes with the matching response.
     *
     * <p>The message id and reply-to of {@code message} are assigned by this method, so both must be
     * unset on entry. The first valid call also schedules opening of the send and receive links on the
     * reactor dispatcher. The request itself is scheduled when the returned Mono is subscribed to and
     * both links have reported {@link EndpointState#ACTIVE}.</p>
     *
     * @param message Request to send; must not be {@code null} and must not have a message id or
     *     reply-to set.
     * @return A Mono that emits the response when its status code is ACCEPTED or OK, and errors
     *     otherwise. It also errors with {@link IllegalStateException} if the channel is disposed, with
     *     {@link IllegalArgumentException} if {@code message} is invalid, or with a
     *     {@link RuntimeException} if opening the links could not be scheduled.
     */
    public Mono<Message> sendWithAck(Message message) {
        if (this.isDisposed()) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(DISPOSED_SEND_MESSAGE)));
        }
        // Validate before touching the links so that an invalid request has no side effects.
        if (message == null) {
            return Mono.error(
                this.logger.logExceptionAsError(new IllegalArgumentException("message cannot be null")));
        }
        if (message.getMessageId() != null) {
            return Mono.error(this.logger.logExceptionAsError(
                new IllegalArgumentException("message.getMessageId() should be null")));
        }
        if (message.getReplyTo() != null) {
            return Mono.error(this.logger.logExceptionAsError(
                new IllegalArgumentException("message.getReplyTo() should be null")));
        }
        if (!this.hasOpened.getAndSet(true)) {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    this.sendLink.open();
                    this.receiveLink.open();
                });
            } catch (IOException e) {
                // Opening was never scheduled; clear the flag so that a later call tries again.
                this.hasOpened.set(false);
                return Mono.error(this.logger.logExceptionAsError(
                    new RuntimeException("Unable to open send and receive link.", e)));
            }
        }

        final UnsignedLong messageId = UnsignedLong.valueOf(this.requestId.incrementAndGet());
        message.setMessageId(messageId);
        message.setReplyTo(this.replyTo);

        return RetryUtil.withRetry(
                Mono.when(
                    this.sendLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE),
                    this.receiveLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE)),
                this.operationTimeout, this.retryPolicy)
            .then(Mono.<Message>create(sink -> this.scheduleSend(messageId, message, sink)));
    }

    /**
     * Registers {@code sink} as the receiver of the response for {@code messageId} and schedules the
     * encoded request on the reactor dispatcher.
     */
    private void scheduleSend(UnsignedLong messageId, Message message, MonoSink<Message> sink) {
        this.logger.verbose("Scheduling on dispatcher. Message Id {}", messageId);
        this.unconfirmedSends.putIfAbsent(messageId, sink);
        // Drop the registration when the subscriber cancels or the sink terminates, so that abandoned
        // requests do not accumulate and a resubscription can register its own sink.
        sink.onDispose(() -> this.unconfirmedSends.remove(messageId, sink));

        // Checked after registering: either this check sees the disposal or dispose() sees the sink.
        if (this.isDisposed()) {
            this.unconfirmedSends.remove(messageId, sink);
            sink.error(new IllegalStateException(DISPOSED_SEND_MESSAGE));
            return;
        }

        try {
            this.provider.getReactorDispatcher().invoke(() -> {
                this.sendLink.delivery(
                    UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8));
                int payloadSize = this.messageSerializer.getSize(message) + 512;
                byte[] bytes = new byte[payloadSize];
                int encodedSize = message.encode(bytes, 0, payloadSize);
                // Grant one credit on the receive link for the response to this request.
                this.receiveLink.flow(1);
                this.sendLink.send(bytes, 0, encodedSize);
                this.sendLink.advance();
            });
        } catch (IOException e) {
            // The request was never scheduled, so no response will arrive for it.
            this.unconfirmedSends.remove(messageId, sink);
            sink.error(e);
        }
    }

    /**
     * Reads the pending bytes of {@code delivery} from the receive link, decodes them into a message
     * and settles the delivery.
     */
    private Message decodeDelivery(Delivery delivery) {
        Message response = Proton.message();
        int msgSize = delivery.pending();
        byte[] buffer = new byte[msgSize];
        int read = this.receiveLink.recv(buffer, 0, msgSize);
        response.decode(buffer, 0, read);
        delivery.settle();
        return response;
    }

    /**
     * Completes or fails the pending request whose message id equals the correlation id of
     * {@code message}. Responses that cannot be matched to a pending request are logged and ignored.
     */
    private void settleMessage(Message message) {
        String id = String.valueOf(message.getCorrelationId());
        UnsignedLong correlationId;
        try {
            correlationId = UnsignedLong.valueOf(id);
        } catch (NumberFormatException e) {
            // A missing or non-numeric correlation id cannot belong to any request sent by this channel.
            this.logger.warning("Received delivery without pending messageId[{}]. Size[{}]", id,
                this.unconfirmedSends.size());
            return;
        }

        MonoSink<Message> sink = this.unconfirmedSends.remove(correlationId);
        if (sink == null) {
            int size = this.unconfirmedSends.size();
            this.logger.warning("Received delivery without pending messageId[{}]. Size[{}]", id, size);
            return;
        }

        java.util.Map<?, ?> properties =
            message.getApplicationProperties() == null ? null : message.getApplicationProperties().getValue();
        Object statusCodeValue = properties == null ? null : properties.get(STATUS_CODE);
        if (!(statusCodeValue instanceof Integer)) {
            // Report the malformed response to the caller instead of throwing from the subscriber.
            sink.error(this.logger.logExceptionAsError(new IllegalStateException(
                "Response for messageId[" + id + "] does not contain an integer '" + STATUS_CODE
                    + "' application property.")));
            return;
        }

        int statusCode = (Integer) statusCodeValue;
        if (statusCode != AmqpResponseCode.ACCEPTED.getValue() && statusCode != AmqpResponseCode.OK.getValue()) {
            Object description = properties.get(STATUS_DESCRIPTION);
            String statusDescription = description == null ? null : description.toString();
            sink.error(ExceptionUtil.amqpResponseCodeToException(statusCode, statusDescription,
                this.receiveLinkHandler.getErrorContext((Link) this.receiveLink)));
        } else {
            sink.success(message);
        }
    }

    /**
     * Error callback of the delivered-messages subscription. An error delivered here has already
     * terminated that subscription, so no further response can be processed: every pending request is
     * failed with {@code error} and the channel is disposed, whether or not the error is transient.
     */
    private void handleException(Throwable error) {
        this.logger.error("Exception encountered. Closing channel and clearing unconfirmed sends.", error);
        this.failPendingSends(error);
        this.dispose();
    }

    /**
     * Error callback of the link handlers' endpoint-state and error streams: publishes {@code error} to
     * the endpoint-state subscribers, fails every pending request with it and disposes the channel.
     */
    private void handleLinkError(Throwable error) {
        this.endpointStatesSink.error(error);
        this.failPendingSends(error);
        this.dispose();
    }

    /** Removes every pending request from {@code unconfirmedSends} and fails its sink with {@code error}. */
    private void failPendingSends(Throwable error) {
        for (UnsignedLong pendingId : this.unconfirmedSends.keySet()) {
            MonoSink<Message> pending = this.unconfirmedSends.remove(pendingId);
            if (pending != null) {
                pending.error(error);
            }
        }
    }
}

