/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

public class AmqpReceiveLinkProcessor
extends FluxProcessor<AmqpReceiveLink, Message>
implements Subscription {
    private static final int MINIMUM_REQUEST = 0;
    private static final int MAXIMUM_REQUEST = 100;
    private final ClientLogger logger = new ClientLogger(AmqpReceiveLinkProcessor.class);
    private final Object lock = new Object();
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    private final AtomicBoolean hasDownstream = new AtomicBoolean();
    private final AtomicInteger retryAttempts = new AtomicInteger();
    private final AtomicBoolean isRequested = new AtomicBoolean();
    private final AtomicInteger linkCreditRequest = new AtomicInteger(1);
    private final int prefetch;
    private final AmqpRetryPolicy retryPolicy;
    private Disposable parentConnection;
    private volatile Subscription upstream;
    private volatile CoreSubscriber<? super Message> downstream;
    private volatile Throwable lastError;
    private volatile AmqpReceiveLink currentLink;
    private volatile Disposable currentLinkSubscriptions;
    private volatile Disposable retrySubscription;

    public AmqpReceiveLinkProcessor(int prefetch, AmqpRetryPolicy retryPolicy, Disposable parentConnection) {
        this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
        this.parentConnection = Objects.requireNonNull(parentConnection, "'parentConnection' cannot be null.");
        if (prefetch < 0) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'prefetch' cannot be less than 0."));
        }
        this.prefetch = prefetch;
    }

    public Throwable getError() {
        return this.lastError;
    }

    public boolean isTerminated() {
        return this.isTerminated.get();
    }

    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "'subscription' cannot be null");
        if (this.isTerminated()) {
            return;
        }
        this.logger.verbose("Subscribing to upstream.", new Object[0]);
        this.upstream = subscription;
        subscription.request(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(AmqpReceiveLink next) {
        Disposable oldSubscription;
        AmqpReceiveLink oldChannel;
        Objects.requireNonNull(next, "'next' cannot be null.");
        if (this.isTerminated()) {
            this.logger.warning("Got another link when we have already terminated processor. Link: {}", new Object[]{next.getEntityPath()});
            return;
        }
        this.logger.info("Setting next AMQP receive link.", new Object[0]);
        Object object = this.lock;
        synchronized (object) {
            oldChannel = this.currentLink;
            oldSubscription = this.currentLinkSubscriptions;
            this.currentLink = next;
            next.addCredits(this.prefetch);
            next.setEmptyCreditListener(() -> {
                if (this.hasDownstream.get()) {
                    return this.linkCreditRequest.get();
                }
                this.logger.verbose("Emitter has no downstream subscribers. Not adding credits.", new Object[0]);
                return 0;
            });
            this.currentLinkSubscriptions = Disposables.composite((Disposable[])new Disposable[]{next.getEndpointStates().subscribe(state -> {
                if (state == AmqpEndpointState.ACTIVE) {
                    this.retryAttempts.set(0);
                }
            }, error -> {
                this.currentLink = null;
                this.onError((Throwable)error);
            }, () -> {
                if (this.parentConnection.isDisposed()) {
                    this.logger.info("Parent connection is disposed.", new Object[0]);
                } else if (this.isTerminated()) {
                    this.logger.info("Processor is disposed.", new Object[0]);
                } else {
                    this.logger.info("Receive link endpoint states are closed.", new Object[0]);
                    AmqpReceiveLink existing = this.currentLink;
                    this.currentLink = null;
                    if (existing != null) {
                        existing.dispose();
                    }
                    this.requestUpstream();
                }
            }), next.receive().subscribe(message -> {
                this.logger.verbose("Pushing next message downstream.", new Object[0]);
                this.downstream.onNext(message);
            })});
        }
        if (oldChannel != null) {
            oldChannel.dispose();
        }
        if (oldSubscription != null) {
            oldSubscription.dispose();
        }
        this.isRequested.set(false);
    }

    public void subscribe(CoreSubscriber<? super Message> actual) {
        Objects.requireNonNull(actual, "'actual' cannot be null.");
        if (this.isTerminated()) {
            this.logger.info("AmqpReceiveLink is already terminated.", new Object[0]);
            actual.onSubscribe(Operators.emptySubscription());
            if (this.hasError()) {
                actual.onError(this.lastError);
            } else {
                actual.onComplete();
            }
            return;
        }
        if (!this.hasDownstream.getAndSet(true)) {
            this.downstream = actual;
            actual.onSubscribe((Subscription)this);
            this.requestUpstream();
        } else {
            Operators.error(actual, (Throwable)this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("There is already one downstream subscriber.'")));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable, "'throwable' is required.");
        if (this.isTerminated()) {
            this.logger.info("AmqpReceiveLinkProcessor is terminated. Not reopening on error.", new Object[0]);
            return;
        }
        int attempt = this.retryAttempts.incrementAndGet();
        Duration retryInterval = this.retryPolicy.calculateRetryDelay(throwable, attempt);
        if (retryInterval != null && !this.parentConnection.isDisposed()) {
            this.logger.warning("Transient error occurred. Attempt: {}. Retrying after {} ms.", new Object[]{attempt, retryInterval.toMillis(), throwable});
            this.retrySubscription = Mono.delay((Duration)retryInterval).subscribe(i -> this.requestUpstream());
            return;
        }
        if (this.parentConnection.isDisposed()) {
            this.logger.info("Parent connection is disposed. Not reopening on error.", new Object[0]);
        }
        this.logger.warning("Non-retryable error occurred in AMQP receive link.", new Object[]{throwable});
        this.lastError = throwable;
        this.isTerminated.set(true);
        Object object = this.lock;
        synchronized (object) {
            if (this.downstream != null) {
                this.downstream.onError(throwable);
            }
        }
        this.terminate();
    }

    public void onComplete() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        if (this.hasDownstream.get()) {
            this.downstream.onComplete();
        }
        this.terminate();
    }

    public void request(long request) {
        if (this.isTerminated.get()) {
            this.logger.info("Cannot request more from AMQP link processor that is disposed.", new Object[0]);
            return;
        }
        if (request < 0L) {
            this.logger.warning(Messages.REQUEST_VALUE_NOT_VALID, new Object[]{0, 100});
            return;
        }
        int newRequest = request > 100L ? 100 : (int)request;
        this.logger.verbose("Back pressure request. Old value: {}. New value: {}", new Object[]{this.linkCreditRequest.get(), newRequest});
        this.linkCreditRequest.set(newRequest);
    }

    public void cancel() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        if (this.hasDownstream.get()) {
            this.downstream.onComplete();
        }
        this.terminate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requestUpstream() {
        if (this.isTerminated()) {
            this.logger.verbose("Terminated. Not requesting another.", new Object[0]);
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.currentLink != null) {
                this.logger.info("AmqpReceiveLink exists, not requesting another.", new Object[0]);
                return;
            }
            if (this.upstream == null) {
                this.logger.verbose("There is no upstream. Not requesting", new Object[0]);
                return;
            }
        }
        if (!this.isRequested.getAndSet(true)) {
            this.logger.info("AmqpReceiveLink not requested, yet. Requesting one.", new Object[0]);
            this.upstream.request(1L);
        } else {
            this.logger.info("AmqpRecieveLink already requested.", new Object[0]);
        }
    }

    private void terminate() {
        if (this.retrySubscription != null && !this.retrySubscription.isDisposed()) {
            this.retrySubscription.dispose();
        }
        if (this.currentLink != null) {
            this.currentLink.dispose();
        }
        this.currentLink = null;
        if (this.currentLinkSubscriptions != null) {
            this.currentLinkSubscriptions.dispose();
        }
    }
}

