/*
 * Decompiled with CFR 0.152.
 */
package org.reactivestreams.example.unicast;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class AsyncIterablePublisher<T>
implements Publisher<T> {
    private static final int DEFAULT_BATCHSIZE = 1024;
    private final Iterable<T> elements;
    private final Executor executor;
    private final int batchSize;

    public AsyncIterablePublisher(Iterable<T> elements, Executor executor) {
        this(elements, 1024, executor);
    }

    public AsyncIterablePublisher(Iterable<T> elements, int batchSize, Executor executor) {
        if (elements == null) {
            throw null;
        }
        if (executor == null) {
            throw null;
        }
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be greater than zero!");
        }
        this.elements = elements;
        this.executor = executor;
        this.batchSize = batchSize;
    }

    public void subscribe(Subscriber<? super T> s) {
        new SubscriptionImpl(s).init();
    }

    final class SubscriptionImpl
    implements Subscription,
    Runnable {
        final Subscriber<? super T> subscriber;
        private boolean cancelled = false;
        private long demand = 0L;
        private Iterator<T> iterator;
        private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue();
        private final AtomicBoolean on = new AtomicBoolean(false);

        SubscriptionImpl(Subscriber<? super T> subscriber) {
            if (subscriber == null) {
                throw null;
            }
            this.subscriber = subscriber;
        }

        private void doRequest(long n) {
            if (n < 1L) {
                this.terminateDueTo(new IllegalArgumentException(this.subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
            } else if (this.demand + n < 1L) {
                this.demand = Long.MAX_VALUE;
                this.doSend();
            } else {
                this.demand += n;
                this.doSend();
            }
        }

        private void doCancel() {
            this.cancelled = true;
        }

        private void doSubscribe() {
            try {
                this.iterator = AsyncIterablePublisher.this.elements.iterator();
                if (this.iterator == null) {
                    this.iterator = Collections.emptyList().iterator();
                }
            }
            catch (Throwable t) {
                this.subscriber.onSubscribe(new Subscription(){

                    public void cancel() {
                    }

                    public void request(long n) {
                    }
                });
                this.terminateDueTo(t);
            }
            if (!this.cancelled) {
                try {
                    this.subscriber.onSubscribe((Subscription)this);
                }
                catch (Throwable t) {
                    this.terminateDueTo(new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
                }
                boolean hasElements = false;
                try {
                    hasElements = this.iterator.hasNext();
                }
                catch (Throwable t) {
                    this.terminateDueTo(t);
                }
                if (!hasElements) {
                    try {
                        this.doCancel();
                        this.subscriber.onComplete();
                    }
                    catch (Throwable t) {
                        new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t).printStackTrace(System.err);
                    }
                }
            }
        }

        private void doSend() {
            try {
                int leftInBatch = AsyncIterablePublisher.this.batchSize;
                do {
                    boolean hasNext;
                    Object next;
                    try {
                        next = this.iterator.next();
                        hasNext = this.iterator.hasNext();
                    }
                    catch (Throwable t) {
                        this.terminateDueTo(t);
                        return;
                    }
                    this.subscriber.onNext(next);
                    if (hasNext) continue;
                    this.doCancel();
                    this.subscriber.onComplete();
                } while (!this.cancelled && --leftInBatch > 0 && --this.demand > 0L);
                if (!this.cancelled && this.demand > 0L) {
                    this.signal(Send.Instance);
                }
            }
            catch (Throwable t) {
                this.doCancel();
                new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t).printStackTrace(System.err);
            }
        }

        private void terminateDueTo(Throwable t) {
            this.cancelled = true;
            try {
                this.subscriber.onError(t);
            }
            catch (Throwable t2) {
                new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2).printStackTrace(System.err);
            }
        }

        private void signal(Signal signal) {
            if (this.inboundSignals.offer(signal)) {
                this.tryScheduleToExecute();
            }
        }

        @Override
        public final void run() {
            if (this.on.get()) {
                try {
                    Signal s = this.inboundSignals.poll();
                    if (!this.cancelled) {
                        if (s instanceof Request) {
                            this.doRequest(((Request)s).n);
                        } else if (s == Send.Instance) {
                            this.doSend();
                        } else if (s == Cancel.Instance) {
                            this.doCancel();
                        } else if (s == Subscribe.Instance) {
                            this.doSubscribe();
                        }
                    }
                }
                finally {
                    this.on.set(false);
                    if (!this.inboundSignals.isEmpty()) {
                        this.tryScheduleToExecute();
                    }
                }
            }
        }

        private final void tryScheduleToExecute() {
            block6: {
                if (this.on.compareAndSet(false, true)) {
                    try {
                        AsyncIterablePublisher.this.executor.execute(this);
                    }
                    catch (Throwable t) {
                        if (this.cancelled) break block6;
                        this.doCancel();
                        try {
                            this.terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
                        }
                        finally {
                            this.inboundSignals.clear();
                            this.on.set(false);
                        }
                    }
                }
            }
        }

        public void request(long n) {
            this.signal(new Request(n));
        }

        public void cancel() {
            this.signal(Cancel.Instance);
        }

        void init() {
            this.signal(Subscribe.Instance);
        }
    }

    static final class Request
    implements Signal {
        final long n;

        Request(long n) {
            this.n = n;
        }
    }

    static enum Send implements Signal
    {
        Instance;

    }

    static enum Subscribe implements Signal
    {
        Instance;

    }

    static enum Cancel implements Signal
    {
        Instance;

    }

    static interface Signal {
    }
}

