/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kinesis;

import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kinesis.CheckpointGenerator;
import org.apache.beam.sdk.io.kinesis.CustomOptional;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.kinesis.KinesisSource;
import org.apache.beam.sdk.io.kinesis.ShardReadersPool;
import org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.kinesis.TransientKinesisException;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Unbounded reader that pulls {@link KinesisRecord}s from a {@link ShardReadersPool} and derives
 * a watermark from the approximate arrival timestamps of the records it has read.
 *
 * <p>The shard readers pool is created lazily in {@link #start()}; methods that use the pool
 * (other than {@link #close()}) expect {@link #start()} to have been called first.
 */
class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);

    /** Period and update interval handed to the {@link MovingFunction} tracking arrival times. */
    private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1L);
    private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5L);

    /**
     * Significance parameters handed to the {@link MovingFunction}; they influence when
     * {@code isSignificant()} allows the watermark to be advanced from observed timestamps.
     */
    static final int MIN_WATERMARK_MESSAGES = 10;
    private static final int MIN_WATERMARK_SPREAD = 2;

    /** Backlog refresh interval used by the four-argument constructor. */
    private static final Duration DEFAULT_BACKLOG_BYTES_CHECK_THRESHOLD =
            Duration.standardSeconds(30L);

    private final SimplifiedKinesisClient kinesis;
    private final KinesisSource source;
    private final CheckpointGenerator initialCheckpointGenerator;
    private final MovingFunction minReadTimestampMsSinceEpoch;
    private final Duration upToDateThreshold;
    private final Duration backlogBytesCheckThreshold;

    private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
    private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private long lastBacklogBytes;
    private Instant backlogBytesLastCheckTime = new Instant(0L);
    /** Assigned in {@link #start()}; {@code null} until then. */
    private ShardReadersPool shardReadersPool;

    /**
     * Creates a reader whose backlog size is re-queried at most once every 30 seconds.
     *
     * @param kinesis client used to read from Kinesis; must not be {@code null}
     * @param initialCheckpointGenerator generator of the starting checkpoint; must not be
     *     {@code null}
     * @param source the source this reader belongs to
     * @param upToDateThreshold how far behind "now" the watermark may be while the reported
     *     backlog is still zero
     */
    KinesisReader(SimplifiedKinesisClient kinesis, CheckpointGenerator initialCheckpointGenerator, KinesisSource source, Duration upToDateThreshold) {
        this(kinesis, initialCheckpointGenerator, source, upToDateThreshold, DEFAULT_BACKLOG_BYTES_CHECK_THRESHOLD);
    }

    /**
     * Creates a reader.
     *
     * @param kinesis client used to read from Kinesis; must not be {@code null}
     * @param initialCheckpointGenerator generator of the starting checkpoint; must not be
     *     {@code null}
     * @param source the source this reader belongs to
     * @param upToDateThreshold how far behind "now" the watermark may be while the reported
     *     backlog is still zero
     * @param backlogBytesCheckThreshold minimum time between two backlog queries; within this
     *     interval the previously fetched value is returned
     */
    KinesisReader(SimplifiedKinesisClient kinesis, CheckpointGenerator initialCheckpointGenerator, KinesisSource source, Duration upToDateThreshold, Duration backlogBytesCheckThreshold) {
        this.kinesis = Preconditions.checkNotNull(kinesis, "kinesis");
        this.initialCheckpointGenerator = Preconditions.checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
        this.source = source;
        // Use the named constants rather than repeating their literal values.
        this.minReadTimestampMsSinceEpoch = new MovingFunction(
                SAMPLE_PERIOD.getMillis(),
                SAMPLE_UPDATE.getMillis(),
                MIN_WATERMARK_SPREAD,
                MIN_WATERMARK_MESSAGES,
                Min.ofLongs());
        this.upToDateThreshold = upToDateThreshold;
        this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
    }

    /**
     * Creates and starts the shard readers pool, then tries to read the first record.
     *
     * @return {@code true} if a record is available
     * @throws IOException wrapping a {@link TransientKinesisException} raised while creating or
     *     starting the pool
     */
    public boolean start() throws IOException {
        LOG.info("Starting reader using {}", this.initialCheckpointGenerator);
        try {
            this.shardReadersPool = this.createShardReadersPool();
            this.shardReadersPool.start();
        } catch (TransientKinesisException e) {
            throw new IOException(e);
        }
        return this.advance();
    }

    /**
     * Fetches the next record from the pool and, when one is present, records its approximate
     * arrival timestamp for watermark calculation.
     *
     * @return {@code true} if a record is available, {@code false} otherwise
     */
    public boolean advance() throws IOException {
        this.currentRecord = this.shardReadersPool.nextRecord();
        if (!this.currentRecord.isPresent()) {
            return false;
        }
        Instant approximateArrivalTimestamp = this.currentRecord.get().getApproximateArrivalTimestamp();
        this.minReadTimestampMsSinceEpoch.add(Instant.now().getMillis(), approximateArrivalTimestamp.getMillis());
        return true;
    }

    /** Returns the unique id of the current record. */
    public byte[] getCurrentRecordId() throws NoSuchElementException {
        return this.currentRecord.get().getUniqueId();
    }

    /** Returns the current record. */
    public KinesisRecord getCurrent() throws NoSuchElementException {
        return this.currentRecord.get();
    }

    /** Returns the approximate arrival timestamp of the current record. */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        return this.currentRecord.get().getApproximateArrivalTimestamp();
    }

    /**
     * Stops the shard readers pool, if one was created.
     *
     * <p>The pool only exists after {@link #start()} has created it, so closing a reader that
     * was never started (or whose pool creation failed) is a no-op instead of a
     * {@link NullPointerException}.
     */
    public void close() throws IOException {
        if (this.shardReadersPool != null) {
            this.shardReadersPool.stop();
        }
    }

    /**
     * Returns the current watermark, which never moves backwards.
     *
     * <p>If the moving minimum holds no value ({@code Long.MAX_VALUE}) and all shards are up to
     * date, the watermark jumps to "now". Otherwise, when the moving function reports itself
     * significant and its minimum is later than the last watermark, the watermark advances to
     * that minimum. In every other case the previous watermark is returned.
     */
    public Instant getWatermark() {
        Instant now = Instant.now();
        long readMin = this.minReadTimestampMsSinceEpoch.get(now.getMillis());
        if (readMin == Long.MAX_VALUE && this.shardReadersPool.allShardsUpToDate()) {
            this.lastWatermark = now;
        } else if (this.minReadTimestampMsSinceEpoch.isSignificant()) {
            Instant minReadTime = new Instant(readMin);
            if (minReadTime.isAfter(this.lastWatermark)) {
                this.lastWatermark = minReadTime;
            }
        }
        return this.lastWatermark;
    }

    /** Returns the checkpoint mark provided by the shard readers pool. */
    public UnboundedSource.CheckpointMark getCheckpointMark() {
        return this.shardReadersPool.getCheckpointMark();
    }

    /** Returns the source this reader was created with. */
    public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
        return this.source;
    }

    /**
     * Returns the backlog size in bytes for the stream, measured from the current watermark.
     *
     * <p>Returns {@code 0} while the watermark is within {@code upToDateThreshold} of now. The
     * value fetched from Kinesis is cached and reused until {@code backlogBytesCheckThreshold}
     * has elapsed since the last successful query. If the query fails with a
     * {@link TransientKinesisException}, the failure is logged and the previously cached value
     * is returned.
     */
    public long getTotalBacklogBytes() {
        Instant watermark = this.getWatermark();
        if (watermark.plus(this.upToDateThreshold).isAfterNow()) {
            return 0L;
        }
        if (this.backlogBytesLastCheckTime.plus(this.backlogBytesCheckThreshold).isAfterNow()) {
            return this.lastBacklogBytes;
        }
        try {
            this.lastBacklogBytes = this.kinesis.getBacklogBytes(this.source.getStreamName(), watermark);
            // Only a successful query resets the refresh timer, so a failed one is retried
            // on the next call.
            this.backlogBytesLastCheckTime = Instant.now();
        } catch (TransientKinesisException e) {
            LOG.warn("Transient exception occurred.", e);
        }
        LOG.info("Total backlog bytes for {} stream with {} watermark: {}", this.source.getStreamName(), watermark, this.lastBacklogBytes);
        return this.lastBacklogBytes;
    }

    /**
     * Builds the shard readers pool from the initial checkpoint. Package-private, presumably so
     * it can be overridden or stubbed in tests.
     *
     * @throws TransientKinesisException if generating the initial checkpoint fails transiently
     */
    ShardReadersPool createShardReadersPool() throws TransientKinesisException {
        return new ShardReadersPool(this.kinesis, this.initialCheckpointGenerator.generate(this.kinesis));
    }
}

