/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.dsbulk.executor.api.subscription;

import com.datastax.oss.driver.api.core.AsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException;
import com.datastax.oss.dsbulk.executor.api.listener.DefaultExecutionContext;
import com.datastax.oss.dsbulk.executor.api.listener.ExecutionContext;
import com.datastax.oss.dsbulk.executor.api.listener.ExecutionListener;
import com.datastax.oss.dsbulk.executor.api.result.Result;
import com.datastax.oss.dsbulk.executor.api.subscription.Operators;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jctools.queues.SpscArrayQueue;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Subscription} that executes a statement page by page and emits each row of each page
 * to a {@link Subscriber} as a result of type {@code R}, honoring the demand signalled through
 * {@link #request(long)}.
 *
 * <p>Pages are fetched asynchronously and buffered in a bounded queue of at most
 * {@link #MAX_ENQUEUED_PAGES} entries. A fetched page is only enqueued once the page before it
 * has signalled, through its {@code fullyConsumed} future, that there is room for it. Rows are
 * handed to the subscriber by {@link #drain()}, which is guarded by a work-in-progress counter so
 * that only one thread at a time consumes the queue.
 *
 * @param <R> the type of result emitted to the subscriber.
 * @param <P> the type of page returned by the underlying asynchronous requests.
 */
public abstract class ResultSubscription<R extends Result, P extends AsyncPagingIterable<Row, P>>
        implements Subscription {
    private static final Logger LOG = LoggerFactory.getLogger(ResultSubscription.class);

    /** Capacity of the page queue, and threshold used by {@link #enqueue(Page)}. */
    private static final int MAX_ENQUEUED_PAGES = 4;

    /** The downstream subscriber; set to {@code null} by {@link #clear()} once terminated. */
    private Subscriber<? super R> subscriber;

    /** The statement being executed. */
    final Statement<?> statement;

    /** Optional listener notified of execution start, success and failure. */
    @Nullable
    final ExecutionListener listener;

    /** Optional semaphore: a permit is acquired before each page request and released after it. */
    @Nullable
    private final Semaphore maxConcurrentRequests;

    /** Optional rate limiter; not used in this class (presumably consumed by subclasses). */
    @Nullable
    final RateLimiter rateLimiter;

    /** Whether a failed result terminates the stream with {@code onError} instead of being emitted. */
    private final boolean failFast;

    /** The batch statement's size if the statement is a {@link BatchStatement}, otherwise 1. */
    final int batchSize;

    /** Outstanding demand: increased by {@link #request(long)}, decreased by {@link #drain()}. */
    private final AtomicLong requested = new AtomicLong(0L);

    /** Pages waiting to be consumed; the head is the page currently being read. */
    final Queue<Page> pages = new SpscArrayQueue<>(MAX_ENQUEUED_PAGES);

    /** The most recently enqueued page. */
    private volatile Page last = null;

    /** Number of pages currently held in {@link #pages}. */
    private final AtomicInteger pagesSize = new AtomicInteger(0);

    /** Work-in-progress counter serializing access to the consumer side of {@link #pages}. */
    private final AtomicInteger draining = new AtomicInteger(0);

    /** Execution context spanning the whole statement execution. */
    private final DefaultExecutionContext global = new DefaultExecutionContext();

    /**
     * Completed on the first valid {@link #request(long)}. The initial page uses it as its
     * {@code fullyConsumed} future, so the first fetched page is not enqueued before demand exists.
     */
    private final CompletableFuture<Void> initial = new CompletableFuture<>();

    /** Set once by {@link #cancel()}; never reset. */
    private volatile boolean cancelled = false;

    /**
     * Creates a new subscription; nothing is executed until {@link #start(Callable)} is called.
     *
     * @param subscriber the subscriber that will receive the results.
     * @param statement the statement to execute.
     * @param listener an optional listener to notify of execution events.
     * @param maxConcurrentRequests an optional semaphore bounding in-flight page requests.
     * @param rateLimiter an optional rate limiter, stored for later use.
     * @param failFast whether a failed result should terminate the stream with an error.
     */
    ResultSubscription(@NonNull Subscriber<? super R> subscriber, @NonNull Statement<?> statement, @Nullable ExecutionListener listener, @Nullable Semaphore maxConcurrentRequests, @Nullable RateLimiter rateLimiter, boolean failFast) {
        this.statement = statement;
        this.subscriber = subscriber;
        this.listener = listener;
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.rateLimiter = rateLimiter;
        this.failFast = failFast;
        this.batchSize = statement instanceof BatchStatement ? ((BatchStatement) statement).size() : 1;
    }

    /**
     * Starts the execution: starts the global context, notifies the listener, and triggers the
     * first page request.
     *
     * @param initial the callable producing the first page.
     */
    public void start(Callable<CompletionStage<? extends P>> initial) {
        this.global.start();
        if (this.listener != null) {
            this.listener.onExecutionStarted(this.statement, this.global);
        }
        this.fetchNextPage(new Page(initial));
    }

    /**
     * Registers additional demand and attempts to emit results.
     *
     * <p>Does nothing if the subscription was cancelled. A non-positive {@code n} is reported to
     * the subscriber through {@code onError}.
     *
     * @param n the number of additional elements requested.
     */
    @Override
    public void request(long n) {
        if (this.cancelled) {
            return;
        }
        if (n < 1L) {
            this.doOnError(new IllegalArgumentException(this.subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
            return;
        }
        Operators.addCap(this.requested, n);
        // The first request unblocks the enqueueing of the first fetched page.
        if (!this.initial.isDone()) {
            this.initial.complete(null);
        }
        this.drain();
    }

    /**
     * Cancels the subscription. If no drain is in progress, buffered pages are released
     * immediately; otherwise the running drain loop notices the flag and clears them itself.
     */
    @Override
    public void cancel() {
        if (this.cancelled) {
            return;
        }
        this.cancelled = true;
        if (this.draining.getAndIncrement() == 0) {
            this.clear();
        }
    }

    /**
     * Emits buffered results to the subscriber while demand remains.
     *
     * <p>Only the caller that moves {@link #draining} from zero runs the loop; concurrent callers
     * merely increment the counter, which makes the running loop perform another round. On every
     * terminal path (cancelled or exhausted) the counter is deliberately left non-zero, so no
     * further drain can ever start.
     */
    private void drain() {
        if (this.draining.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        do {
            long r = this.requested.get();
            long emitted = 0L;
            while (emitted != r) {
                if (this.cancelled) {
                    this.clear();
                    return;
                }
                R result = this.tryNext();
                if (result == null) {
                    // Nothing available right now; wait for the next page to be enqueued.
                    break;
                }
                // In fail-fast mode a failed result is not emitted; it surfaces via stop() below.
                if (result.isSuccess() || !this.failFast) {
                    this.doOnNext(result);
                }
                if (this.isExhausted()) {
                    this.stop(result.getError().orElse(null));
                    this.clear();
                    return;
                }
                emitted++;
            }
            if (this.isExhausted()) {
                this.stop(null);
                this.clear();
                return;
            }
            if (this.cancelled) {
                this.clear();
                return;
            }
            if (emitted != 0L) {
                Operators.subCap(this.requested, emitted);
            }
            // Subtract the rounds accounted for; a non-zero remainder means drain() was called
            // again while this loop was running, so another round is needed.
            missed = this.draining.addAndGet(-missed);
        } while (missed != 0);
    }

    /**
     * Returns the next available result, moving on to the next enqueued page when the current one
     * has no rows left.
     *
     * @return the next result, or {@code null} if none is currently available.
     */
    private R tryNext() {
        Page current = this.pages.peek();
        if (current != null) {
            if (current.hasMoreRows()) {
                return current.nextRow();
            }
            if (current.hasMorePages() && (current = this.dequeue()) != null && current.hasMoreRows()) {
                return current.nextRow();
            }
        }
        return null;
    }

    /**
     * Tells whether the stream is finished: either the subscription was cancelled, or the head
     * page has neither rows left nor a next page.
     */
    private boolean isExhausted() {
        if (this.cancelled) {
            return true;
        }
        Page current = this.pages.peek();
        return current != null && !current.hasMoreRows() && !current.hasMorePages();
    }

    /**
     * Requests the page following {@code current} and, once it is available and {@code current}
     * has signalled that there is room in the queue, enqueues it, triggers the fetch of the page
     * after it (if any), and drains.
     *
     * @param current the page whose successor should be fetched.
     */
    private void fetchNextPage(Page current) {
        DefaultExecutionContext local = new DefaultExecutionContext();
        this.onBeforeRequestStarted();
        local.start();
        this.onRequestStarted(local);
        current.nextPage().whenComplete((rs, t) -> {
            if (this.maxConcurrentRequests != null) {
                this.maxConcurrentRequests.release();
            }
            local.stop();
            if (t == null) {
                this.onRequestSuccessful(rs, local);
            } else {
                this.onRequestFailed(t, local);
            }
        }).handle((rs, t) -> {
            if (t == null) {
                return this.toPage(rs, local);
            }
            // Unwrap the wrapper added by the CompletionStage machinery.
            Throwable cause = t instanceof CompletionException ? t.getCause() : t;
            return this.toErrorPage(cause);
        }).thenCombine(current.fullyConsumed, (page, v) -> page).thenAccept(page -> {
            this.enqueue(page);
            if (page.hasMorePages() && !this.cancelled) {
                this.fetchNextPage(page);
            }
            this.drain();
        });
    }

    /**
     * Hook invoked before each page request; acquires a permit from the concurrency semaphore, if
     * any, blocking uninterruptibly until one is available.
     */
    void onBeforeRequestStarted() {
        if (this.maxConcurrentRequests != null) {
            this.maxConcurrentRequests.acquireUninterruptibly();
        }
    }

    /** Invoked right before a page request is issued, with that request's started context. */
    abstract void onRequestStarted(ExecutionContext var1);

    /** Invoked when a page request succeeded, with the page and the request's stopped context. */
    abstract void onRequestSuccessful(P var1, ExecutionContext var2);

    /** Invoked when a page request failed, with the error and the request's stopped context. */
    abstract void onRequestFailed(Throwable var1, ExecutionContext var2);

    /**
     * Adds a page to the queue. While the queue is below capacity, the page's
     * {@code fullyConsumed} future is completed right away so that its successor may be enqueued
     * as soon as it is fetched; otherwise completion is deferred to {@link #dequeue()}.
     */
    private void enqueue(Page page) {
        if (!this.pages.offer(page)) {
            throw new AssertionError("Queue is full, this should not happen");
        }
        this.last = page;
        if (this.pagesSize.incrementAndGet() < MAX_ENQUEUED_PAGES) {
            page.fullyConsumed.complete(null);
        }
    }

    /**
     * Removes the head page and signals, through the last enqueued page's {@code fullyConsumed}
     * future, that a slot is free.
     *
     * @return the new head of the queue, or {@code null} if the queue is now empty.
     */
    private Page dequeue() {
        Page current = this.pages.poll();
        if (current == null) {
            throw new AssertionError("Queue is empty, this should not happen");
        }
        this.pagesSize.decrementAndGet();
        this.last.fullyConsumed.complete(null);
        return this.pages.peek();
    }

    /**
     * Emits one result to the subscriber; if the subscriber (or the pre-emission hook) throws,
     * the error is logged and the subscription is cancelled.
     */
    private void doOnNext(R result) {
        try {
            this.onBeforeResultEmitted(result);
            this.subscriber.onNext(result);
        }
        catch (Throwable t) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext.", t);
            this.cancel();
        }
    }

    /** Hook invoked right before a result is emitted; does nothing by default. */
    void onBeforeResultEmitted(R result) {
    }

    /**
     * Terminates the execution: stops the global context, notifies the listener, then signals
     * {@code onError} when failing fast on an error, or {@code onComplete} otherwise.
     *
     * @param error the error that ended the execution, or {@code null} if there was none.
     */
    private void stop(@Nullable BulkExecutionException error) {
        this.global.stop();
        if (this.listener != null) {
            if (error != null) {
                this.listener.onExecutionFailed(error, this.global);
            } else {
                this.listener.onExecutionSuccessful(this.statement, this.global);
            }
        }
        if (!this.failFast || error == null) {
            this.doOnComplete();
        } else {
            this.doOnError(error);
        }
    }

    /** Signals {@code onComplete}, logging any exception it throws, then cancels. */
    private void doOnComplete() {
        try {
            this.subscriber.onComplete();
        }
        catch (Throwable t) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t);
        }
        this.cancel();
    }

    /**
     * Signals {@code onError}, logging any exception it throws (with the original error attached
     * as suppressed), then cancels.
     *
     * @param error the error to report to the subscriber.
     */
    public void doOnError(Throwable error) {
        try {
            this.subscriber.onError(error);
        }
        catch (Throwable t) {
            t.addSuppressed(error);
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t);
        }
        this.cancel();
    }

    /** Discards all buffered pages and drops the reference to the subscriber. */
    private void clear() {
        this.pages.clear();
        this.subscriber = null;
    }

    /** Converts a successfully fetched page into a {@link Page} of results. */
    abstract Page toPage(P var1, ExecutionContext var2);

    /** Wraps a failure into a terminal page holding a single error result. */
    private Page toErrorPage(Throwable t) {
        BulkExecutionException error = new BulkExecutionException(t, this.statement);
        return new Page(Collections.singleton(this.toErrorResult(error)).iterator(), null);
    }

    /** Converts an execution error into a result that can be emitted to the subscriber. */
    abstract R toErrorResult(BulkExecutionException var1);

    /**
     * A page of results, together with the means to fetch the page that follows it.
     */
    class Page {
        /** The results of this page. */
        final Iterator<R> rows;

        /** Produces the next page; {@code null} when this is the last page. */
        @Nullable
        final Callable<CompletionStage<? extends P>> nextPage;

        /** Completed when the page following this one is allowed to be enqueued. */
        final CompletableFuture<Void> fullyConsumed;

        /**
         * Creates the initial, row-less page whose "next page" is the first real page. Its
         * {@code fullyConsumed} future is the subscription's {@code initial} future.
         */
        private Page(Callable<CompletionStage<? extends P>> nextPage) {
            this.nextPage = nextPage;
            this.rows = Collections.emptyIterator();
            this.fullyConsumed = ResultSubscription.this.initial;
        }

        /**
         * Creates a page of results.
         *
         * @param rows the results of this page.
         * @param nextPage the callable producing the next page, or {@code null} if there is none.
         */
        Page(Iterator<R> rows, @Nullable Callable<CompletionStage<? extends P>> nextPage) {
            this.nextPage = nextPage;
            this.rows = rows;
            this.fullyConsumed = new CompletableFuture<>();
        }

        /** Returns whether another page follows this one. */
        boolean hasMorePages() {
            return this.nextPage != null;
        }

        /**
         * Invokes the next-page callable; an exception thrown by the call itself is converted
         * into a failed stage instead of being propagated.
         */
        CompletionStage<? extends P> nextPage() {
            try {
                return this.nextPage.call();
            }
            catch (Exception e) {
                CompletableFuture<P> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        }

        /** Returns whether this page still has results to emit. */
        boolean hasMoreRows() {
            return this.rows.hasNext();
        }

        /** Returns the next result of this page. */
        R nextRow() {
            return this.rows.next();
        }
    }
}

