/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatistics;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbfsOutputStream
extends OutputStream
implements Syncable,
StreamCapabilities,
IOStatisticsSource {
    private final AbfsClient client;
    private final String path;
    private long position;
    private boolean closed;
    private boolean supportFlush;
    private boolean disableOutputStreamFlush;
    private boolean enableSmallWriteOptimization;
    private boolean isAppendBlob;
    private volatile IOException lastError;
    private long lastFlushOffset;
    private long lastTotalAppendOffset = 0L;
    private final int bufferSize;
    private byte[] buffer;
    private int bufferIndex;
    private int numOfAppendsToServerSinceLastFlush;
    private final int maxConcurrentRequestCount;
    private final int maxRequestsThatCanBeQueued;
    private ConcurrentLinkedDeque<WriteOperation> writeOperations;
    private final ThreadPoolExecutor threadExecutor;
    private final ExecutorCompletionService<Void> completionService;
    private CachedSASToken cachedSasToken;
    private AbfsLease lease;
    private String leaseId;
    private ElasticByteBufferPool byteBufferPool = new ElasticByteBufferPool();
    private final FileSystem.Statistics statistics;
    private final AbfsOutputStreamStatistics outputStreamStatistics;
    private IOStatistics ioStatistics;
    private static final Logger LOG = LoggerFactory.getLogger(AbfsOutputStream.class);

    public AbfsOutputStream(AbfsClient client, FileSystem.Statistics statistics, String path, long position, AbfsOutputStreamContext abfsOutputStreamContext) {
        this.client = client;
        this.statistics = statistics;
        this.path = path;
        this.position = position;
        this.closed = false;
        this.supportFlush = abfsOutputStreamContext.isEnableFlush();
        this.disableOutputStreamFlush = abfsOutputStreamContext.isDisableOutputStreamFlush();
        this.enableSmallWriteOptimization = abfsOutputStreamContext.isEnableSmallWriteOptimization();
        this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
        this.lastError = null;
        this.lastFlushOffset = 0L;
        this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
        this.buffer = this.byteBufferPool.getBuffer(false, this.bufferSize).array();
        this.bufferIndex = 0;
        this.numOfAppendsToServerSinceLastFlush = 0;
        this.writeOperations = new ConcurrentLinkedDeque();
        this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
        this.maxConcurrentRequestCount = this.isAppendBlob ? 1 : abfsOutputStreamContext.getWriteMaxConcurrentRequestCount();
        this.maxRequestsThatCanBeQueued = abfsOutputStreamContext.getMaxWriteRequestsToQueue();
        this.lease = abfsOutputStreamContext.getLease();
        this.leaseId = abfsOutputStreamContext.getLeaseId();
        this.threadExecutor = new ThreadPoolExecutor(this.maxConcurrentRequestCount, this.maxConcurrentRequestCount, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        this.completionService = new ExecutorCompletionService(this.threadExecutor);
        this.cachedSasToken = new CachedSASToken(abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
        if (this.outputStreamStatistics != null) {
            this.ioStatistics = this.outputStreamStatistics.getIOStatistics();
        }
    }

    public boolean hasCapability(String capability) {
        return this.supportFlush && StoreImplementationUtils.isProbeForSyncable((String)capability);
    }

    @Override
    public void write(int byteVal) throws IOException {
        this.write(new byte[]{(byte)(byteVal & 0xFF)});
    }

    @Override
    public synchronized void write(byte[] data, int off, int length) throws IOException {
        this.maybeThrowLastError();
        Preconditions.checkArgument((data != null ? 1 : 0) != 0, (Object)"null data");
        if (off < 0 || length < 0 || length > data.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (this.hasLease() && this.isLeaseFreed()) {
            throw new PathIOException(this.path, "Attempted to write to file without lease");
        }
        int currentOffset = off;
        int writableBytes = this.bufferSize - this.bufferIndex;
        int numberOfBytesToWrite = length;
        while (numberOfBytesToWrite > 0) {
            if (writableBytes <= numberOfBytesToWrite) {
                System.arraycopy(data, currentOffset, this.buffer, this.bufferIndex, writableBytes);
                this.bufferIndex += writableBytes;
                this.writeCurrentBufferToService();
                currentOffset += writableBytes;
                numberOfBytesToWrite -= writableBytes;
            } else {
                System.arraycopy(data, currentOffset, this.buffer, this.bufferIndex, numberOfBytesToWrite);
                this.bufferIndex += numberOfBytesToWrite;
                numberOfBytesToWrite = 0;
            }
            writableBytes = this.bufferSize - this.bufferIndex;
        }
        this.incrementWriteOps();
    }

    private void incrementWriteOps() {
        if (this.statistics != null) {
            this.statistics.incrementWriteOps(1);
        }
    }

    private void maybeThrowLastError() throws IOException {
        if (this.lastError != null) {
            throw this.lastError;
        }
    }

    @Override
    public void flush() throws IOException {
        if (!this.disableOutputStreamFlush) {
            this.flushInternalAsync();
        }
    }

    public void hsync() throws IOException {
        if (this.supportFlush) {
            this.flushInternal(false);
        }
    }

    public void hflush() throws IOException {
        if (this.supportFlush) {
            this.flushInternal(false);
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            this.flushInternal(true);
            this.threadExecutor.shutdown();
        }
        catch (IOException e) {
            throw IOUtils.wrapException((String)this.path, (String)e.getMessage(), (IOException)e);
        }
        finally {
            if (this.hasLease()) {
                this.lease.free();
                this.lease = null;
            }
            this.lastError = new IOException("Stream is closed!");
            this.buffer = null;
            this.bufferIndex = 0;
            this.closed = true;
            this.writeOperations.clear();
            this.byteBufferPool = null;
            if (!this.threadExecutor.isShutdown()) {
                this.threadExecutor.shutdownNow();
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Closing AbfsOutputStream ", (Object)this.toString());
        }
    }

    private synchronized void flushInternal(boolean isClose) throws IOException {
        this.maybeThrowLastError();
        if (!this.isAppendBlob && this.enableSmallWriteOptimization && this.numOfAppendsToServerSinceLastFlush == 0 && this.writeOperations.size() == 0 && this.bufferIndex > 0) {
            this.smallWriteOptimizedflushInternal(isClose);
            return;
        }
        this.writeCurrentBufferToService();
        this.flushWrittenBytesToService(isClose);
        this.numOfAppendsToServerSinceLastFlush = 0;
    }

    private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
        this.writeCurrentBufferToService(true, isClose);
        this.waitForAppendsToComplete();
        this.shrinkWriteOperationQueue();
        this.maybeThrowLastError();
        this.numOfAppendsToServerSinceLastFlush = 0;
    }

    private synchronized void flushInternalAsync() throws IOException {
        this.maybeThrowLastError();
        this.writeCurrentBufferToService();
        this.flushWrittenBytesToServiceAsync();
    }

    private void writeAppendBlobCurrentBufferToService() throws IOException {
        if (this.bufferIndex == 0) {
            return;
        }
        byte[] bytes = this.buffer;
        int bytesLength = this.bufferIndex;
        if (this.outputStreamStatistics != null) {
            this.outputStreamStatistics.writeCurrentBuffer();
            this.outputStreamStatistics.bytesToUpload(bytesLength);
        }
        this.buffer = this.byteBufferPool.getBuffer(false, this.bufferSize).array();
        this.bufferIndex = 0;
        long offset = this.position;
        this.position += (long)bytesLength;
        AbfsPerfTracker tracker = this.client.getAbfsPerfTracker();
        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append");){
            AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE, true, this.leaseId);
            AbfsRestOperation op = this.client.append(this.path, bytes, reqParams, this.cachedSasToken.get());
            this.cachedSasToken.update(op.getSasToken());
            if (this.outputStreamStatistics != null) {
                this.outputStreamStatistics.uploadSuccessful(bytesLength);
            }
            perfInfo.registerResult(op.getResult());
            this.byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
            perfInfo.registerSuccess(true);
            return;
        }
        catch (Exception ex2) {
            AzureBlobFileSystemException ex2;
            if (ex2 instanceof AbfsRestOperationException && ((AbfsRestOperationException)ex2).getStatusCode() == 404) {
                throw new FileNotFoundException(ex2.getMessage());
            }
            if (ex2 instanceof AzureBlobFileSystemException) {
                ex2 = (AzureBlobFileSystemException)ex2;
            }
            this.lastError = new IOException(ex2);
            throw this.lastError;
        }
    }

    private synchronized void writeCurrentBufferToService() throws IOException {
        this.writeCurrentBufferToService(false, false);
    }

    private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
        if (this.isAppendBlob) {
            this.writeAppendBlobCurrentBufferToService();
            return;
        }
        if (this.bufferIndex == 0) {
            return;
        }
        ++this.numOfAppendsToServerSinceLastFlush;
        byte[] bytes = this.buffer;
        int bytesLength = this.bufferIndex;
        if (this.outputStreamStatistics != null) {
            this.outputStreamStatistics.writeCurrentBuffer();
            this.outputStreamStatistics.bytesToUpload(bytesLength);
        }
        this.buffer = this.byteBufferPool.getBuffer(false, this.bufferSize).array();
        this.bufferIndex = 0;
        long offset = this.position;
        this.position += (long)bytesLength;
        if (this.threadExecutor.getQueue().size() >= this.maxRequestsThatCanBeQueued) {
            if (this.outputStreamStatistics != null) {
                try (DurationTracker ignored = this.outputStreamStatistics.timeSpentTaskWait();){
                    this.waitForTaskToComplete();
                }
            } else {
                this.waitForTaskToComplete();
            }
        }
        Future<Void> job = this.completionService.submit(() -> {
            AbfsPerfTracker tracker = this.client.getAbfsPerfTracker();
            try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append");){
                AppendRequestParameters.Mode mode = AppendRequestParameters.Mode.APPEND_MODE;
                if (isFlush & isClose) {
                    mode = AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
                } else if (isFlush) {
                    mode = AppendRequestParameters.Mode.FLUSH_MODE;
                }
                AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, bytesLength, mode, false, this.leaseId);
                AbfsRestOperation op = this.client.append(this.path, bytes, reqParams, this.cachedSasToken.get());
                this.cachedSasToken.update(op.getSasToken());
                perfInfo.registerResult(op.getResult());
                this.byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
                perfInfo.registerSuccess(true);
                Void void_ = null;
                return void_;
            }
        });
        if (this.outputStreamStatistics != null) {
            if (job.isCancelled()) {
                this.outputStreamStatistics.uploadFailed(bytesLength);
            } else {
                this.outputStreamStatistics.uploadSuccessful(bytesLength);
            }
        }
        this.writeOperations.add(new WriteOperation(job, offset, bytesLength));
        this.shrinkWriteOperationQueue();
    }

    private synchronized void waitForAppendsToComplete() throws IOException {
        for (WriteOperation writeOperation : this.writeOperations) {
            try {
                writeOperation.task.get();
            }
            catch (Exception ex2) {
                AzureBlobFileSystemException ex2;
                if (ex2.getCause() instanceof AbfsRestOperationException && ((AbfsRestOperationException)ex2.getCause()).getStatusCode() == 404) {
                    throw new FileNotFoundException(ex2.getMessage());
                }
                if (ex2.getCause() instanceof AzureBlobFileSystemException) {
                    ex2 = (AzureBlobFileSystemException)ex2.getCause();
                }
                this.lastError = new IOException(ex2);
                throw this.lastError;
            }
        }
    }

    private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
        this.waitForAppendsToComplete();
        this.flushWrittenBytesToServiceInternal(this.position, false, isClose);
    }

    private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
        this.shrinkWriteOperationQueue();
        if (this.lastTotalAppendOffset > this.lastFlushOffset) {
            this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true, false);
        }
    }

    private synchronized void flushWrittenBytesToServiceInternal(long offset, boolean retainUncommitedData, boolean isClose) throws IOException {
        if (this.isAppendBlob && !isClose) {
            return;
        }
        AbfsPerfTracker tracker = this.client.getAbfsPerfTracker();
        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush");){
            AbfsRestOperation op = this.client.flush(this.path, offset, retainUncommitedData, isClose, this.cachedSasToken.get(), this.leaseId);
            this.cachedSasToken.update(op.getSasToken());
            perfInfo.registerResult(op.getResult()).registerSuccess(true);
        }
        catch (AzureBlobFileSystemException ex) {
            if (ex instanceof AbfsRestOperationException && ((AbfsRestOperationException)ex).getStatusCode() == 404) {
                throw new FileNotFoundException(ex.getMessage());
            }
            throw new IOException(ex);
        }
        this.lastFlushOffset = offset;
    }

    private synchronized void shrinkWriteOperationQueue() throws IOException {
        try {
            while (this.writeOperations.peek() != null && this.writeOperations.peek().task.isDone()) {
                this.writeOperations.peek().task.get();
                this.lastTotalAppendOffset += this.writeOperations.peek().length;
                this.writeOperations.remove();
                if (this.outputStreamStatistics == null) continue;
                this.outputStreamStatistics.queueShrunk();
            }
        }
        catch (Exception e) {
            this.lastError = e.getCause() instanceof AzureBlobFileSystemException ? (AzureBlobFileSystemException)e.getCause() : new IOException(e);
            throw this.lastError;
        }
    }

    private void waitForTaskToComplete() throws IOException {
        boolean completed = false;
        while (this.completionService.poll() != null) {
            completed = true;
        }
        if (this.isAppendBlob) {
            completed = true;
        }
        if (!completed) {
            try {
                this.completionService.take();
            }
            catch (InterruptedException e) {
                this.lastError = (IOException)new InterruptedIOException(e.toString()).initCause(e);
                throw this.lastError;
            }
        }
    }

    @VisibleForTesting
    public synchronized void waitForPendingUploads() throws IOException {
        this.waitForTaskToComplete();
    }

    @VisibleForTesting
    public AbfsOutputStreamStatistics getOutputStreamStatistics() {
        return this.outputStreamStatistics;
    }

    @VisibleForTesting
    public int getWriteOperationsSize() {
        return this.writeOperations.size();
    }

    @VisibleForTesting
    int getMaxConcurrentRequestCount() {
        return this.maxConcurrentRequestCount;
    }

    @VisibleForTesting
    int getMaxRequestsThatCanBeQueued() {
        return this.maxRequestsThatCanBeQueued;
    }

    @VisibleForTesting
    Boolean isAppendBlobStream() {
        return this.isAppendBlob;
    }

    public IOStatistics getIOStatistics() {
        return this.ioStatistics;
    }

    @VisibleForTesting
    public boolean isLeaseFreed() {
        if (this.lease == null) {
            return true;
        }
        return this.lease.isFreed();
    }

    @VisibleForTesting
    public boolean hasLease() {
        return this.lease != null;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder(super.toString());
        if (this.outputStreamStatistics != null) {
            sb.append("AbfsOutputStream@").append(this.hashCode());
            sb.append("){");
            sb.append(this.outputStreamStatistics.toString());
            sb.append("}");
        }
        return sb.toString();
    }

    private static class WriteOperation {
        private final Future<Void> task;
        private final long startOffset;
        private final long length;

        WriteOperation(Future<Void> task, long startOffset, long length) {
            Preconditions.checkNotNull(task, (Object)"task");
            Preconditions.checkArgument((startOffset >= 0L ? 1 : 0) != 0, (Object)"startOffset");
            Preconditions.checkArgument((length >= 0L ? 1 : 0) != 0, (Object)"length");
            this.task = task;
            this.startOffset = startOffset;
            this.length = length;
        }
    }
}

