/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.dynamodb.preader;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dynamodb.preader.AbstractRecordReadRequest;
import org.apache.hadoop.dynamodb.preader.DynamoDBRecordReaderContext;
import org.apache.hadoop.dynamodb.preader.PageResultMultiplexer;
import org.apache.hadoop.dynamodb.preader.RateController;
import org.apache.hadoop.dynamodb.preader.ReadWorker;
import org.apache.hadoop.dynamodb.util.AbstractTimeSource;

/**
 * Base class that owns a deque of pending {@link AbstractRecordReadRequest}s and a
 * bounded pool of {@link ReadWorker}s, and grows or shrinks that pool according to
 * the throughput statistics handed to
 * {@link #report(double, double, int, int)}.
 *
 * <p>Subclasses supply the initial set of read requests through
 * {@link #initializeReadRequests()}.
 */
public abstract class AbstractReadManager {
    protected static final Log log = LogFactory.getLog(AbstractReadManager.class);

    /** Average read units per request below which a worker becomes a candidate for removal. */
    private static final int MIN_RCU_PER_REQ = 2;
    /** The pool is never shrunk to this size or below unless removal is forced (shutdown). */
    private static final int MIN_WORKER_COUNT = 1;
    /** Capacity of the worker queue; {@link #addWorker()} is a no-op once it is full. */
    private static final int MAX_WORKER_COUNT = 30;
    /** Number of workers started by the constructor. */
    private static final int INITIAL_WORKER_COUNT = 1;
    /** Minimum time, in milliseconds, between two pool-size evaluations. */
    private static final int EVALUATION_FREQ_MS = 5000;
    /** Multiplier applied to the measured read rate before comparing it with the target rate. */
    private static final double MEASURED_RATE_SCALE = 1.1;

    protected final DynamoDBRecordReaderContext context;
    protected final RateController rateController;
    protected final AbstractTimeSource time;
    protected final Deque<AbstractRecordReadRequest> readRequestQueue = new ConcurrentLinkedDeque<AbstractRecordReadRequest>();
    /** Segments still to be completed; not set in this class (presumably by subclasses). */
    protected final AtomicInteger segmentsRemaining = new AtomicInteger(0);
    protected final Queue<ReadWorker> workers = new ArrayBlockingQueue<ReadWorker>(MAX_WORKER_COUNT);

    /** Reports accumulated since the last evaluation; guarded by {@link #reportStatsLock}. */
    private final List<Report> reportedStats = new ArrayList<Report>();
    private final Object reportStatsLock = new Object();
    private final PageResultMultiplexer<Map<String, AttributeValue>> pageMux;
    /** Timestamp of the last evaluation; after construction only touched under {@link #reportStatsLock}. */
    private long lastEvaluatedTimeNano;

    /**
     * Creates the manager, lets the subclass enqueue its read requests and starts the
     * initial worker(s).
     *
     * <p>NOTE: {@link #initializeReadRequests()} is invoked from this constructor, i.e.
     * before any subclass constructor body or subclass field initializer has run.
     * Implementations must therefore only rely on state set up by this class.
     *
     * @param rateController controller that is fed every report and provides the target rate
     * @param time           time source used to measure the evaluation interval
     * @param context        reader context providing the page multiplexer and the reporter
     */
    public AbstractReadManager(RateController rateController, AbstractTimeSource time, DynamoDBRecordReaderContext context) {
        this.context = context;
        this.rateController = rateController;
        this.time = time;
        this.lastEvaluatedTimeNano = time.getNanoTime();
        this.pageMux = context.getPageResultMultiplexer();
        initializeReadRequests();
        for (int i = 0; i < INITIAL_WORKER_COUNT; ++i) {
            addWorker();
        }
    }

    /**
     * Appends a read request to the end of the queue.
     *
     * @param req the request to enqueue
     */
    public void enqueueReadRequestToTail(AbstractRecordReadRequest req) {
        readRequestQueue.addLast(req);
    }

    /**
     * Inserts a read request at the front of the queue so it is dequeued next.
     *
     * @param req the request to enqueue
     */
    public void enqueueReadRequestToHead(AbstractRecordReadRequest req) {
        readRequestQueue.addFirst(req);
    }

    /**
     * Removes and returns the request at the head of the queue.
     *
     * @return the next request, or {@code null} if the queue is empty
     */
    public AbstractRecordReadRequest dequeueReadRequest() {
        return readRequestQueue.poll();
    }

    /**
     * Records the outcome of one read and, at most once per
     * {@link #EVALUATION_FREQ_MS} milliseconds, decides whether to add or remove a
     * worker.
     *
     * <p>The rate controller is adjusted on every call. The evaluation itself looks at
     * all reports collected since the previous evaluation:
     * <ul>
     *   <li>a worker is removed when the average read units per request is below
     *       {@link #MIN_RCU_PER_REQ} and the scaled measured rate exceeds the target
     *       rate;</li>
     *   <li>otherwise a worker is added when the scaled measured rate does not exceed
     *       the target rate and no retries were reported in the window.</li>
     * </ul>
     *
     * @param permittedReadUnits read units that were permitted for the request
     * @param consumedReadUnits  read units actually consumed by the request
     * @param items              number of items returned
     * @param retries            number of retries the request needed
     */
    public void report(double permittedReadUnits, double consumedReadUnits, int items, int retries) {
        rateController.adjust(permittedReadUnits, consumedReadUnits, items);

        boolean shouldAddWorker = false;
        boolean shouldRemoveWorker = false;

        synchronized (reportStatsLock) {
            reportedStats.add(new Report(consumedReadUnits, items, retries));

            long deltaMs = time.getTimeSinceMs(lastEvaluatedTimeNano);
            if (deltaMs < EVALUATION_FREQ_MS) {
                // Not enough time has passed since the last evaluation; keep accumulating.
                return;
            }

            int reportCount = reportedStats.size();
            if (reportCount == 0) {
                // Defensive only: a report was added above under the same lock.
                return;
            }

            Report sum = getReportedSum();
            double rcuPerRequest = sum.readUnits / (double) reportCount;
            double rcuPerSecond = sum.readUnits * 1000.0 / (double) deltaMs;
            recordEvaluationStats(reportCount, rcuPerRequest, rcuPerSecond);

            double scaledRate = rcuPerSecond * MEASURED_RATE_SCALE;
            if (rcuPerRequest < MIN_RCU_PER_REQ && scaledRate > rateController.getTargetRate()) {
                shouldRemoveWorker = true;
            } else if (scaledRate <= rateController.getTargetRate()) {
                if (sum.retries > 0) {
                    log.warn("Not achieving throughput, but not adding workers due to retries (throttles or 500s) (cnt=" + sum.retries + ")");
                } else {
                    shouldAddWorker = true;
                }
            }

            // Start a fresh evaluation window.
            reportedStats.clear();
            lastEvaluatedTimeNano = time.getNanoTime();
        }

        // Pool changes are applied after the stats lock has been released.
        if (shouldRemoveWorker) {
            log.info("Removing a worker");
            removeWorker();
        } else if (shouldAddWorker) {
            log.info("Adding a worker");
            addWorker();
        }
    }

    /**
     * Decrements the remaining-segment counter and, once it reaches zero, switches the
     * page multiplexer to draining mode and shuts the workers down.
     *
     * @param segment the segment that finished (used for logging only)
     */
    void markSegmentComplete(int segment) {
        int remaining = segmentsRemaining.decrementAndGet();
        log.info("Segment " + segment + " complete. Remaining segments: " + remaining);
        if (remaining == 0) {
            pageMux.setDraining(true);
            shutdown();
        }
    }

    /**
     * Removes every worker from the pool and marks each one as no longer alive. Logs a
     * warning when segments are still outstanding.
     */
    public void shutdown() {
        if (segmentsRemaining.get() > 0) {
            log.warn("Shutting down ReadManager while there are segments remaining.");
        } else {
            log.info("Shutting down record reader, no segments remaining.");
        }
        while (workers.size() > 0) {
            removeWorker(true);
        }
    }

    /**
     * Hook for subclasses to set up the initial read requests. Called from the
     * constructor of this class, before the first worker is started.
     */
    protected abstract void initializeReadRequests();

    /**
     * Logs the figures of one evaluation. Subclasses may override to capture them.
     *
     * @param reportCnt     number of reports in the evaluation window
     * @param rcuPerRequest average consumed read units per report
     * @param rcuPerSecond  consumed read units per second over the window
     */
    protected void recordEvaluationStats(int reportCnt, double rcuPerRequest, double rcuPerSecond) {
        log.info("Evaluating rcuPerRequest=" + rcuPerRequest + ", rcuPerSecond=" + rcuPerSecond + ", reportCnt=" + reportCnt + ", workers=" + workers.size());
    }

    /**
     * Creates a worker and starts it if the pool still has capacity; otherwise the new
     * worker is discarded without being started.
     */
    protected void addWorker() {
        ReadWorker worker = new ReadWorker(this, context.getReporter());
        if (workers.offer(worker)) {
            worker.start();
        } else {
            log.info("Can't increase worker count, already at max worker count");
        }
    }

    /** Removes one worker unless the pool is already at its minimum size. */
    protected void removeWorker() {
        removeWorker(false);
    }

    /**
     * Takes one worker out of the pool and marks it as not alive.
     *
     * @param force when {@code true} the minimum pool size is ignored (used by
     *              {@link #shutdown()})
     */
    private void removeWorker(boolean force) {
        if (!force && workers.size() <= MIN_WORKER_COUNT) {
            log.info("Can't reduce worker count, already at min worker count");
            return;
        }
        ReadWorker worker = workers.poll();
        if (worker != null) {
            worker.setAlive(false);
        }
    }

    /**
     * Sums all reports of the current window. Callers must hold
     * {@link #reportStatsLock}.
     *
     * @return a report holding the totals of read units, items and retries
     */
    private Report getReportedSum() {
        double readUnits = 0.0;
        int items = 0;
        int retries = 0;
        for (Report r : reportedStats) {
            readUnits += r.readUnits;
            items += r.items;
            retries += r.retries;
        }
        return new Report(readUnits, items, retries);
    }

    /** Immutable record of one report: consumed read units, item count and retry count. */
    private static class Report {
        public final double readUnits;
        public final int items;
        public final int retries;

        public Report(double readUnits, int items, int retries) {
            this.readUnits = readUnits;
            this.items = items;
            this.retries = retries;
        }
    }
}

