/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.ws.emr.hadoop.fs.s3;

import com.amazon.ws.emr.hadoop.fs.EmrFsStore;
import com.amazon.ws.emr.hadoop.fs.consistency.ItemKeys;
import com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException;
import com.amazon.ws.emr.hadoop.fs.dynamodb.Entity;
import com.amazon.ws.emr.hadoop.fs.dynamodb.EntityStore;
import com.amazon.ws.emr.hadoop.fs.retry.BackoffStrategies;
import com.amazon.ws.emr.hadoop.fs.retry.BackoffStrategy;
import com.amazon.ws.emr.hadoop.fs.s3.AbstractS3FSInputStream;
import com.amazon.ws.emr.hadoop.fs.s3.ContentLengthSupplier;
import com.amazon.ws.emr.hadoop.fs.s3.InputStreamWithInfo;
import com.amazon.ws.emr.hadoop.fs.s3.InputStreamWithInfoFactory;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonServiceException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.annotations.VisibleForTesting;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Preconditions;
import com.amazon.ws.emr.hadoop.fs.util.ConfigurationUtils;
import com.amazon.ws.emr.hadoop.fs.util.EmrFsUtils;
import com.amazon.ws.emr.hadoop.fs.util.S3UriUtils;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input stream over a single S3 object, identified by bucket name and key.
 *
 * <p>The bytes come from an {@link InputStreamWithInfo} produced by the supplied
 * {@link InputStreamWithInfoFactory}. The class tracks two positions: {@code nextReadPos}, the
 * position the caller asked for (via {@link #seek(long)} or by reading), and {@code lastReadPos},
 * the position the underlying stream was last opened at or advanced to. When lazy seek is
 * enabled, {@link #seek(long)} only records the target position and the underlying stream is
 * reopened on the next read if the two positions differ.
 *
 * <p>Reads that fail with an I/O or client exception are retried up to
 * {@link #NUM_READ_RETRIES} times by reopening the underlying stream at {@code nextReadPos}.
 *
 * <p>Most state-changing methods are {@code synchronized}; {@link #close()} and
 * {@link #unbuffer()} are not.
 */
public class S3FSInputStream extends AbstractS3FSInputStream implements CanUnbuffer {
    private static final Logger logger = LoggerFactory.getLogger(S3FSInputStream.class);

    /** Maximum number of attempts made by a single call to {@link #read(byte[], int, int)}. */
    @VisibleForTesting
    public static final int NUM_READ_RETRIES = 5;

    /** HTTP status code that {@link #open(long, boolean)} treats as "object not found". */
    private static final int HTTP_NOT_FOUND = 404;

    private final String bucketName;
    private final String key;
    private final ContentLengthSupplier contentLengthSupplier;
    private final InputStreamWithInfoFactory inputStreamWithInfoFactory;
    @Nullable
    private final FileSystem.Statistics statistics;
    @Nullable
    private final EntityStore entityStore;
    private final boolean throwOnInconsistency;
    private final boolean lazySeek;
    private final BackoffStrategy backoffStrategy;
    // Shared with streams created by forkStream; cleared after the first failed read attempt.
    private final AtomicBoolean shouldTryInitialTimeout;
    private InputStreamWithInfo in;
    private boolean wasLazilyOpened;
    private long lastReadPos = 0L;
    private long nextReadPos = 0L;
    // Remaining number of bytes this stream may still return; null means unbounded.
    @Nullable
    private Long maxLength;
    private volatile long contentLength;

    /**
     * Creates a stream positioned at offset 0 with no maximum length.
     *
     * <p>The non-null arguments are validated while the arguments of the delegated constructor
     * are being evaluated, i.e. before anything is opened.
     *
     * @param bucketName bucket containing the object
     * @param key key of the object
     * @param contentLengthSupplier used to re-query the content length when the stream is reopened
     * @param inputStreamWithInfoFactory factory for the underlying streams
     * @param conf configuration from which the lazy-seek flag, the positioned-read and
     *     read-fully optimization flags and the backoff strategy are read
     * @param statistics optional statistics sink that is incremented with the bytes read
     * @param entityStore optional metadata store consulted when S3 answers 404; required when
     *     {@code throwOnInconsistency} is {@code true}
     * @param throwOnInconsistency whether a {@link ConsistencyException} raised while reopening
     *     the stream is rethrown as-is instead of being converted to a
     *     {@link FileNotFoundException}
     * @param lazyOpen when {@code true} the initial underlying stream is obtained through
     *     {@code createClosedStream} instead of {@code create}
     * @param contentLength initially known content length of the object
     * @throws NullPointerException if any argument marked non-null is {@code null}
     * @throws IOException if opening the initial stream fails
     */
    public S3FSInputStream(@NonNull String bucketName, @NonNull String key, @NonNull ContentLengthSupplier contentLengthSupplier, @NonNull InputStreamWithInfoFactory inputStreamWithInfoFactory, @NonNull Configuration conf, @Nullable FileSystem.Statistics statistics, @Nullable EntityStore entityStore, boolean throwOnInconsistency, boolean lazyOpen, long contentLength) throws IOException {
        // Null checks are performed inline (left to right) so they run BEFORE the delegated
        // constructor opens a stream; checking afterwards could leave an opened stream behind.
        this(
            Preconditions.checkNotNull(bucketName, "bucketName is marked non-null but is null"),
            Preconditions.checkNotNull(key, "key is marked non-null but is null"),
            Preconditions.checkNotNull(contentLengthSupplier, "contentLengthSupplier is marked non-null but is null"),
            Preconditions.checkNotNull(inputStreamWithInfoFactory, "inputStreamWithInfoFactory is marked non-null but is null"),
            statistics,
            entityStore,
            0L,
            contentLength,
            null,
            throwOnInconsistency,
            lazyOpen,
            ConfigurationUtils.isLazySeekEnabled(Preconditions.checkNotNull(conf, "conf is marked non-null but is null")),
            ConfigurationUtils.isPositionedReadOptimizationEnabled(conf),
            ConfigurationUtils.isReadFullyIntoBuffersOptimizationEnabled(conf),
            BackoffStrategies.get(conf),
            new AtomicBoolean(true));
    }

    /**
     * Full constructor, also used by {@link #forkStream(long, long)}.
     *
     * @param position offset at which the stream is initially opened
     * @param maxLength optional cap on the number of bytes returned; only allowed when the
     *     factory reports {@code supportsMaxLength()}
     * @throws IllegalArgumentException if {@code throwOnInconsistency} is set without an entity
     *     store, or {@code maxLength} is given but not supported by the factory
     * @throws IOException if opening the initial stream fails
     */
    private S3FSInputStream(String bucketName, String key, ContentLengthSupplier contentLengthSupplier, InputStreamWithInfoFactory inputStreamWithInfoFactory, FileSystem.Statistics statistics, EntityStore entityStore, long position, long contentLength, @Nullable Long maxLength, boolean throwOnInconsistency, boolean lazyOpen, boolean lazySeek, boolean isPositionedReadOptimizationEnabled, boolean isReadFullyIntoBuffersOptimizationEnabled, BackoffStrategy backoffStrategy, AtomicBoolean shouldTryInitialTimeout) throws IOException {
        super(bucketName, key, inputStreamWithInfoFactory.supportsMaxLength() && isPositionedReadOptimizationEnabled, inputStreamWithInfoFactory.supportsMaxLength() && isReadFullyIntoBuffersOptimizationEnabled && lazySeek);
        Preconditions.checkArgument(entityStore != null || !throwOnInconsistency, "Entity store is required when setting throwOnInconsistency to true");
        // Guava's Preconditions only substitutes %s placeholders, so %s (not %d) is required here.
        Preconditions.checkArgument(maxLength == null || inputStreamWithInfoFactory.supportsMaxLength(), "Max length (%s) is given, but the given input stream factory does not support it", maxLength);
        this.bucketName = bucketName;
        this.key = key;
        this.contentLengthSupplier = contentLengthSupplier;
        this.inputStreamWithInfoFactory = inputStreamWithInfoFactory;
        this.statistics = statistics;
        this.entityStore = entityStore;
        this.contentLength = contentLength;
        this.maxLength = maxLength;
        this.throwOnInconsistency = throwOnInconsistency;
        this.lazySeek = lazySeek;
        this.backoffStrategy = backoffStrategy;
        this.shouldTryInitialTimeout = shouldTryInitialTimeout;
        this.open(position, lazyOpen);
    }

    /**
     * Replaces the underlying stream with one positioned at {@code byteStartRange} and resets
     * both tracked positions to that offset.
     *
     * <p>The content length is re-queried from the supplier only when a previous, non-lazily
     * opened stream exists; otherwise the currently known length is reused.
     *
     * @param byteStartRange offset to open at; must not be negative
     * @param isLazy when {@code true}, obtain the stream through {@code createClosedStream}
     * @throws FileNotFoundException if S3 answers 404 and either no entity store is configured
     *     or the metadata entry is missing or marked deleted
     * @throws ConsistencyException if S3 answers 404 but the metadata entry exists and is not
     *     marked deleted
     * @throws IOException for any other service error
     */
    private void open(long byteStartRange, boolean isLazy) throws IOException {
        Preconditions.checkArgument(byteStartRange >= 0L, "Cannot seek to a negative position");
        try {
            boolean keepContentLength = this.in == null || this.wasLazilyOpened;
            long newContentLength = keepContentLength
                ? this.contentLength
                : this.contentLengthSupplier.get(this.bucketName, this.key);
            logger.debug("Stream for key '{}' seeking to position '{}'", this.key, byteStartRange);
            if (isLazy) {
                this.in = this.inputStreamWithInfoFactory.createClosedStream(this.bucketName, this.key, newContentLength);
            } else {
                this.in = this.inputStreamWithInfoFactory.create(this.bucketName, this.key, byteStartRange, newContentLength, this.maxLength, this.shouldTryInitialTimeout.get());
            }
            this.wasLazilyOpened = isLazy;
            this.contentLength = this.in.getContentLength();
        } catch (AmazonServiceException e) {
            if (e.getStatusCode() != HTTP_NOT_FOUND) {
                throw new IOException(e);
            }
            if (this.entityStore == null) {
                throw fileNotFound(String.format("File '%s/%s' does not exist in S3", this.bucketName, this.key), e);
            }
            Object entity = this.entityStore.retrieve(ItemKeys.toItemKey(this.bucketName, this.key));
            if (entity != null) {
                EmrFsStore.MetadataFile metadataFile = EmrFsStore.MetadataFile.parseFrom(((Entity) entity).getPayload());
                if (metadataFile.getState() != EmrFsStore.MetadataFile.State.DELETED) {
                    // Metadata says the object should exist while S3 says it does not.
                    String errorMessage = String.format("Unable to get object '%s/%s' from s3", this.bucketName, this.key);
                    throw new ConsistencyException(errorMessage, e, Collections.singletonList(S3UriUtils.getPathForS3Object(this.bucketName, this.key)));
                }
            }
            throw fileNotFound(String.format("File '%s/%s' has been deleted in both metadata and s3", this.bucketName, this.key), e);
        }
        this.lastReadPos = byteStartRange;
        this.nextReadPos = byteStartRange;
    }

    /** Builds a {@link FileNotFoundException} with the given message that keeps {@code cause}. */
    private static FileNotFoundException fileNotFound(String message, Throwable cause) {
        FileNotFoundException exception = new FileNotFoundException(message);
        exception.initCause(cause);
        return exception;
    }

    /**
     * Single-byte reads are not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public synchronized int read() throws IOException {
        throw new UnsupportedOperationException("Single byte read() not implemented");
    }

    /**
     * Reads up to {@code len} bytes into {@code bytes} starting at {@code off}.
     *
     * <p>The first attempt uses the current underlying stream (seeking or reopening it as
     * needed); each further attempt reopens the stream at the current position. An attempt that
     * fails with an {@link IOException} or {@link AmazonClientException} is followed by a
     * backoff sleep, except after the final attempt, where the method fails immediately.
     *
     * @param bytes destination buffer
     * @param off offset in {@code bytes} at which to start storing data
     * @param len maximum number of bytes to read
     * @return the number of bytes read; {@code 0} if {@code len} is zero; {@code -1} if the
     *     stream is known to be at its end; otherwise the non-positive value returned by the
     *     underlying stream when it signals that retrying should stop
     * @throws NullPointerException if {@code bytes} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a valid range
     *     of {@code bytes}
     * @throws FileNotFoundException if the object is found to be missing during an attempt
     * @throws IOException if all {@link #NUM_READ_RETRIES} attempts fail
     */
    @Override
    public synchronized int read(@NonNull byte[] bytes, int off, int len) throws IOException {
        if (bytes == null) {
            throw new NullPointerException("bytes is marked non-null but is null");
        }
        if (off < 0 || len < 0 || len > bytes.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }
        if (this.atEndOfStreamIfKnown()) {
            return -1;
        }
        final int lastAttempt = NUM_READ_RETRIES - 1;
        int result = -1;
        Exception lastException = null;
        for (int attempt = 0; attempt < NUM_READ_RETRIES; ++attempt) {
            try {
                if (attempt == 0) {
                    if (this.lazySeek) {
                        this.seekStream();
                    } else {
                        this.ensureStreamNotClosed();
                    }
                } else {
                    this.reopenStream();
                }
                result = this.in.read(bytes, off, len);
                if (result > 0) {
                    this.advance(result);
                    break;
                }
                if (this.in.shouldBreakReadRetry(this.nextReadPos)) {
                    break;
                }
                logger.warn(this.generateUnexpectedEndOfStreamMsg());
            } catch (FileNotFoundException fnfe) {
                logger.info("Encountered an exception while reading '{}', file not present", this.in.getKey(), fnfe);
                throw fileNotFound("File not present on S3", fnfe);
            } catch (AmazonClientException | IOException e) {
                this.shouldTryInitialTimeout.set(false);
                lastException = e;
                if (attempt >= lastAttempt) {
                    // No further attempt follows, so neither announce a retry nor back off.
                    logger.info("Encountered exception while reading '{}', max retries exceeded.", this.in.getKey(), e);
                } else {
                    logger.info("Encountered exception while reading '{}', will retry by attempting to reopen stream.", this.in.getKey(), e);
                    long retryInterval = this.getRetryInterval(e, attempt);
                    logger.debug("Back off {} ms for retrying open stream while reading due to s3 GET-After-PUT consistency issue OR another IO related exception such as SocketReset. For best practice please see https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel", retryInterval);
                    EmrFsUtils.sleep(retryInterval);
                }
            }
            if (attempt >= lastAttempt) {
                logger.error("Unable to recover reading from stream");
                throw new IOException(this.generateUnexpectedEndOfStreamMsg(), lastException);
            }
        }
        return result;
    }

    /**
     * Records that {@code amount} bytes were consumed: shrinks the remaining {@code maxLength}
     * (if one is set), moves both tracked positions forward and updates the statistics (if any).
     *
     * @throws IllegalArgumentException if {@code amount} exceeds the remaining {@code maxLength}
     */
    private void advance(int amount) {
        if (this.maxLength != null) {
            Preconditions.checkArgument((long) amount <= this.maxLength, "Cannot advance beyond maxLength");
            this.maxLength = this.maxLength - (long) amount;
        }
        this.lastReadPos += amount;
        this.nextReadPos += amount;
        if (this.statistics != null) {
            this.statistics.incrementBytesRead((long) amount);
        }
    }

    /**
     * Returns the number of milliseconds to wait before the next read attempt, as computed by
     * the configured backoff strategy.
     */
    @VisibleForTesting
    long getRetryInterval(Exception e, int attempt) {
        return this.backoffStrategy.getBackoffMillis(e, attempt);
    }

    /**
     * Creates an independent stream over the same object that starts at {@code position} and is
     * limited to {@code maxLength} bytes.
     *
     * @return an empty in-memory stream when {@code maxLength} is zero or {@code position} is at
     *     or past the currently known content length; otherwise a new, lazily opened
     *     {@code S3FSInputStream} that shares this stream's collaborators
     */
    @Override
    protected InputStream forkStream(long position, long maxLength) throws IOException {
        // Read the volatile field once so the bounds check and the fork use the same value.
        long contentLengthSnapshot = this.contentLength;
        if (maxLength == 0L || position >= contentLengthSnapshot) {
            return new ByteArrayInputStream(new byte[0]);
        }
        return new S3FSInputStream(this.bucketName, this.key, this.contentLengthSupplier, this.inputStreamWithInfoFactory, this.statistics, this.entityStore, position, contentLengthSnapshot, maxLength, this.throwOnInconsistency, true, this.lazySeek, false, false, this.backoffStrategy, this.shouldTryInitialTimeout);
    }

    /**
     * Builds the diagnostic message used when a read ends unexpectedly, including the last read
     * position, the content length and, for select streams, the number of bytes scanned.
     */
    private String generateUnexpectedEndOfStreamMsg() {
        StringBuilder messageBuilder = new StringBuilder("Unexpected end of stream pos=").append(this.lastReadPos);
        if (this.in.isSelect()) {
            messageBuilder.append(", byteScanned=").append(this.in.getSelectByteScanned());
        }
        messageBuilder.append(", contentLength=").append(this.in.getContentLength());
        return messageBuilder.toString();
    }

    /** Reopens the underlying stream if it reports having been closed successfully. */
    private void ensureStreamNotClosed() throws IOException {
        if (this.in.wasClosedSuccessfully()) {
            this.reopenStream();
        }
    }

    /** Closes the underlying stream. */
    @Override
    public void close() throws IOException {
        this.in.close();
    }

    /** Closes the underlying stream and opens a new one at {@code nextReadPos}. */
    private synchronized void reopenStream() throws IOException {
        this.in.close();
        this.retrieveInputStreamWithInfo(this.nextReadPos);
        this.lastReadPos = this.nextReadPos;
    }

    /**
     * Opens a new underlying stream at {@code pos}, unless the current stream reports that
     * {@code pos} is already the end of the stream.
     *
     * @throws EOFException if {@code pos} is beyond the content length
     * @throws ConsistencyException if raised by {@link #open(long, boolean)} and
     *     {@code throwOnInconsistency} is set
     * @throws FileNotFoundException if a {@link ConsistencyException} is raised and
     *     {@code throwOnInconsistency} is not set
     */
    private void retrieveInputStreamWithInfo(long pos) throws IOException {
        Preconditions.checkNotNull(this.in, "Requires last InputStreamWithInfo");
        if (pos > this.in.getContentLength()) {
            this.throwPositionOutOfBoundsException(pos);
        }
        if (this.atEndOfStreamIfKnown(pos)) {
            return;
        }
        try {
            this.open(pos, false);
        } catch (ConsistencyException e) {
            if (this.throwOnInconsistency) {
                throw e;
            }
            logger.warn(e.getMessage(), e);
            throw fileNotFound(e.getMessage(), e);
        }
    }

    /** Asks the underlying stream whether {@code nextReadPos} is known to be the end of stream. */
    private boolean atEndOfStreamIfKnown() {
        return this.atEndOfStreamIfKnown(this.nextReadPos);
    }

    /** Asks the underlying stream whether {@code pos} is known to be the end of stream. */
    private boolean atEndOfStreamIfKnown(long pos) {
        return this.in.atEndOfStreamIfKnown(pos);
    }

    /**
     * Always throws an {@link EOFException} describing {@code pos} and the valid range.
     */
    private void throwPositionOutOfBoundsException(long pos) throws EOFException {
        throw new EOFException(String.format("Invalid position: %d, exceeds the bounds of the stream: [0, %d]", pos, this.in.getContentLength()));
    }

    /**
     * Moves the read position to {@code pos}. With lazy seek enabled only the target position
     * is recorded; otherwise the underlying stream is repositioned immediately.
     *
     * @throws UnsupportedOperationException if this stream was created with a maximum length
     * @throws EOFException if {@code pos} is negative or beyond the content length
     * @throws IOException if repositioning the underlying stream fails
     */
    public synchronized void seek(long pos) throws IOException {
        if (this.maxLength != null) {
            throw new UnsupportedOperationException("Seeking is not supported when maxLength is specified");
        }
        if (pos < 0L || pos > this.in.getContentLength()) {
            this.throwPositionOutOfBoundsException(pos);
        }
        this.nextReadPos = pos;
        if (!this.lazySeek) {
            this.seekStream();
        }
    }

    /**
     * Reopens the underlying stream at {@code nextReadPos} unless it is already positioned
     * there and has not been closed.
     */
    private synchronized void seekStream() throws IOException {
        boolean alreadyPositioned = this.lastReadPos == this.nextReadPos;
        if (alreadyPositioned && !this.in.wasClosedSuccessfully()) {
            return;
        }
        this.reopenStream();
    }

    /** Returns the position at which the next read will start. */
    public synchronized long getPos() throws IOException {
        return this.nextReadPos;
    }

    /**
     * Not supported by this stream.
     *
     * @return always {@code false}
     */
    public boolean seekToNewSource(long targetPos) throws IOException {
        return false;
    }

    /**
     * Closes the underlying stream on a best-effort basis; a failure to close is logged and not
     * propagated.
     */
    @Override
    public void unbuffer() {
        try {
            this.in.close();
        } catch (IOException e) {
            logger.warn("Exception while trying to unbuffer input stream: ", e);
        }
    }
}

