/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;

public class ReactorReceiver
implements AmqpReceiveLink {
    private final AtomicBoolean hasAuthorized = new AtomicBoolean(true);
    private final String entityPath;
    private final Receiver receiver;
    private final ReceiveLinkHandler handler;
    private final TokenManager tokenManager;
    private final ReactorDispatcher dispatcher;
    private final Disposable.Composite subscriptions;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final EmitterProcessor<Message> messagesProcessor = EmitterProcessor.create();
    private FluxSink<Message> messageSink = this.messagesProcessor.sink();
    private final ClientLogger logger = new ClientLogger(ReactorReceiver.class);
    private final ReplayProcessor<AmqpEndpointState> endpointStates = ReplayProcessor.cacheLastOrDefault((Object)((Object)AmqpEndpointState.UNINITIALIZED));
    private FluxSink<AmqpEndpointState> endpointStateSink = this.endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
    private volatile Supplier<Integer> creditSupplier;

    ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandler handler, TokenManager tokenManager, ReactorDispatcher dispatcher) {
        this.entityPath = entityPath;
        this.receiver = receiver;
        this.handler = handler;
        this.tokenManager = tokenManager;
        this.dispatcher = dispatcher;
        this.subscriptions = Disposables.composite((Disposable[])new Disposable[]{this.handler.getDeliveredMessages().subscribe(this::decodeDelivery), this.handler.getEndpointStates().subscribe(state -> {
            this.logger.verbose("Connection state: {}", new Object[]{state});
            this.endpointStateSink.next((Object)AmqpEndpointStateUtil.getConnectionState(state));
        }, error -> {
            this.logger.error("linkName[{}] entityPath[{}] Error occurred in connection.", new Object[]{receiver.getName(), entityPath, error});
            this.endpointStateSink.error(error);
            this.dispose();
        }, () -> {
            this.endpointStateSink.next((Object)AmqpEndpointState.CLOSED);
            this.dispose();
        }), this.handler.getErrors().subscribe(error -> {
            this.logger.error("Error occurred in link.", new Object[]{error});
            this.endpointStateSink.error(error);
            this.dispose();
        }), this.tokenManager.getAuthorizationResults().subscribe(response -> {
            this.logger.verbose("Token refreshed: {}", new Object[]{response});
            this.hasAuthorized.set(true);
        }, error -> {
            this.logger.info("clientId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", new Object[]{handler.getConnectionId(), this.entityPath, this.getLinkName(), error.getMessage()});
            this.hasAuthorized.set(false);
        }, () -> this.hasAuthorized.set(false))});
    }

    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    @Override
    public Flux<Message> receive() {
        return this.messagesProcessor;
    }

    @Override
    public void addCredits(int credits) {
        this.receiver.flow(credits);
    }

    @Override
    public int getCredits() {
        return this.receiver.getRemoteCredit();
    }

    @Override
    public void setEmptyCreditListener(Supplier<Integer> creditSupplier) {
        Objects.requireNonNull(creditSupplier);
        this.creditSupplier = creditSupplier;
    }

    @Override
    public String getLinkName() {
        return this.receiver.getName();
    }

    @Override
    public String getEntityPath() {
        return this.entityPath;
    }

    @Override
    public String getHostname() {
        return this.handler.getHostname();
    }

    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    public void dispose() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.subscriptions.dispose();
        this.endpointStateSink.complete();
        this.messageSink.complete();
        this.tokenManager.close();
        this.receiver.close();
        try {
            this.dispatcher.invoke(() -> {
                this.receiver.free();
                this.handler.close();
            });
        }
        catch (IOException e) {
            this.logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", new Object[]{e});
            this.handler.close();
        }
    }

    private void decodeDelivery(Delivery delivery) {
        Integer credits;
        int messageSize = delivery.pending();
        byte[] buffer = new byte[messageSize];
        int read = this.receiver.recv(buffer, 0, messageSize);
        this.receiver.advance();
        Message message = Proton.message();
        message.decode(buffer, 0, read);
        delivery.settle();
        this.messageSink.next((Object)message);
        if (this.receiver.getRemoteCredit() == 0 && this.creditSupplier != null && (credits = this.creditSupplier.get()) != null && credits > 0) {
            this.addCredits(credits);
        }
    }
}

