/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.dsbulk.executor.api.subscription;

import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.collect.AbstractIterator;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException;
import com.datastax.oss.dsbulk.executor.api.listener.ExecutionContext;
import com.datastax.oss.dsbulk.executor.api.listener.ExecutionListener;
import com.datastax.oss.dsbulk.executor.api.result.DefaultReadResult;
import com.datastax.oss.dsbulk.executor.api.result.ReadResult;
import com.datastax.oss.dsbulk.executor.api.subscription.ResultSubscription;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Subscriber;

public class ContinuousReadResultSubscription
extends ResultSubscription<ReadResult, ContinuousAsyncResultSet> {
    public ContinuousReadResultSubscription(@NonNull Subscriber<? super ReadResult> subscriber, @NonNull Statement<?> statement, @Nullable ExecutionListener listener, @Nullable Semaphore maxConcurrentRequests, @Nullable RateLimiter rateLimiter, boolean failFast) {
        super(subscriber, statement, listener, maxConcurrentRequests, rateLimiter, failFast);
    }

    @Override
    ResultSubscription.Page toPage(final ContinuousAsyncResultSet rs, final ExecutionContext local) {
        final Iterator rows = rs.currentPage().iterator();
        AbstractIterator<ReadResult> results = new AbstractIterator<ReadResult>(){

            protected ReadResult computeNext() {
                if (rows.hasNext()) {
                    Row row = (Row)rows.next();
                    if (ContinuousReadResultSubscription.this.listener != null) {
                        ContinuousReadResultSubscription.this.listener.onRowReceived(row, local);
                    }
                    return new DefaultReadResult(ContinuousReadResultSubscription.this.statement, rs.getExecutionInfo(), row);
                }
                return (ReadResult)this.endOfData();
            }
        };
        return new ContinuousPage(rs, (Iterator)results);
    }

    @Override
    public void cancel() {
        ResultSubscription.Page current = (ResultSubscription.Page)this.pages.peek();
        if (current instanceof ContinuousPage) {
            ((ContinuousPage)current).rs.cancel();
        }
        super.cancel();
    }

    @Override
    void onRequestStarted(ExecutionContext local) {
        if (this.listener != null) {
            this.listener.onReadRequestStarted(this.statement, local);
        }
    }

    @Override
    void onRequestSuccessful(ContinuousAsyncResultSet page, ExecutionContext local) {
        if (this.listener != null) {
            this.listener.onReadRequestSuccessful(this.statement, local);
        }
    }

    @Override
    void onRequestFailed(Throwable t, ExecutionContext local) {
        if (this.listener != null) {
            this.listener.onReadRequestFailed(this.statement, t, local);
        }
    }

    @Override
    void onBeforeResultEmitted(ReadResult result) {
        if (this.rateLimiter != null) {
            this.rateLimiter.acquire();
        }
    }

    @Override
    protected ReadResult toErrorResult(BulkExecutionException error) {
        return new DefaultReadResult(error);
    }

    private class ContinuousPage
    extends ResultSubscription.Page {
        final ContinuousAsyncResultSet rs;

        private ContinuousPage(ContinuousAsyncResultSet rs, Iterator<ReadResult> rows) {
            super(rows, rs.hasMorePages() ? () -> ((ContinuousAsyncResultSet)rs).fetchNextPage() : null);
            this.rs = rs;
        }
    }
}

