/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.batch;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.work.batch.BaseRawBatchBuffer;
import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnlimitedRawBatchBuffer
extends BaseRawBatchBuffer<RawFragmentBatch> {
    private static final Logger logger = LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
    private final int softlimit;
    private final int startlimit;
    private int runtimeSoftLimit = -1;
    private int runtimeAckCredit = 1;
    private int sampleTimes = 0;
    private long totalBatchSize = 0L;
    private final int fragmentCount;
    private final int maxSampleTimes;
    private final long thresholdNetworkMem;

    public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount, boolean enableDynamicFC) {
        super(context, fragmentCount, enableDynamicFC);
        this.softlimit = this.bufferSizePerSocket * fragmentCount;
        this.startlimit = Math.max(this.softlimit / 2, 1);
        logger.trace("softLimit: {}, startLimit: {}", (Object)this.softlimit, (Object)this.startlimit);
        this.bufferQueue = new UnlimitedBufferQueue();
        this.fragmentCount = fragmentCount;
        this.sampleTimes = fragmentCount;
        this.maxSampleTimes = fragmentCount;
        this.thresholdNetworkMem = context.getConfig().getLong("drill.exec.buffer.unlimited_receiver.max_size");
    }

    private void doFlowControl(RawFragmentBatch batch) {
        if (this.enableDynamicFC) {
            this.calculateDynamicCredit(batch);
            if (this.runtimeSoftLimit > 0) {
                if (this.bufferQueue.size() < this.runtimeSoftLimit) {
                    batch.sendOk(this.runtimeAckCredit);
                }
            } else if (this.bufferQueue.size() < this.softlimit) {
                batch.sendOk();
            }
        } else if (this.bufferQueue.size() < this.softlimit) {
            batch.sendOk();
        }
    }

    private void calculateDynamicCredit(RawFragmentBatch batch) {
        long averageBatchSize;
        long batchByteSize;
        int recordCount = batch.getHeader().getDef().getRecordCount();
        long l = batchByteSize = batch.getBody() == null ? 0L : (long)batch.getBody().capacity();
        if (recordCount != 0) {
            this.totalBatchSize += batchByteSize;
            ++this.sampleTimes;
        }
        if (this.sampleTimes == this.maxSampleTimes && (averageBatchSize = this.totalBatchSize / (long)this.sampleTimes) > 0L) {
            this.runtimeSoftLimit = (int)(this.thresholdNetworkMem / averageBatchSize);
            this.runtimeAckCredit = this.runtimeSoftLimit / this.fragmentCount;
            this.runtimeAckCredit = Math.max(this.runtimeAckCredit, 1);
        }
    }

    @Override
    protected void enqueueInner(RawFragmentBatch batch) throws IOException {
        this.bufferQueue.add(batch);
    }

    @Override
    protected void upkeep(RawFragmentBatch batch) {
    }

    private class UnlimitedBufferQueue
    implements BaseRawBatchBuffer.BufferQueue<RawFragmentBatch> {
        private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();

        private UnlimitedBufferQueue() {
        }

        @Override
        public void addOomBatch(RawFragmentBatch batch) {
            this.buffer.addFirst(batch);
        }

        @Override
        public RawFragmentBatch poll() throws IOException {
            RawFragmentBatch batch = this.buffer.poll();
            if (batch != null) {
                batch.sendOk();
            }
            return batch;
        }

        @Override
        public RawFragmentBatch take() throws IOException, InterruptedException {
            RawFragmentBatch batch = this.buffer.take();
            batch.sendOk();
            return batch;
        }

        @Override
        public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
            RawFragmentBatch batch = this.buffer.poll(timeout, timeUnit);
            if (batch != null) {
                batch.sendOk();
            }
            return batch;
        }

        @Override
        public boolean checkForOutOfMemory() {
            return UnlimitedRawBatchBuffer.this.context.getAllocator().isOverLimit();
        }

        @Override
        public int size() {
            return this.buffer.size();
        }

        @Override
        public boolean isEmpty() {
            return this.buffer.size() == 0;
        }

        @Override
        public void add(RawFragmentBatch batch) {
            UnlimitedRawBatchBuffer.this.doFlowControl(batch);
            this.buffer.add(batch);
        }
    }
}

