/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import java.util.List;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerReadTask.class);
    private KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
    private final long requestTimeoutMs;
    private final int responseMinBytes;
    private final long maxResponseBytes;
    private final ConsumerReadCallback<ClientKeyT, ClientValueT> callback;
    private boolean finished;
    private List<ConsumerRecord<ClientKeyT, ClientValueT>> messages;
    private long bytesConsumed = 0L;
    private boolean exceededMinResponseBytes = false;
    private boolean exceededMaxResponseBytes = false;
    private final long started;

    public KafkaConsumerReadTask(KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent, long timeout, long maxBytes, ConsumerReadCallback<ClientKeyT, ClientValueT> callback) {
        this.parent = parent;
        this.maxResponseBytes = Math.min(maxBytes, parent.getConfig().getLong("consumer.request.max.bytes"));
        long defaultRequestTimeout = parent.getConfig().getInt("consumer.request.timeout.ms").intValue();
        this.requestTimeoutMs = timeout <= 0L ? defaultRequestTimeout : Math.min(timeout, defaultRequestTimeout);
        int responseMinBytes = parent.getConfig().getInt("fetch.min.bytes");
        this.responseMinBytes = responseMinBytes < 0 ? Integer.MAX_VALUE : responseMinBytes;
        this.callback = callback;
        this.finished = false;
        this.started = parent.getConfig().getTime().milliseconds();
    }

    public void doPartialRead() {
        try {
            boolean requestTimedOut;
            if (this.messages == null) {
                this.messages = new Vector<ConsumerRecord<ClientKeyT, ClientValueT>>();
            }
            this.addRecords();
            log.trace("KafkaConsumerReadTask exiting read with id={} messages={} bytes={}, backing off if not complete", new Object[]{this, this.messages.size(), this.bytesConsumed});
            long now = this.parent.getConfig().getTime().milliseconds();
            long elapsed = now - this.started;
            boolean bl = requestTimedOut = elapsed >= this.requestTimeoutMs;
            if (requestTimedOut || this.exceededMaxResponseBytes || this.exceededMinResponseBytes) {
                log.trace("Finishing KafkaConsumerReadTask id={} requestTimedOut={} exceededMaxResponseBytes={} exceededMinResponseBytes={}", new Object[]{this, requestTimedOut, this.exceededMaxResponseBytes, this.exceededMinResponseBytes});
                this.finish();
            }
        }
        catch (Exception e) {
            this.finish(e);
            log.error("Unexpected exception in consumer read task id={} ", (Object)this, (Object)e);
        }
    }

    public boolean isDone() {
        return this.finished;
    }

    private void addRecords() {
        while (!this.exceededMinResponseBytes && !this.exceededMaxResponseBytes && this.parent.hasNext()) {
            this.maybeAddRecord();
        }
        while (!this.exceededMaxResponseBytes && this.parent.hasNextCached()) {
            this.maybeAddRecord();
        }
    }

    private void maybeAddRecord() {
        ConsumerRecordAndSize<ClientKeyT, ClientValueT> recordAndSize = this.parent.createConsumerRecord(this.parent.peek());
        long roughMsgSize = recordAndSize.getSize();
        if (this.bytesConsumed + roughMsgSize >= this.maxResponseBytes) {
            this.exceededMaxResponseBytes = true;
            return;
        }
        this.messages.add(recordAndSize.getRecord());
        this.parent.next();
        this.bytesConsumed += roughMsgSize;
        if (!this.exceededMinResponseBytes && this.bytesConsumed > (long)this.responseMinBytes) {
            this.exceededMinResponseBytes = true;
        }
    }

    void finish() {
        this.finish(null);
    }

    private void finish(Exception e) {
        log.trace("Finishing KafkaConsumerReadTask id={}", (Object)this, (Object)e);
        try {
            this.callback.onCompletion(e == null ? this.messages : null, e);
        }
        catch (Throwable t) {
            log.error("Consumer read callback threw an unhandled exception id={}", (Object)this, (Object)e);
        }
        this.finished = true;
    }
}

