/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.utils.async;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.async.DelegatingSubscriber;

@SdkProtectedApi
public class FlatteningSubscriber<U>
extends DelegatingSubscriber<Iterable<U>, U> {
    private final AtomicLong demand = new AtomicLong(0L);
    private final Object lock = new Object();
    private boolean requestedNextBatch;
    private Queue<U> currentBatch = new LinkedList<U>();
    private boolean onCompleteCalled = false;
    private Subscription sourceSubscription;

    public FlatteningSubscriber(Subscriber<? super U> subscriber) {
        super(subscriber);
    }

    @Override
    public void onSubscribe(final Subscription subscription) {
        this.sourceSubscription = subscription;
        this.subscriber.onSubscribe(new Subscription(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void request(long l) {
                Object object = FlatteningSubscriber.this.lock;
                synchronized (object) {
                    FlatteningSubscriber.this.demand.addAndGet(l);
                    if (FlatteningSubscriber.this.currentBatch.isEmpty() && !FlatteningSubscriber.this.requestedNextBatch) {
                        FlatteningSubscriber.this.requestedNextBatch = true;
                        FlatteningSubscriber.this.sourceSubscription.request(1L);
                    } else {
                        FlatteningSubscriber.this.fulfillDemand();
                    }
                }
            }

            public void cancel() {
                subscription.cancel();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(Iterable<U> nextItems) {
        Object object = this.lock;
        synchronized (object) {
            this.currentBatch = StreamSupport.stream(nextItems.spliterator(), false).collect(Collectors.toCollection(LinkedList::new));
            this.fulfillDemand();
        }
    }

    private void fulfillDemand() {
        while (this.demand.decrementAndGet() > 0L && !this.currentBatch.isEmpty()) {
            this.subscriber.onNext(this.currentBatch.poll());
        }
        if (this.onCompleteCalled && this.currentBatch.isEmpty()) {
            this.subscriber.onComplete();
        } else if (this.currentBatch.isEmpty() && this.demand.get() > 0L) {
            this.requestedNextBatch = true;
            this.sourceSubscription.request(1L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onComplete() {
        Object object = this.lock;
        synchronized (object) {
            this.onCompleteCalled = true;
            if (this.currentBatch.isEmpty()) {
                this.subscriber.onComplete();
            }
        }
    }
}

