/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.vertx.core.Context;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;

public class PollableSubscriber
extends BaseSubscriber<Row> {
    private static final int REQUEST_BATCH_SIZE = 100;
    private static final long MAX_POLL_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
    private final BlockingQueue<Row> queue = new LinkedBlockingQueue<Row>();
    private final Consumer<Throwable> errorHandler;
    private int tokens;
    private volatile boolean complete;
    private volatile boolean closed;
    private volatile boolean failed;

    public PollableSubscriber(Context context, Consumer<Throwable> errorHandler) {
        super(context);
        this.errorHandler = Objects.requireNonNull(errorHandler);
    }

    protected void afterSubscribe(Subscription subscription) {
        this.checkRequestTokens();
    }

    protected void handleValue(Row row) {
        this.queue.add(row);
    }

    protected void handleError(Throwable t) {
        this.failed = true;
        this.errorHandler.accept(t);
    }

    protected void handleComplete() {
        this.complete = true;
    }

    public synchronized Row poll(Duration timeout) {
        long remainingTime;
        long end;
        if (this.closed || this.failed) {
            return null;
        }
        long timeoutNs = timeout.toNanos();
        if (timeoutNs > 0L) {
            end = System.nanoTime() + timeoutNs;
            remainingTime = timeoutNs;
        } else {
            end = Long.MAX_VALUE;
            remainingTime = Long.MAX_VALUE;
        }
        do {
            long pollTime = Math.min(remainingTime, MAX_POLL_NANOS);
            try {
                Row row = this.queue.poll(pollTime, TimeUnit.NANOSECONDS);
                if (row != null) {
                    --this.tokens;
                    this.checkRequestTokens();
                    return row;
                }
                if (this.complete) {
                    this.close();
                }
            }
            catch (InterruptedException e) {
                return null;
            }
            remainingTime = end - System.nanoTime();
        } while (!this.closed && !this.failed && remainingTime > 0L);
        return null;
    }

    public void close() {
        this.closed = true;
    }

    synchronized boolean isClosed() {
        return this.closed;
    }

    private void checkRequestTokens() {
        if (this.tokens == 0) {
            this.tokens += 100;
            this.makeRequest(100L);
        }
    }
}

