/*
 * Decompiled with CFR 0.152.
 */
package com.azure.storage.file.datalake;

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.ResponseBase;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.ProgressReceiver;
import com.azure.storage.common.ProgressReporter;
import com.azure.storage.common.implementation.UploadBufferPool;
import com.azure.storage.common.implementation.UploadUtils;
import com.azure.storage.file.datalake.DataLakePathAsyncClient;
import com.azure.storage.file.datalake.DataLakeServiceVersion;
import com.azure.storage.file.datalake.Transforms;
import com.azure.storage.file.datalake.implementation.models.LeaseAccessConditions;
import com.azure.storage.file.datalake.implementation.models.ModifiedAccessConditions;
import com.azure.storage.file.datalake.implementation.models.PathFlushDataHeaders;
import com.azure.storage.file.datalake.implementation.models.PathResourceType;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.implementation.util.ModelHelper;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileRange;
import com.azure.storage.file.datalake.models.FileReadAsyncResponse;
import com.azure.storage.file.datalake.models.PathHttpHeaders;
import com.azure.storage.file.datalake.models.PathInfo;
import com.azure.storage.file.datalake.models.PathProperties;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

public class DataLakeFileAsyncClient
extends DataLakePathAsyncClient {
    static final int MAX_APPEND_FILE_BYTES = 0x6400000;
    private final ClientLogger logger = new ClientLogger(DataLakeFileAsyncClient.class);

    DataLakeFileAsyncClient(HttpPipeline pipeline, String url, DataLakeServiceVersion serviceVersion, String accountName, String fileSystemName, String fileName, BlockBlobAsyncClient blockBlobAsyncClient) {
        super(pipeline, url, serviceVersion, accountName, fileSystemName, fileName, PathResourceType.FILE, blockBlobAsyncClient);
    }

    DataLakeFileAsyncClient(DataLakePathAsyncClient pathAsyncClient) {
        super(pathAsyncClient.getHttpPipeline(), pathAsyncClient.getPathUrl(), pathAsyncClient.getServiceVersion(), pathAsyncClient.getAccountName(), pathAsyncClient.getFileSystemName(), pathAsyncClient.getObjectPath(), PathResourceType.FILE, pathAsyncClient.getBlockBlobAsyncClient());
    }

    public String getFileUrl() {
        return this.getPathUrl();
    }

    public String getFilePath() {
        return this.getObjectPath();
    }

    public String getFileName() {
        return this.getObjectName();
    }

    public Mono<Void> delete() {
        try {
            return this.deleteWithResponse(null).flatMap(FluxUtil::toMono);
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<Response<Void>> deleteWithResponse(DataLakeRequestConditions requestConditions) {
        try {
            return FluxUtil.withContext(context -> this.deleteWithResponse(null, requestConditions, (Context)context));
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions) {
        return this.upload(data, parallelTransferOptions, false);
    }

    public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions, boolean overwrite) {
        DataLakeRequestConditions requestConditions;
        Mono overwriteCheck;
        if (overwrite) {
            overwriteCheck = Mono.empty();
            requestConditions = null;
        } else {
            overwriteCheck = this.exists().flatMap(exists -> exists != false ? FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException("Blob already exists. Specify overwrite to true to force update the blob.")) : Mono.empty());
            requestConditions = new DataLakeRequestConditions().setIfNoneMatch("*");
        }
        return overwriteCheck.then(this.uploadWithResponse(data, parallelTransferOptions, null, null, requestConditions)).flatMap(FluxUtil::toMono);
    }

    public Mono<Response<PathInfo>> uploadWithResponse(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions, PathHttpHeaders headers, Map<String, String> metadata, DataLakeRequestConditions requestConditions) {
        try {
            Objects.requireNonNull(data, "'data' must not be null");
            DataLakeRequestConditions validatedRequestConditions = requestConditions == null ? new DataLakeRequestConditions() : requestConditions;
            DataLakeRequestConditions validatedUploadRequestConditions = new DataLakeRequestConditions().setLeaseId(validatedRequestConditions.getLeaseId());
            ParallelTransferOptions validatedParallelTransferOptions = ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
            long fileOffset = 0L;
            Function<Flux, Mono> uploadInChunksFunction = stream -> this.uploadInChunks((Flux<ByteBuffer>)stream, fileOffset, validatedParallelTransferOptions, headers, validatedUploadRequestConditions);
            BiFunction<Flux, Long, Mono> uploadFullMethod = (stream, length) -> this.uploadWithResponse((Flux<ByteBuffer>)ProgressReporter.addProgressReporting((Flux)stream, (ProgressReceiver)validatedParallelTransferOptions.getProgressReceiver()), fileOffset, (long)length, headers, validatedUploadRequestConditions);
            return this.createWithResponse(null, null, headers, metadata, validatedRequestConditions).then(UploadUtils.uploadFullOrChunked(data, (ParallelTransferOptions)validatedParallelTransferOptions, uploadInChunksFunction, uploadFullMethod));
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long fileOffset, ParallelTransferOptions parallelTransferOptions, PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
        AtomicLong totalProgress = new AtomicLong();
        ReentrantLock progressLock = new ReentrantLock();
        UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getNumBuffers().intValue(), parallelTransferOptions.getBlockSize().intValue(), 0x6400000);
        Flux chunkedSource = UploadUtils.chunkSource(data, (ParallelTransferOptions)parallelTransferOptions);
        return chunkedSource.concatMap(arg_0 -> ((UploadBufferPool)pool).write(arg_0)).concatWith((Publisher)Flux.defer(() -> ((UploadBufferPool)pool).flush())).map(buffer -> Tuples.of((Object)buffer, (Object)buffer.remaining(), (Object)0L)).scan((result, source) -> {
            ByteBuffer buffer = (ByteBuffer)source.getT1();
            long currentBufferLength = buffer.remaining();
            long lastBytesWritten = (Long)result.getT2();
            long lastOffset = (Long)result.getT3();
            return Tuples.of((Object)buffer, (Object)currentBufferLength, (Object)(lastBytesWritten + lastOffset));
        }).flatMapSequential(tuple3 -> {
            ByteBuffer buffer = (ByteBuffer)tuple3.getT1();
            long currentBufferLength = buffer.remaining();
            long currentOffset = (Long)tuple3.getT3() + fileOffset;
            Flux progressData = ProgressReporter.addParallelProgressReporting((Flux)Flux.just((Object)buffer), (ProgressReceiver)parallelTransferOptions.getProgressReceiver(), (Lock)progressLock, (AtomicLong)totalProgress);
            return this.appendWithResponse((Flux<ByteBuffer>)progressData, currentOffset, currentBufferLength, null, requestConditions.getLeaseId()).doFinally(x -> pool.returnBuffer(buffer)).map(resp -> currentBufferLength + currentOffset).flux();
        }).last().flatMap(length -> this.flushWithResponse((long)length, false, false, httpHeaders, requestConditions));
    }

    private Mono<Response<PathInfo>> uploadWithResponse(Flux<ByteBuffer> data, long fileOffset, long length, PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
        return this.appendWithResponse(data, fileOffset, length, null, requestConditions.getLeaseId()).flatMap(resp -> this.flushWithResponse(fileOffset + length, false, false, httpHeaders, requestConditions));
    }

    public Mono<Void> uploadFromFile(String filePath) {
        try {
            return this.uploadFromFile(filePath, false);
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {
        try {
            Mono overwriteCheck = Mono.empty();
            DataLakeRequestConditions requestConditions = null;
            if (!overwrite) {
                if (UploadUtils.shouldUploadInChunks((String)filePath, (Integer)0x6400000, (ClientLogger)this.logger)) {
                    overwriteCheck = this.exists().flatMap(exists -> exists != false ? FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException("File already exists. Specify overwrite to true to force update the file.")) : Mono.empty());
                }
                requestConditions = new DataLakeRequestConditions().setIfNoneMatch("*");
            }
            return overwriteCheck.then(this.uploadFromFile(filePath, null, null, null, requestConditions));
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parallelTransferOptions, PathHttpHeaders headers, Map<String, String> metadata, DataLakeRequestConditions requestConditions) {
        Integer originalBlockSize = parallelTransferOptions == null ? null : parallelTransferOptions.getBlockSize();
        DataLakeRequestConditions validatedRequestConditions = requestConditions == null ? new DataLakeRequestConditions() : requestConditions;
        DataLakeRequestConditions validatedUploadRequestConditions = new DataLakeRequestConditions().setLeaseId(validatedRequestConditions.getLeaseId());
        ParallelTransferOptions finalParallelTransferOptions = ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
        long fileOffset = 0L;
        try {
            return Mono.using(() -> UploadUtils.uploadFileResourceSupplier((String)filePath, (ClientLogger)this.logger), channel -> {
                try {
                    long fileSize = channel.size();
                    if (fileSize == 0L) {
                        throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("Size of the file must be greater than 0."));
                    }
                    if (UploadUtils.shouldUploadInChunks((String)filePath, (Integer)finalParallelTransferOptions.getMaxSingleUploadSize(), (ClientLogger)this.logger)) {
                        return this.createWithResponse(null, null, headers, metadata, validatedRequestConditions).then(this.uploadFileChunks(fileOffset, fileSize, finalParallelTransferOptions, originalBlockSize, headers, validatedUploadRequestConditions, (AsynchronousFileChannel)channel));
                    }
                    return this.createWithResponse(null, null, headers, metadata, validatedRequestConditions).then(this.uploadWithResponse((Flux<ByteBuffer>)FluxUtil.readFile((AsynchronousFileChannel)channel), fileOffset, fileSize, headers, validatedUploadRequestConditions)).then();
                }
                catch (IOException ex) {
                    return Mono.error((Throwable)ex);
                }
            }, channel -> UploadUtils.uploadFileCleanup((AsynchronousFileChannel)channel, (ClientLogger)this.logger));
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    private Mono<Void> uploadFileChunks(long fileOffset, long fileSize, ParallelTransferOptions parallelTransferOptions, Integer originalBlockSize, PathHttpHeaders headers, DataLakeRequestConditions requestConditions, AsynchronousFileChannel channel) {
        AtomicLong totalProgress = new AtomicLong();
        ReentrantLock progressLock = new ReentrantLock();
        return Flux.fromIterable(this.sliceFile(fileSize, originalBlockSize, parallelTransferOptions.getBlockSize())).flatMap(chunk -> {
            Flux progressData = ProgressReporter.addParallelProgressReporting((Flux)FluxUtil.readFile((AsynchronousFileChannel)channel, (long)chunk.getOffset(), (long)chunk.getCount()), (ProgressReceiver)parallelTransferOptions.getProgressReceiver(), (Lock)progressLock, (AtomicLong)totalProgress);
            return this.appendWithResponse((Flux<ByteBuffer>)progressData, fileOffset + chunk.getOffset(), chunk.getCount(), null, requestConditions.getLeaseId());
        }).then(Mono.defer(() -> this.flushWithResponse(fileSize, false, false, headers, requestConditions))).then();
    }

    private List<FileRange> sliceFile(long fileSize, Integer originalBlockSize, int blockSize) {
        ArrayList<FileRange> ranges = new ArrayList<FileRange>();
        if (fileSize > 0x6400000L && originalBlockSize == null) {
            blockSize = 0x800000;
        }
        for (long pos = 0L; pos < fileSize; pos += (long)blockSize) {
            long count = blockSize;
            if (pos + count > fileSize) {
                count = fileSize - pos;
            }
            ranges.add(new FileRange(pos, count));
        }
        return ranges;
    }

    public Mono<Void> append(Flux<ByteBuffer> data, long fileOffset, long length) {
        try {
            return this.appendWithResponse(data, fileOffset, length, null, null).flatMap(FluxUtil::toMono);
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileOffset, long length, byte[] contentMd5, String leaseId) {
        try {
            return FluxUtil.withContext(context -> this.appendWithResponse(data, fileOffset, length, contentMd5, leaseId, (Context)context));
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileOffset, long length, byte[] contentMd5, String leaseId, Context context) {
        LeaseAccessConditions leaseAccessConditions = new LeaseAccessConditions().setLeaseId(leaseId);
        PathHttpHeaders headers = new PathHttpHeaders().setTransactionalContentHash(contentMd5);
        return this.dataLakeStorage.paths().appendDataWithRestResponseAsync(data, fileOffset, null, length, null, headers, leaseAccessConditions, context).map(response -> new SimpleResponse((Response)response, null));
    }

    public Mono<PathInfo> flush(long position) {
        try {
            return this.flush(position, false);
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<PathInfo> flush(long position, boolean overwrite) {
        try {
            DataLakeRequestConditions requestConditions = null;
            if (!overwrite) {
                requestConditions = new DataLakeRequestConditions().setIfNoneMatch("*");
            }
            return this.flushWithResponse(position, false, false, null, requestConditions).flatMap(FluxUtil::toMono);
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close, PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
        try {
            return FluxUtil.withContext(context -> this.flushWithResponse(position, retainUncommittedData, close, httpHeaders, requestConditions, (Context)context));
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close, PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions, Context context) {
        httpHeaders = httpHeaders == null ? new PathHttpHeaders() : httpHeaders;
        requestConditions = requestConditions == null ? new DataLakeRequestConditions() : requestConditions;
        LeaseAccessConditions lac = new LeaseAccessConditions().setLeaseId(requestConditions.getLeaseId());
        ModifiedAccessConditions mac = new ModifiedAccessConditions().setIfMatch(requestConditions.getIfMatch()).setIfNoneMatch(requestConditions.getIfNoneMatch()).setIfModifiedSince(requestConditions.getIfModifiedSince()).setIfUnmodifiedSince(requestConditions.getIfUnmodifiedSince());
        return this.dataLakeStorage.paths().flushDataWithRestResponseAsync(null, position, retainUncommittedData, close, 0L, null, httpHeaders, lac, mac, context).map(response -> new SimpleResponse((Response)response, (Object)new PathInfo(((PathFlushDataHeaders)response.getDeserializedHeaders()).getETag(), ((PathFlushDataHeaders)response.getDeserializedHeaders()).getLastModified())));
    }

    public Flux<ByteBuffer> read() {
        try {
            return this.readWithResponse(null, null, null, false).flatMapMany(ResponseBase::getValue);
        }
        catch (RuntimeException ex) {
            return FluxUtil.fluxError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<FileReadAsyncResponse> readWithResponse(FileRange range, DownloadRetryOptions options, DataLakeRequestConditions requestConditions, boolean getRangeContentMd5) {
        try {
            return this.blockBlobAsyncClient.downloadWithResponse(Transforms.toBlobRange(range), Transforms.toBlobDownloadRetryOptions(options), Transforms.toBlobRequestConditions(requestConditions), getRangeContentMd5).map(Transforms::toFileReadAsyncResponse).onErrorMap(DataLakeImplUtils::transformBlobStorageException);
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<PathProperties> readToFile(String filePath) {
        return this.readToFile(filePath, false);
    }

    public Mono<PathProperties> readToFile(String filePath, boolean overwrite) {
        HashSet<OpenOption> openOptions = null;
        if (overwrite) {
            openOptions = new HashSet<OpenOption>();
            openOptions.add(StandardOpenOption.CREATE);
            openOptions.add(StandardOpenOption.TRUNCATE_EXISTING);
            openOptions.add(StandardOpenOption.READ);
            openOptions.add(StandardOpenOption.WRITE);
        }
        return this.readToFileWithResponse(filePath, null, null, null, null, false, openOptions).flatMap(FluxUtil::toMono);
    }

    public Mono<Response<PathProperties>> readToFileWithResponse(String filePath, FileRange range, ParallelTransferOptions parallelTransferOptions, DownloadRetryOptions options, DataLakeRequestConditions requestConditions, boolean rangeGetContentMd5, Set<OpenOption> openOptions) {
        return this.blockBlobAsyncClient.downloadToFileWithResponse(filePath, Transforms.toBlobRange(range), Transforms.toBlobParallelTransferOptions(parallelTransferOptions), Transforms.toBlobDownloadRetryOptions(options), Transforms.toBlobRequestConditions(requestConditions), rangeGetContentMd5, openOptions).onErrorMap(DataLakeImplUtils::transformBlobStorageException).map(response -> new SimpleResponse(response, (Object)Transforms.toPathProperties((BlobProperties)response.getValue())));
    }

    public Mono<DataLakeFileAsyncClient> rename(String destinationFileSystem, String destinationPath) {
        try {
            return this.renameWithResponse(destinationFileSystem, destinationPath, null, null).flatMap(FluxUtil::toMono);
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }

    public Mono<Response<DataLakeFileAsyncClient>> renameWithResponse(String destinationFileSystem, String destinationPath, DataLakeRequestConditions sourceRequestConditions, DataLakeRequestConditions destinationRequestConditions) {
        try {
            return FluxUtil.withContext(context -> this.renameWithResponse(destinationFileSystem, destinationPath, sourceRequestConditions, destinationRequestConditions, (Context)context)).map(response -> new SimpleResponse(response, (Object)new DataLakeFileAsyncClient((DataLakePathAsyncClient)response.getValue())));
        }
        catch (RuntimeException ex) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)ex);
        }
    }
}

