/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
import com.azure.messaging.eventhubs.implementation.UncheckedExecutionException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

class EventDataAggregator
extends FluxOperator<EventData, EventDataBatch> {
    private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class);
    private final AtomicReference<EventDataAggregatorMain> downstreamSubscription = new AtomicReference();
    private final Supplier<EventDataBatch> batchSupplier;
    private final String namespace;
    private final EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions options;
    private final String partitionId;

    EventDataAggregator(Flux<? extends EventData> source, Supplier<EventDataBatch> batchSupplier, String namespace, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions options, String partitionId) {
        super(source);
        this.partitionId = partitionId;
        this.batchSupplier = batchSupplier;
        this.namespace = namespace;
        this.options = options;
    }

    public void subscribe(CoreSubscriber<? super EventDataBatch> actual) {
        EventDataAggregatorMain subscription = new EventDataAggregatorMain(actual, this.namespace, this.options, this.batchSupplier, this.partitionId, LOGGER);
        if (!this.downstreamSubscription.compareAndSet(null, subscription)) {
            throw (IllegalArgumentException)LOGGER.logThrowableAsError((Throwable)new IllegalArgumentException("Cannot resubscribe to multiple upstreams."));
        }
        this.source.subscribe((CoreSubscriber)subscription);
    }

    static class EventDataAggregatorMain
    implements Subscription,
    CoreSubscriber<EventData> {
        private volatile long requested;
        private static final AtomicLongFieldUpdater<EventDataAggregatorMain> REQUESTED = AtomicLongFieldUpdater.newUpdater(EventDataAggregatorMain.class, "requested");
        private final Sinks.Many<Long> eventSink;
        private final Disposable disposable;
        private final AtomicBoolean isCompleted = new AtomicBoolean(false);
        private final CoreSubscriber<? super EventDataBatch> downstream;
        private final String partitionId;
        private final ClientLogger logger;
        private final Supplier<EventDataBatch> batchSupplier;
        private final String namespace;
        private final Object lock = new Object();
        private Subscription subscription;
        private EventDataBatch currentBatch;
        private volatile Throwable lastError;

        EventDataAggregatorMain(CoreSubscriber<? super EventDataBatch> downstream, String namespace, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions options, Supplier<EventDataBatch> batchSupplier, String partitionId, ClientLogger logger) {
            this.namespace = namespace;
            this.downstream = downstream;
            this.partitionId = partitionId;
            this.logger = logger;
            this.batchSupplier = batchSupplier;
            this.currentBatch = batchSupplier.get();
            this.eventSink = Sinks.many().unicast().onBackpressureError();
            this.disposable = Flux.switchOnNext((Publisher)this.eventSink.asFlux().map(e -> Flux.interval((Duration)options.getMaxWaitTime()).takeUntil(index -> this.isCompleted.get()))).subscribe(index -> {
                logger.atVerbose().addKeyValue("partitionId", partitionId).log("Time elapsed. Attempt to publish downstream.");
                this.updateOrPublishBatch(null, true);
            });
        }

        public void request(long n) {
            if (!Operators.validate((long)n)) {
                return;
            }
            Operators.addCap(REQUESTED, (Object)this, (long)n);
            this.subscription.request(n);
        }

        public void cancel() {
            if (!this.isCompleted.compareAndSet(false, true)) {
                return;
            }
            this.logger.atVerbose().addKeyValue("partitionId", this.partitionId).log("Disposing of aggregator.");
            this.subscription.cancel();
            this.updateOrPublishBatch(null, true);
            this.downstream.onComplete();
            this.disposable.dispose();
        }

        public void onSubscribe(Subscription s) {
            if (this.subscription != null) {
                this.logger.warning("Subscription was already set. Cancelling existing subscription.");
                this.subscription.cancel();
            } else {
                this.subscription = s;
                this.downstream.onSubscribe((Subscription)this);
            }
        }

        public void onNext(EventData eventData) {
            this.updateOrPublishBatch(eventData, false);
            this.eventSink.emitNext((Object)1L, Sinks.EmitFailureHandler.FAIL_FAST);
            long left = REQUESTED.get(this);
            if (left > 0L) {
                this.subscription.request(1L);
            }
        }

        public void onError(Throwable t) {
            if (!this.isCompleted.compareAndSet(false, true)) {
                Operators.onErrorDropped((Throwable)t, (Context)this.downstream.currentContext());
                return;
            }
            this.updateOrPublishBatch(null, true);
            this.downstream.onError(t);
        }

        public void onComplete() {
            if (this.isCompleted.compareAndSet(false, true)) {
                this.updateOrPublishBatch(null, true);
                this.downstream.onComplete();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) {
            boolean added;
            if (alwaysPublish) {
                this.publishDownstream();
                return;
            }
            if (eventData == null) {
                return;
            }
            Object object = this.lock;
            synchronized (object) {
                added = this.currentBatch.tryAdd(eventData);
                if (added) {
                    return;
                }
                this.publishDownstream();
                added = this.currentBatch.tryAdd(eventData);
            }
            if (!added) {
                AmqpException error = new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, "EventData exceeded maximum size.", new AmqpErrorContext(this.namespace));
                this.onError((Throwable)error);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void publishDownstream() {
            block13: {
                EventDataBatch previous = null;
                try {
                    Object object = this.lock;
                    synchronized (object) {
                        previous = this.currentBatch;
                        if (previous == null) {
                            this.logger.warning("Batch should not be null, setting a new batch.");
                            this.currentBatch = this.batchSupplier.get();
                            return;
                        }
                        if (previous.getEvents().isEmpty()) {
                            return;
                        }
                        this.downstream.onNext((Object)previous);
                        long batchesLeft = REQUESTED.updateAndGet(this, v -> {
                            if (v == Long.MAX_VALUE) {
                                return v;
                            }
                            return v - 1L;
                        });
                        this.logger.verbose(previous + ": Batch published. Requested batches left: {}", new Object[]{batchesLeft});
                        if (!this.isCompleted.get()) {
                            this.currentBatch = this.batchSupplier.get();
                        } else {
                            this.logger.verbose("Aggregator is completed. Not setting another batch.");
                            this.currentBatch = null;
                        }
                    }
                }
                catch (UncheckedExecutionException exception) {
                    this.logger.info("An exception occurred while trying to create a new batch.", new Object[]{exception});
                    if (this.lastError != null) {
                        this.logger.info("Exception has been set already, terminating EventDataAggregator.");
                        Throwable error = Operators.onNextError((Object)previous, (Throwable)exception, (Context)this.downstream.currentContext(), (Subscription)this.subscription);
                        if (error != null) {
                            this.onError(error);
                        }
                    } else {
                        this.lastError = exception;
                    }
                }
                catch (Throwable e) {
                    Throwable error = Operators.onNextError((Object)previous, (Throwable)e, (Context)this.downstream.currentContext(), (Subscription)this.subscription);
                    this.logger.warning("Unable to push batch downstream to publish.", new Object[]{error});
                    if (error == null) break block13;
                    this.onError(error);
                }
            }
        }
    }
}

