/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.ws.emr.hadoop.fs.s3;

import com.amazon.ws.emr.hadoop.fs.EmrFsStore;
import com.amazon.ws.emr.hadoop.fs.consistency.ItemKeys;
import com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException;
import com.amazon.ws.emr.hadoop.fs.cse.CSEUtils;
import com.amazon.ws.emr.hadoop.fs.dynamodb.Entity;
import com.amazon.ws.emr.hadoop.fs.dynamodb.EntityStore;
import com.amazon.ws.emr.hadoop.fs.property.RetryPolicyType;
import com.amazon.ws.emr.hadoop.fs.s3.InputStreamWithInfo;
import com.amazon.ws.emr.hadoop.fs.s3.InputStreamWithInfoFactory;
import com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3Lite;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonServiceException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.annotations.VisibleForTesting;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Preconditions;
import com.amazon.ws.emr.hadoop.fs.util.ConfigurationUtils;
import com.amazon.ws.emr.hadoop.fs.util.EmrFsUtils;
import com.amazon.ws.emr.hadoop.fs.util.RetryUtils;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.NonNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3FSInputStream
extends FSInputStream
implements CanUnbuffer {
    private static final Logger LOG = LoggerFactory.getLogger(S3FSInputStream.class);
    private InputStreamWithInfo in;
    private AmazonS3Lite s3;
    private long lastReadPos = 0L;
    private long nextReadPos = 0L;
    private EntityStore entityStore;
    private String bucketName;
    private boolean throwOnInconsistency;
    private Configuration conf;
    private final boolean lazySeek;
    private long readRetryIntervalMS;
    private int fastFirstRetryDelayMS;
    private final InputStreamWithInfoFactory inputStreamWithInfoFactory;
    private final RetryPolicyType defaultRetryPolicy;
    private final AtomicBoolean shouldTryInitialTimeout;

    public S3FSInputStream(@NonNull String bucketName, @NonNull String key, @NonNull AmazonS3Lite s3, @NonNull EntityStore entityStore, boolean throwOnInconsistency, @NonNull Configuration conf, @NonNull InputStreamWithInfoFactory inputStreamWithInfoFactory) throws ConsistencyException, IOException {
        if (bucketName == null) {
            throw new NullPointerException("bucketName");
        }
        if (key == null) {
            throw new NullPointerException("key");
        }
        if (s3 == null) {
            throw new NullPointerException("s3");
        }
        if (entityStore == null) {
            throw new NullPointerException("entityStore");
        }
        if (conf == null) {
            throw new NullPointerException("conf");
        }
        if (inputStreamWithInfoFactory == null) {
            throw new NullPointerException("inputStreamWithInfoFactory");
        }
        this.bucketName = bucketName;
        this.s3 = s3;
        this.entityStore = entityStore;
        this.throwOnInconsistency = throwOnInconsistency;
        this.conf = conf;
        this.lazySeek = ConfigurationUtils.isLazySeekEnabled(conf);
        this.readRetryIntervalMS = ConfigurationUtils.getConsistencyRetryPeriodSeconds(conf) * 1000;
        this.fastFirstRetryDelayMS = ConfigurationUtils.getFastFirstRetryPeriodMs(conf);
        this.shouldTryInitialTimeout = new AtomicBoolean(true);
        this.inputStreamWithInfoFactory = inputStreamWithInfoFactory;
        this.open(key, 0L);
        this.defaultRetryPolicy = ConfigurationUtils.getRetryPolicyType(conf);
    }

    private void open(String key, long byteStartRange) throws ConsistencyException, IOException {
        Preconditions.checkArgument(byteStartRange >= 0L, "Cannot seek to a negative position");
        try {
            long plaintextLength = CSEUtils.getPlaintextLength(this.s3, this.bucketName, key, null, this.conf);
            LOG.debug("Stream for key '{}' seeking to position '{}'", (Object)key, (Object)byteStartRange);
            this.in = this.inputStreamWithInfoFactory.create(this.bucketName, key, byteStartRange, plaintextLength, this.shouldTryInitialTimeout.get());
        }
        catch (AmazonServiceException e) {
            if (e.getStatusCode() == 404) {
                EmrFsStore.MetadataFile metadataFile;
                Object entity = this.entityStore.retrieve(ItemKeys.toItemKey(this.bucketName, key));
                if (entity != null && (metadataFile = EmrFsStore.MetadataFile.parseFrom(((Entity)entity).getPayload())).getState() != EmrFsStore.MetadataFile.State.DELETED) {
                    String errorMessage = String.format("Unable to get object '%s/%s' from s3", this.bucketName, key);
                    throw new ConsistencyException(errorMessage, e, Collections.singletonList(EmrFsUtils.getPathForS3Object(this.bucketName, key)));
                }
                throw new FileNotFoundException(String.format("File '%s/%s' has been deleted in both metadata and s3", this.bucketName, key));
            }
            throw new IOException(e);
        }
        this.lastReadPos = byteStartRange;
        this.nextReadPos = byteStartRange;
    }

    public synchronized int read() throws IOException {
        throw new UnsupportedOperationException("Single byte read() not implemented");
    }

    private void advance(int amount) {
        this.lastReadPos += (long)amount;
        this.nextReadPos += (long)amount;
    }

    public synchronized int read(byte[] b, int off, int len) throws IOException {
        Preconditions.checkNotNull(b, "byte array 'b' is required");
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }
        if (this.atEndOfStreamIfKnown()) {
            return -1;
        }
        int numRetries = 5;
        int result = -1;
        Exception lastException = null;
        for (int attempt = 0; attempt < 5; ++attempt) {
            try {
                if (attempt <= 0) {
                    if (this.lazySeek) {
                        this.seekStream();
                    }
                } else {
                    this.reopenStream();
                }
                if ((result = this.in.read(b, off, len)) > 0) {
                    this.advance(result);
                    break;
                }
                if (this.in.shouldBreakReadRetry(this.nextReadPos)) break;
                LOG.warn(this.generateUnexpectedEndOfStreamMsg());
            }
            catch (FileNotFoundException fnfe) {
                LOG.info("Encountered an exception while reading '{}', file not present", (Object)this.in.getKey(), (Object)fnfe);
                throw new FileNotFoundException("File not present on S3");
            }
            catch (AmazonClientException | IOException e) {
                this.shouldTryInitialTimeout.set(false);
                lastException = e;
                if (attempt >= 4) {
                    LOG.info("Encountered exception while reading '{}', max retries exceeded.", (Object)this.in.getKey(), (Object)e);
                }
                LOG.info("Encountered exception while reading '{}', will retry by attempting to reopen stream.", (Object)this.in.getKey(), (Object)e);
                long retryInterval = this.getRetryInterval(attempt, e);
                LOG.debug("Back off {} ms for retrying open stream while reading due to s3 GET-After-PUT consistency issue OR another IO related exception such as SocketReset. For best practice please see https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel", (Object)retryInterval);
                EmrFsUtils.sleep(retryInterval);
            }
            if (attempt < 4) continue;
            LOG.error("Unable to recover reading from stream");
            throw new IOException(this.generateUnexpectedEndOfStreamMsg(), lastException);
        }
        return result;
    }

    @VisibleForTesting
    protected long getRetryInterval(int attempt, Exception e) {
        return RetryUtils.calcRetryInterval(this.defaultRetryPolicy, this.readRetryIntervalMS, attempt, this.fastFirstRetryDelayMS, e);
    }

    private String generateUnexpectedEndOfStreamMsg() {
        StringBuilder messageBuilder = new StringBuilder("Unexpected end of stream pos=" + this.lastReadPos);
        if (this.in.isSelect()) {
            messageBuilder.append(", byteScanned=" + this.in.getSelectByteScanned());
        }
        messageBuilder.append(", contentLength=" + this.in.getContentLength());
        return messageBuilder.toString();
    }

    public void close() throws IOException {
        this.in.close();
    }

    private synchronized void reopenStream() throws IOException {
        this.in.close();
        this.retrieveInputStreamWithInfo(this.nextReadPos);
        this.lastReadPos = this.nextReadPos;
    }

    private void retrieveInputStreamWithInfo(long pos) throws IOException {
        Preconditions.checkNotNull(this.in, "Requires last InputStreamWithInfo");
        if (this.atEndOfStreamIfKnown(pos)) {
            return;
        }
        try {
            this.open(this.in.getKey(), pos);
        }
        catch (ConsistencyException e) {
            if (this.throwOnInconsistency) {
                throw e;
            }
            LOG.warn(e.getMessage(), (Throwable)e);
            throw new FileNotFoundException(e.getMessage());
        }
    }

    private boolean atEndOfStreamIfKnown() {
        return this.atEndOfStreamIfKnown(this.nextReadPos);
    }

    private boolean atEndOfStreamIfKnown(long pos) {
        return this.in.atEndOfStreamIfKnown(pos);
    }

    private void throwPositionOutOfBoundsException(long pos) throws EOFException {
        throw new EOFException(String.format("Invalid position: %d, exceeds the bounds of the stream: [0, %d]", pos, this.in.getContentLength()));
    }

    public synchronized void seek(long pos) throws IOException {
        if (pos < 0L || pos > this.in.getContentLength()) {
            this.throwPositionOutOfBoundsException(pos);
        }
        this.nextReadPos = pos;
        if (!this.lazySeek) {
            this.seekStream();
        }
    }

    synchronized void seekStream() throws IOException {
        if (this.lastReadPos == this.nextReadPos && !this.in.wasClosedSuccessfully()) {
            return;
        }
        this.reopenStream();
    }

    public synchronized long getPos() throws IOException {
        return this.nextReadPos;
    }

    public boolean seekToNewSource(long targetPos) throws IOException {
        return false;
    }

    public void unbuffer() {
        try {
            this.in.close();
        }
        catch (IOException e) {
            LOG.warn("Exception while trying to unbuffer input stream: ", (Throwable)e);
        }
    }
}

