/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kinesis;

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.kinesis.CustomOptional;
import org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.kinesis.ShardRecordsIterator;
import org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.kinesis.TransientKinesisException;
import org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ShardReadersPool {
    private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
    private static final int DEFAULT_CAPACITY_PER_SHARD = 10000;
    private ExecutorService executorService;
    private BlockingQueue<KinesisRecord> recordsQueue;
    private Map<String, ShardRecordsIterator> shardIteratorsMap;
    private SimplifiedKinesisClient kinesis;
    private KinesisReaderCheckpoint initialCheckpoint;
    private final int queueCapacityPerShard;
    private AtomicBoolean poolOpened = new AtomicBoolean(true);

    ShardReadersPool(SimplifiedKinesisClient kinesis, KinesisReaderCheckpoint initialCheckpoint) {
        this(kinesis, initialCheckpoint, 10000);
    }

    ShardReadersPool(SimplifiedKinesisClient kinesis, KinesisReaderCheckpoint initialCheckpoint, int queueCapacityPerShard) {
        this.kinesis = kinesis;
        this.initialCheckpoint = initialCheckpoint;
        this.queueCapacityPerShard = queueCapacityPerShard;
    }

    void start() throws TransientKinesisException {
        ImmutableMap.Builder<String, ShardRecordsIterator> shardsMap = ImmutableMap.builder();
        for (ShardCheckpoint checkpoint : this.initialCheckpoint) {
            shardsMap.put(checkpoint.getShardId(), this.createShardIterator(this.kinesis, checkpoint));
        }
        this.shardIteratorsMap = shardsMap.build();
        this.executorService = Executors.newFixedThreadPool(this.shardIteratorsMap.size());
        this.recordsQueue = new LinkedBlockingQueue<KinesisRecord>(this.queueCapacityPerShard * this.shardIteratorsMap.size());
        for (ShardRecordsIterator shardRecordsIterator : this.shardIteratorsMap.values()) {
            this.executorService.submit(() -> this.readLoop(shardRecordsIterator));
        }
    }

    private void readLoop(ShardRecordsIterator shardRecordsIterator) {
        while (this.poolOpened.get()) {
            try {
                List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
                for (KinesisRecord kinesisRecord : kinesisRecords) {
                    this.recordsQueue.put(kinesisRecord);
                }
            }
            catch (TransientKinesisException e) {
                LOG.warn("Transient exception occurred.", (Throwable)e);
            }
            catch (InterruptedException e) {
                LOG.warn("Thread was interrupted, finishing the read loop", (Throwable)e);
                break;
            }
            catch (Throwable e) {
                LOG.error("Unexpected exception occurred", e);
            }
        }
        LOG.info("Kinesis Shard read loop has finished");
    }

    CustomOptional<KinesisRecord> nextRecord() {
        try {
            KinesisRecord record = this.recordsQueue.poll(1L, TimeUnit.SECONDS);
            if (record == null) {
                return CustomOptional.absent();
            }
            this.shardIteratorsMap.get(record.getShardId()).ackRecord(record);
            return CustomOptional.of(record);
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for KinesisRecord from the buffer");
            return CustomOptional.absent();
        }
    }

    void stop() {
        LOG.info("Closing shard iterators pool");
        this.poolOpened.set(false);
        this.executorService.shutdownNow();
        boolean isShutdown = false;
        int attemptsLeft = 3;
        while (!isShutdown && attemptsLeft-- > 0) {
            try {
                isShutdown = this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for the executor service to shutdown");
                throw new RuntimeException(e);
            }
            if (isShutdown || attemptsLeft <= 0) continue;
            LOG.warn("Executor service is taking long time to shutdown, will retry. {} attempts left", (Object)attemptsLeft);
        }
    }

    boolean allShardsUpToDate() {
        boolean shardsUpToDate = true;
        for (ShardRecordsIterator shardRecordsIterator : this.shardIteratorsMap.values()) {
            shardsUpToDate &= shardRecordsIterator.isUpToDate();
        }
        return shardsUpToDate;
    }

    KinesisReaderCheckpoint getCheckpointMark() {
        return new KinesisReaderCheckpoint(this.shardIteratorsMap.values().stream().map(shardRecordsIterator -> {
            Preconditions.checkArgument(shardRecordsIterator != null, "shardRecordsIterator can not be null");
            return shardRecordsIterator.getCheckpoint();
        }).collect(Collectors.toList()));
    }

    ShardRecordsIterator createShardIterator(SimplifiedKinesisClient kinesis, ShardCheckpoint checkpoint) throws TransientKinesisException {
        return new ShardRecordsIterator(checkpoint, kinesis);
    }
}

