/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.batch;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.batch.BulkExecutorDiagnosticsTracker;
import com.azure.cosmos.implementation.batch.BulkExecutorUtil;
import com.azure.cosmos.implementation.batch.CosmosItemOperationBase;
import com.azure.cosmos.implementation.batch.FlushBuffersItemOperation;
import com.azure.cosmos.implementation.batch.ItemBulkOperation;
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.PartitionScopeThresholds;
import com.azure.cosmos.implementation.batch.ServerOperationBatchRequest;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.models.CosmosBatchOperationResult;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosItemOperationType;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.function.Tuple2;

public final class BulkExecutor<TContext>
implements Disposable {
    // Bridge accessor used throughout this class to read settings from CosmosBulkExecutionOptions.
    private static final ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.CosmosBulkExecutionOptionsAccessor bulkOptionsAccessor = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor();
    private static final Logger logger = LoggerFactory.getLogger(BulkExecutor.class);
    // Incremented once per created executor; the resulting value becomes part of `identifier`.
    private static final AtomicLong instanceCount = new AtomicLong(0L);
    // Bridge accessor used to obtain the diagnostics provider of the owning client.
    private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor clientAccessor = ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor();
    private final CosmosAsyncContainer container;
    // Payload threshold (bytes): reaching it closes the current micro batch; also passed to createBatchRequest.
    private final int maxMicroBatchPayloadSizeInBytes;
    private final AsyncDocumentClient docClientWrapper;
    // "<identifier>[<operation context>]" (or "[n/a]"); appended to log messages.
    private final String operationContextText;
    private final OperationContextAndListenerTuple operationListener;
    private final ThrottlingRetryOptions throttlingRetryOptions;
    private final Flux<CosmosItemOperation> inputOperations;
    // Period of the regular flush task and the maximum age of a micro batch, in milliseconds.
    private final Long maxMicroBatchIntervalInMs;
    // Fallback context attached to responses when an operation carries no context of its own.
    private final TContext batchContext;
    // Thresholds keyed by partition key range id; entries are created lazily in executeCore.
    private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
    private final CosmosBulkExecutionOptions cosmosBulkExecutionOptions;
    // Set to true once the caller-supplied inputOperations flux has completed.
    private final AtomicBoolean mainSourceCompleted;
    // One-shot guards so that dispose() and shutdown() each run their body only once.
    private final AtomicBoolean isDisposed = new AtomicBoolean(false);
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Operations read from the input that have not yet produced a response (flush markers are not counted).
    private final AtomicInteger totalCount;
    private static final Sinks.EmitFailureHandler serializedEmitFailureHandler = new SerializedEmitFailureHandler();
    private static final Sinks.EmitFailureHandler serializedCompleteEmitFailureHandler = new SerializedCompleteEmitFailureHandler();
    // Operations emitted here are merged back into the main pipeline (used for retries after a Gone response).
    private final Sinks.Many<CosmosItemOperation> mainSink;
    // One sink per partition group, registered by executePartitionedGroup and completed by shutdown().
    private final List<FluxSink<CosmosItemOperation>> groupSinks;
    private final CosmosAsyncClient cosmosClient;
    private final String bulkSpanName;
    // Handle of the currently scheduled periodic onFlush task; swapped or cleared by cancelFlushTask.
    private final AtomicReference<Disposable> scheduledFutureForFlush;
    // Must be initialized before the constructor body runs because operationContextText is derived from it.
    private final String identifier = "BulkExecutor-" + instanceCount.incrementAndGet();
    // Optional tracker (may be null); when it enables verbose logging, debug/info messages are raised to warn.
    private final BulkExecutorDiagnosticsTracker diagnosticsTracker;

    public BulkExecutor(CosmosAsyncContainer container, Flux<CosmosItemOperation> inputOperations, CosmosBulkExecutionOptions cosmosBulkOptions) {
        Preconditions.checkNotNull(container, "expected non-null container");
        Preconditions.checkNotNull(inputOperations, "expected non-null inputOperations");
        Preconditions.checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");
        this.maxMicroBatchPayloadSizeInBytes = bulkOptionsAccessor.getMaxMicroBatchPayloadSizeInBytes(cosmosBulkOptions);
        this.cosmosBulkExecutionOptions = cosmosBulkOptions;
        this.container = container;
        this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
        this.inputOperations = inputOperations;
        this.docClientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(container.getDatabase());
        this.cosmosClient = ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor().getCosmosAsyncClient(container.getDatabase());
        this.throttlingRetryOptions = this.docClientWrapper.getConnectionPolicy().getThrottlingRetryOptions();
        this.maxMicroBatchIntervalInMs = bulkOptionsAccessor.getMaxMicroBatchInterval(this.cosmosBulkExecutionOptions).toMillis();
        this.batchContext = bulkOptionsAccessor.getLegacyBatchScopedContext(this.cosmosBulkExecutionOptions);
        this.partitionScopeThresholds = ImplementationBridgeHelpers.CosmosBulkExecutionThresholdsStateHelper.getBulkExecutionThresholdsAccessor().getPartitionScopeThresholds(this.cosmosBulkExecutionOptions.getThresholdsState());
        this.operationListener = bulkOptionsAccessor.getOperationContext(this.cosmosBulkExecutionOptions);
        this.operationContextText = this.operationListener != null && this.operationListener.getOperationContext() != null ? this.identifier + "[" + this.operationListener.getOperationContext().toString() + "]" : this.identifier + "[n/a]";
        this.diagnosticsTracker = bulkOptionsAccessor.getDiagnosticsTracker(cosmosBulkOptions);
        this.mainSourceCompleted = new AtomicBoolean(false);
        this.totalCount = new AtomicInteger(0);
        this.mainSink = Sinks.many().unicast().onBackpressureBuffer();
        this.groupSinks = new CopyOnWriteArrayList<FluxSink<CosmosItemOperation>>();
        this.scheduledFutureForFlush = new AtomicReference<Disposable>(CosmosSchedulers.BULK_EXECUTOR_FLUSH_BOUNDED_ELASTIC.schedulePeriodically(this::onFlush, this.maxMicroBatchIntervalInMs.longValue(), this.maxMicroBatchIntervalInMs.longValue(), TimeUnit.MILLISECONDS));
        logger.debug("Instantiated BulkExecutor, Context: {}", (Object)this.operationContextText);
    }

    /**
     * Disposes this executor. Only the first call has an effect: when no operations are
     * outstanding all sinks are completed, otherwise the executor is shut down.
     */
    public void dispose() {
        if (!this.isDisposed.compareAndSet(false, true)) {
            // Already disposed by an earlier call.
            return;
        }
        long outstandingOperations = this.totalCount.get();
        if (outstandingOperations != 0L) {
            this.shutdown();
            return;
        }
        this.completeAllSinks();
    }

    /**
     * Reports whether {@link #dispose()} has already been invoked on this executor.
     *
     * @return {@code true} once the disposed flag has been set
     */
    public boolean isDisposed() {
        final boolean disposed = this.isDisposed.get();
        return disposed;
    }

    /**
     * Replaces the currently scheduled flush task and disposes the previous one.
     *
     * @param initializeAggressiveFlush when {@code true} a new periodic flush is scheduled with a
     *        period of at most 100 ms (or the configured interval if that is shorter); when
     *        {@code false} no replacement task is scheduled
     */
    private void cancelFlushTask(boolean initializeAggressiveFlush) {
        long aggressiveIntervalMs = Math.min(this.maxMicroBatchIntervalInMs, 100L);

        Disposable replacementTask = null;
        if (initializeAggressiveFlush) {
            replacementTask = CosmosSchedulers.BULK_EXECUTOR_FLUSH_BOUNDED_ELASTIC.schedulePeriodically(
                this::onFlush, aggressiveIntervalMs, aggressiveIntervalMs, TimeUnit.MILLISECONDS);
        }

        // Swap first, then dispose whatever was scheduled before.
        Disposable previousTask = this.scheduledFutureForFlush.getAndSet(replacementTask);
        if (previousTask == null) {
            return;
        }
        try {
            previousTask.dispose();
            this.logDebugOrWarning("Cancelled all future scheduled tasks {}, Context: {}", BulkExecutor.getThreadInfo(), this.operationContextText);
        } catch (Exception e) {
            // Best effort: a failure to cancel is logged but never propagated.
            logger.warn("Failed to cancel scheduled tasks{}, Context: {}", BulkExecutor.getThreadInfo(), this.operationContextText, e);
        }
    }

    /**
     * Logs at info level, or at warn level when the diagnostics tracker has enabled verbose logging.
     *
     * @param msg the SLF4J message pattern
     * @param args the pattern arguments
     */
    private void logInfoOrWarning(String msg, Object ... args) {
        boolean escalateToWarn = this.diagnosticsTracker != null
            && this.diagnosticsTracker.verboseLoggingAfterReEnqueueingRetriesEnabled();
        if (escalateToWarn) {
            logger.warn(msg, args);
            return;
        }
        logger.info(msg, args);
    }

    /**
     * Logs at debug level, or at warn level when the diagnostics tracker has enabled verbose logging.
     *
     * @param msg the SLF4J message pattern
     * @param args the pattern arguments
     */
    private void logDebugOrWarning(String msg, Object ... args) {
        boolean escalateToWarn = this.diagnosticsTracker != null
            && this.diagnosticsTracker.verboseLoggingAfterReEnqueueingRetriesEnabled();
        if (escalateToWarn) {
            logger.warn(msg, args);
            return;
        }
        logger.debug(msg, args);
    }

    /**
     * Completes every registered group sink and cancels the flush task without scheduling a
     * replacement. Only the first invocation does any work.
     */
    private void shutdown() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }
        this.logDebugOrWarning("Shutting down, Context: {}", this.operationContextText);
        for (FluxSink<CosmosItemOperation> sink : this.groupSinks) {
            sink.complete();
        }
        logger.debug("All group sinks completed, Context: {}", this.operationContextText);
        this.cancelFlushTask(false);
    }

    /**
     * Starts bulk execution and returns the stream of per-operation responses.
     * <p>
     * Whatever way the returned flux terminates, the number of operations still outstanding is
     * logged and this executor is disposed.
     *
     * @return the flux of bulk operation responses
     */
    public Flux<CosmosBulkOperationResponse<TContext>> execute() {
        return this.executeCore().doFinally(signal -> {
            if (signal != SignalType.ON_COMPLETE) {
                // Error or cancellation: leftover items are worth an info-level message.
                int remainingItems = this.totalCount.get();
                String terminatedMessage = "BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}";
                if (remainingItems > 0) {
                    this.logInfoOrWarning(terminatedMessage, signal, remainingItems, this.operationContextText, BulkExecutor.getThreadInfo());
                } else {
                    this.logDebugOrWarning(terminatedMessage, signal, remainingItems, this.operationContextText, BulkExecutor.getThreadInfo());
                }
            } else {
                this.logDebugOrWarning("BulkExecutor.execute flux completed - # left items {}, Context: {}, {}", this.totalCount.get(), this.operationContextText, BulkExecutor.getThreadInfo());
            }
            this.dispose();
        });
    }

    /**
     * Builds the processing pipeline: determines the partition concurrency limit, then routes every
     * input operation (plus operations re-emitted through {@code mainSink}) into a group per
     * partition key range, each of which is executed by {@link #executePartitionedGroup}.
     *
     * @return a flux emitting the responses produced by the partition groups
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executeCore() {
        Integer nullableMaxConcurrentCosmosPartitions = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor().getMaxConcurrentCosmosPartitions(this.cosmosBulkExecutionOptions);
        // Concurrency limit is never below 256: either the configured value or twice the number of feed ranges.
        Mono maxConcurrentCosmosPartitionsMono = nullableMaxConcurrentCosmosPartitions != null ? Mono.just((Object)Math.max(256, nullableMaxConcurrentCosmosPartitions)) : ImplementationBridgeHelpers.CosmosAsyncContainerHelper.getCosmosAsyncContainerAccessor().getFeedRanges(this.container, false).map(ranges -> Math.max(256, ranges.size() * 2));
        return maxConcurrentCosmosPartitionsMono.subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC).flatMapMany(maxConcurrentCosmosPartitions -> {
            this.logDebugOrWarning("BulkExecutor.execute with MaxConcurrentPartitions: {}, Context: {}", maxConcurrentCosmosPartitions, this.operationContextText);
            return this.inputOperations.publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC).onErrorMap(throwable -> {
                // NOTE: the throwable is returned unchanged, so the error still propagates downstream
                // even though the log message says "Skipping".
                logger.error("{}: Skipping an error operation while processing. Cause: {}, Context: {}", new Object[]{BulkExecutor.getThreadInfo(), throwable.getMessage(), this.operationContextText, throwable});
                return throwable;
            }).doOnNext(cosmosItemOperation -> {
                // Attach the retry policy and count the operation as outstanding (flush markers are not counted).
                BulkExecutorUtil.setRetryPolicyForBulk(this.docClientWrapper, this.container, cosmosItemOperation, this.throttlingRetryOptions);
                if (cosmosItemOperation != FlushBuffersItemOperation.singleton()) {
                    this.totalCount.incrementAndGet();
                }
                logger.trace("SetupRetryPolicy, {}, TotalCount: {}, Context: {}, {}", new Object[]{BulkExecutor.getItemOperationDiagnostics(cosmosItemOperation), this.totalCount.get(), this.operationContextText, BulkExecutor.getThreadInfo()});
            }).doOnComplete(() -> {
                // The caller's input is exhausted: finish immediately if nothing is outstanding,
                // otherwise switch to the aggressive flush schedule and trigger a flush right away.
                this.mainSourceCompleted.set(true);
                long totalCountSnapshot = this.totalCount.get();
                this.logDebugOrWarning("Main source completed - # left items {}, Context: {}", totalCountSnapshot, this.operationContextText);
                if (totalCountSnapshot == 0L) {
                    this.completeAllSinks();
                } else {
                    this.cancelFlushTask(true);
                    this.onFlush();
                    this.logDebugOrWarning("Scheduled new flush operation {}, Context: {}", BulkExecutor.getThreadInfo(), this.operationContextText);
                }
            }).mergeWith((Publisher)this.mainSink.asFlux()).subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC).flatMap(operation -> {
                // Operations from the input and from mainSink are both resolved to a partition key range
                // and paired with that range's thresholds object.
                logger.trace("Before Resolve PkRangeId, {}, Context: {} {}", new Object[]{BulkExecutor.getItemOperationDiagnostics(operation), this.operationContextText, BulkExecutor.getThreadInfo()});
                return BulkExecutorUtil.resolvePartitionKeyRangeId(this.docClientWrapper, this.container, operation).map(pkRangeId -> {
                    PartitionScopeThresholds partitionScopeThresholds = this.partitionScopeThresholds.computeIfAbsent((String)pkRangeId, newPkRangeId -> new PartitionScopeThresholds((String)newPkRangeId, this.cosmosBulkExecutionOptions));
                    logger.trace("Resolved PkRangeId, {}, PKRangeId: {} Context: {} {}", new Object[]{BulkExecutor.getItemOperationDiagnostics(operation), pkRangeId, this.operationContextText, BulkExecutor.getThreadInfo()});
                    return Pair.of(partitionScopeThresholds, operation);
                });
            }).groupBy(Pair::getKey, Pair::getValue).flatMap(this::executePartitionedGroup, maxConcurrentCosmosPartitions.intValue()).subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC).doOnNext(requestAndResponse -> {
                // Every emitted response settles one outstanding operation; the last one after the
                // input has completed closes all sinks.
                int totalCountAfterDecrement = this.totalCount.decrementAndGet();
                boolean mainSourceCompletedSnapshot = this.mainSourceCompleted.get();
                if (totalCountAfterDecrement == 0 && mainSourceCompletedSnapshot) {
                    this.logDebugOrWarning("All work completed, {}, TotalCount: {}, Context: {} {}", BulkExecutor.getItemOperationDiagnostics(requestAndResponse.getOperation()), totalCountAfterDecrement, this.operationContextText, BulkExecutor.getThreadInfo());
                    this.completeAllSinks();
                } else {
                    if (totalCountAfterDecrement == 0) {
                        this.logDebugOrWarning("No Work left - but mainSource not yet completed, Context: {} {}", this.operationContextText, BulkExecutor.getThreadInfo());
                    }
                    logger.trace("Work left - TotalCount after decrement: {}, main sink completed {}, {}, Context: {} {}", new Object[]{totalCountAfterDecrement, mainSourceCompletedSnapshot, BulkExecutor.getItemOperationDiagnostics(requestAndResponse.getOperation()), this.operationContextText, BulkExecutor.getThreadInfo()});
                }
            }).doOnComplete(() -> {
                // Same completion check as above, applied when the response flux itself completes.
                int totalCountSnapshot = this.totalCount.get();
                boolean mainSourceCompletedSnapshot = this.mainSourceCompleted.get();
                if (totalCountSnapshot == 0 && mainSourceCompletedSnapshot) {
                    this.logDebugOrWarning("DoOnComplete: All work completed, Context: {}", this.operationContextText);
                    this.completeAllSinks();
                } else {
                    this.logDebugOrWarning("DoOnComplete: Work left - TotalCount after decrement: {}, main sink completed {}, Context: {} {}", totalCountSnapshot, mainSourceCompletedSnapshot, this.operationContextText, BulkExecutor.getThreadInfo());
                }
            });
        });
    }

    /**
     * Processes all operations that resolved to one partition key range: buffers them into micro
     * batches and executes each batch through {@link #executeOperations}.
     * <p>
     * A micro batch is closed when a flush marker arrives while the batch is non-empty, or when the
     * batch reaches the target size, the maximum age, or the maximum payload size.
     *
     * @param partitionedGroupFluxOfInputOperations the operations of one group, keyed by the
     *        thresholds object of their partition key range
     * @return the responses for the operations of this group
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(GroupedFlux<PartitionScopeThresholds, CosmosItemOperation> partitionedGroupFluxOfInputOperations) {
        PartitionScopeThresholds thresholds = (PartitionScopeThresholds)partitionedGroupFluxOfInputOperations.key();
        // Group-local sink: retries and operations that did not fit into a batch are fed back through it.
        FluxProcessor groupFluxProcessor = UnicastProcessor.create().serialize();
        FluxSink groupSink = groupFluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        // Registered so that shutdown() can complete it.
        this.groupSinks.add((FluxSink<CosmosItemOperation>)groupSink);
        // State of the micro batch currently being filled; -1 means "no first record yet".
        AtomicLong firstRecordTimeStamp = new AtomicLong(-1L);
        AtomicLong currentMicroBatchSize = new AtomicLong(0L);
        AtomicInteger currentTotalSerializedLength = new AtomicInteger(0);
        return partitionedGroupFluxOfInputOperations.mergeWith((Publisher)groupFluxProcessor).onBackpressureBuffer().timestamp().subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC).bufferUntil(timeStampItemOperationTuple -> {
            // Returning true closes the current buffer (including the element being inspected).
            long timestamp = (Long)timeStampItemOperationTuple.getT1();
            CosmosItemOperation itemOperation = (CosmosItemOperation)timeStampItemOperationTuple.getT2();
            logger.trace("BufferUntil - enqueued {}, {}, Context: {} {}", new Object[]{timestamp, BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo()});
            if (itemOperation == FlushBuffersItemOperation.singleton()) {
                // A flush marker only closes the buffer when real operations are waiting in it.
                long currentMicroBatchSizeSnapshot = currentMicroBatchSize.get();
                if (currentMicroBatchSizeSnapshot > 0L) {
                    logger.trace("Flushing PKRange {} (batch size: {}) due to FlushItemOperation, Context: {} {}", new Object[]{thresholds.getPartitionKeyRangeId(), currentMicroBatchSizeSnapshot, this.operationContextText, BulkExecutor.getThreadInfo()});
                    firstRecordTimeStamp.set(-1L);
                    currentMicroBatchSize.set(0L);
                    currentTotalSerializedLength.set(0);
                    return true;
                }
                return false;
            }
            // Age is measured from the first record of the batch and is only evaluated when an element arrives.
            firstRecordTimeStamp.compareAndSet(-1L, timestamp);
            long age = timestamp - firstRecordTimeStamp.get();
            long batchSize = currentMicroBatchSize.incrementAndGet();
            int totalSerializedLength = this.calculateTotalSerializedLength(currentTotalSerializedLength, itemOperation);
            if (batchSize >= (long)thresholds.getTargetMicroBatchSizeSnapshot() || age >= this.maxMicroBatchIntervalInMs || totalSerializedLength >= this.maxMicroBatchPayloadSizeInBytes) {
                this.logDebugOrWarning("BufferUntil - Flushing PKRange {} due to BatchSize ({}), payload size ({}) or age ({}), Triggering {}, Context: {} {}", thresholds.getPartitionKeyRangeId(), batchSize, totalSerializedLength, age, BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());
                firstRecordTimeStamp.set(-1L);
                currentMicroBatchSize.set(0L);
                currentTotalSerializedLength.set(0);
                return true;
            }
            return false;
        }).flatMap(timeStampAndItemOperationTuples -> {
            // Drop the timestamps and any flush markers before handing the batch over for execution.
            ArrayList<CosmosItemOperation> operations = new ArrayList<CosmosItemOperation>(timeStampAndItemOperationTuples.size());
            for (Tuple2 timeStampAndItemOperationTuple : timeStampAndItemOperationTuples) {
                CosmosItemOperation itemOperation = (CosmosItemOperation)timeStampAndItemOperationTuple.getT2();
                if (itemOperation == FlushBuffersItemOperation.singleton()) continue;
                operations.add(itemOperation);
            }
            this.logDebugOrWarning("Flushing PKRange {} micro batch with {} operations,  Context: {} {}", thresholds.getPartitionKeyRangeId(), operations.size(), this.operationContextText, BulkExecutor.getThreadInfo());
            return this.executeOperations(operations, thresholds, (FluxSink<CosmosItemOperation>)groupSink);
            // The second flatMap argument limits how many micro batches of this group run concurrently.
        }, ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor().getMaxMicroBatchConcurrency(this.cosmosBulkExecutionOptions));
    }

    /**
     * Adds the serialized length of {@code item} to the running total of the current micro batch.
     *
     * @param currentTotalSerializedLength running payload size of the batch; updated in place
     * @param item the operation being added; items that are not a {@code CosmosItemOperationBase}
     *        leave the total unchanged
     * @return the running total after accounting for {@code item}
     */
    private int calculateTotalSerializedLength(AtomicInteger currentTotalSerializedLength, CosmosItemOperation item) {
        if (!(item instanceof CosmosItemOperationBase)) {
            return currentTotalSerializedLength.get();
        }
        int itemLength = ((CosmosItemOperationBase) item).getSerializedLength();
        return currentTotalSerializedLength.addAndGet(itemLength);
    }

    /**
     * Turns one micro batch into a server batch request and executes it.
     * <p>
     * Operations that the batch request reports as pending are pushed back into {@code groupSink}
     * before the request is sent.
     *
     * @param operations the operations of the micro batch; an empty list yields an empty flux
     * @param thresholds thresholds of the partition key range the batch belongs to
     * @param groupSink sink of the owning partition group
     * @return the responses produced by executing the batch
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(List<CosmosItemOperation> operations, PartitionScopeThresholds thresholds, FluxSink<CosmosItemOperation> groupSink) {
        if (operations.isEmpty()) {
            logger.trace("Empty operations list, Context: {}", this.operationContextText);
            return Flux.empty();
        }

        ServerOperationBatchRequest batchRequestWithOverflow = BulkExecutorUtil.createBatchRequest(
            operations, thresholds.getPartitionKeyRangeId(), this.maxMicroBatchPayloadSizeInBytes);

        // Re-enqueue whatever did not make it into this request.
        for (CosmosItemOperation pendingOperation : batchRequestWithOverflow.getBatchPendingOperations()) {
            groupSink.next(pendingOperation);
        }

        return Flux.just(batchRequestWithOverflow.getBatchRequest())
            .publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
            .flatMap(serverRequest -> this.executePartitionKeyRangeServerBatchRequest(serverRequest, groupSink, thresholds));
    }

    /**
     * Sends one server batch request and maps its outcome to per-operation responses.
     * <p>
     * On success every operation result is handled individually. If the request fails with an
     * {@link Exception}, every operation of the request is handled with that exception instead;
     * any other {@link Throwable} is propagated.
     *
     * @param serverRequest the batch request to execute
     * @param groupSink sink of the owning partition group, used for retries
     * @param thresholds thresholds of the partition key range
     * @return the per-operation responses
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionKeyRangeServerBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest, FluxSink<CosmosItemOperation> groupSink, PartitionScopeThresholds thresholds) {
        Flux<CosmosBulkOperationResponse<TContext>> perOperationResponses = this.executeBatchRequest(serverRequest)
            .subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
            .flatMapMany(response -> {
                boolean canTrackDiagnostics = this.diagnosticsTracker != null && response.getDiagnostics() != null;
                if (canTrackDiagnostics) {
                    this.diagnosticsTracker.trackDiagnostics(response.getDiagnostics().getDiagnosticsContext());
                }
                return Flux.fromIterable(response.getResults())
                    .publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
                    .flatMap(result -> this.handleTransactionalBatchOperationResult(response, result, groupSink, thresholds));
            });

        return perOperationResponses.onErrorResume(throwable -> {
            if (throwable instanceof Exception) {
                Exception exception = (Exception) throwable;
                return Flux.fromIterable(serverRequest.getOperations())
                    .publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
                    .flatMap(itemOperation -> this.handleTransactionalBatchExecutionException(itemOperation, exception, groupSink, thresholds));
            }
            // Errors and other non-Exception throwables are not translated into responses.
            throw Exceptions.propagate(throwable);
        });
    }

    /**
     * Converts a single operation result of a batch response into a bulk operation response,
     * re-enqueueing the operation when its retry policy asks for a retry.
     *
     * @param response the batch response the result belongs to
     * @param operationResult the result of one operation
     * @param groupSink sink of the owning partition group, used for retries
     * @param thresholds thresholds of the partition key range
     * @return the response for the operation, or an empty mono when the operation was re-enqueued
     * @throws UnsupportedOperationException if a failed operation is not an {@code ItemBulkOperation}
     */
    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOperationResult(CosmosBatchResponse response, CosmosBatchOperationResult operationResult, FluxSink<CosmosItemOperation> groupSink, PartitionScopeThresholds thresholds) {
        CosmosBulkItemResponse itemResponse = ModelBridgeInternal.createCosmosBulkItemResponse(operationResult, response);
        CosmosItemOperation itemOperation = operationResult.getOperation();
        TContext actualContext = this.getActualContext(itemOperation);
        this.logDebugOrWarning("HandleTransactionalBatchOperationResult - PKRange {}, Response Status Code {}, Operation Status Code, {}, {}, Context: {} {}", thresholds.getPartitionKeyRangeId(), response.getStatusCode(), operationResult.getStatusCode(), BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());

        // Happy path: record the success and hand the response back.
        if (operationResult.isSuccessStatusCode()) {
            thresholds.recordSuccessfulOperation();
            return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(itemOperation, itemResponse, actualContext));
        }

        if (!(itemOperation instanceof ItemBulkOperation)) {
            throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
        }

        ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
        return itemBulkOperation.getRetryPolicy().shouldRetry(operationResult).flatMap(result -> {
            if (result.shouldRetry) {
                this.logDebugOrWarning("HandleTransactionalBatchOperationResult - enqueue retry, PKRange {}, Response Status Code {}, Operation Status Code, {}, {}, Context: {} {}", thresholds.getPartitionKeyRangeId(), response.getStatusCode(), operationResult.getStatusCode(), BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());
                return this.enqueueForRetry(result.backOffTime, groupSink, itemOperation, thresholds);
            }

            // Final failure: 409 and 412 on the batch response are logged quietly, everything else as an error.
            String failureMessage = "HandleTransactionalBatchOperationResult - Fail, PKRange {}, Response Status Code {}, Operation Status Code {}, {}, Context: {} {}";
            boolean quietFailure = response.getStatusCode() == 409 || response.getStatusCode() == 412;
            if (quietFailure) {
                this.logDebugOrWarning(failureMessage, thresholds.getPartitionKeyRangeId(), response.getStatusCode(), operationResult.getStatusCode(), BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());
            } else {
                logger.error(failureMessage, thresholds.getPartitionKeyRangeId(), response.getStatusCode(), operationResult.getStatusCode(), BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());
            }
            return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(itemOperation, itemResponse, actualContext));
        });
    }

    /**
     * Picks the context to attach to a response: the operation's own context when it is an
     * {@code ItemBulkOperation} carrying a non-null context, otherwise the batch-scoped context.
     *
     * @param itemOperation the operation a response is being built for
     * @return the operation-level context, or the batch-level fallback
     */
    private TContext getActualContext(CosmosItemOperation itemOperation) {
        if (itemOperation instanceof ItemBulkOperation) {
            Object operationContext = ((ItemBulkOperation<?, ?>) itemOperation).getContext();
            if (operationContext != null) {
                @SuppressWarnings("unchecked")
                TContext typedContext = (TContext) operationContext;
                return typedContext;
            }
        }
        return this.batchContext;
    }

    /**
     * Handles one operation of a batch request that failed as a whole.
     * <p>
     * For a {@code CosmosException} on an {@code ItemBulkOperation} the retry policy decides: a
     * "gone" retry sends the operation back through the main sink, otherwise the generic retry
     * handling applies. Any other combination produces a failure response immediately.
     *
     * @param itemOperation the affected operation
     * @param exception the failure of the batch request
     * @param groupSink sink of the owning partition group, used for retries
     * @param thresholds thresholds of the partition key range
     * @return the failure response, or an empty mono when the operation was re-enqueued
     */
    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExecutionException(CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink, PartitionScopeThresholds thresholds) {
        this.logDebugOrWarning("HandleTransactionalBatchExecutionException, PKRange {}, Error: {}, {}, Context: {} {}", thresholds.getPartitionKeyRangeId(), exception, BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());

        boolean retryPolicyApplies = exception instanceof CosmosException && itemOperation instanceof ItemBulkOperation;
        if (!retryPolicyApplies) {
            TContext actualContext = this.getActualContext(itemOperation);
            return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, actualContext));
        }

        CosmosException cosmosException = (CosmosException) exception;
        ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
        return itemBulkOperation.getRetryPolicy()
            .shouldRetryForGone(cosmosException.getStatusCode(), cosmosException.getSubStatusCode(), itemBulkOperation, cosmosException)
            .flatMap(shouldRetryGone -> {
                if (!shouldRetryGone) {
                    this.logDebugOrWarning("HandleTransactionalBatchExecutionException - Retry other, PKRange {}, Error: {}, {}, Context: {} {}", thresholds.getPartitionKeyRangeId(), exception, BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());
                    return this.retryOtherExceptions(itemOperation, exception, groupSink, cosmosException, itemBulkOperation, thresholds);
                }
                // Goes through the main sink (not the group sink) so the partition key range is resolved again.
                this.logDebugOrWarning("HandleTransactionalBatchExecutionException - Retry due to split, PKRange {}, Error: {}, {}, Context: {} {}", thresholds.getPartitionKeyRangeId(), exception, BulkExecutor.getItemOperationDiagnostics(itemOperation), this.operationContextText, BulkExecutor.getThreadInfo());
                this.mainSink.emitNext(itemOperation, serializedEmitFailureHandler);
                return Mono.empty();
            });
    }

    /**
     * Records a retry on the thresholds and pushes the operation back into the group sink,
     * immediately or after the given back-off.
     *
     * @param backOffTime delay before re-enqueueing; {@code null} or zero means no delay
     * @param groupSink sink of the owning partition group
     * @param itemOperation the operation to retry
     * @param thresholds thresholds of the partition key range
     * @return a mono that completes empty once the operation has been re-enqueued
     */
    private Mono<CosmosBulkOperationResponse<TContext>> enqueueForRetry(Duration backOffTime, FluxSink<CosmosItemOperation> groupSink, CosmosItemOperation itemOperation, PartitionScopeThresholds thresholds) {
        thresholds.recordEnqueuedRetry();

        boolean hasBackOff = backOffTime != null && !backOffTime.isZero();
        if (hasBackOff) {
            return Mono.delay(backOffTime).flatMap(ignored -> {
                groupSink.next(itemOperation);
                return Mono.empty();
            });
        }

        groupSink.next(itemOperation);
        return Mono.empty();
    }

    /**
     * Asks the operation's retry policy whether a failed batch execution should be retried and
     * either re-enqueues the operation or emits a failure response.
     *
     * @param itemOperation the affected operation
     * @param exception the exception reported in the failure response
     * @param groupSink sink of the owning partition group, used for retries
     * @param cosmosException the exception passed to the retry policy
     * @param itemBulkOperation the operation whose retry policy is consulted
     * @param thresholds thresholds of the partition key range
     * @return the failure response, or an empty mono when the operation was re-enqueued
     */
    private Mono<CosmosBulkOperationResponse<TContext>> retryOtherExceptions(CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink, CosmosException cosmosException, ItemBulkOperation<?, ?> itemBulkOperation, PartitionScopeThresholds thresholds) {
        TContext actualContext = this.getActualContext(itemOperation);
        Exception policyInput = (Exception) cosmosException;
        return itemBulkOperation.getRetryPolicy().shouldRetry(policyInput).flatMap(result -> {
            if (!result.shouldRetry) {
                return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, actualContext));
            }
            return this.enqueueForRetry(result.backOffTime, groupSink, itemBulkOperation, thresholds);
        });
    }

    /**
     * Builds the request options for a server batch request and executes it through the document
     * client, wrapped by the diagnostics provider's traced publisher.
     * <p>
     * When the client does not return content on writes by default, content is requested for the
     * whole batch as soon as one operation is a read or explicitly asks for content on write.
     *
     * @param serverRequest the batch request to send
     * @return the batch response
     */
    private Mono<CosmosBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) {
        RequestOptions options = new RequestOptions();
        options.setThroughputControlGroupName(this.cosmosBulkExecutionOptions.getThroughputControlGroupName());
        options.setExcludeRegions(this.cosmosBulkExecutionOptions.getExcludedRegions());

        // Custom options are copied onto the request as headers.
        Map<String, String> customOptions = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
            .getCosmosBulkExecutionOptionsAccessor()
            .getCustomOptions(this.cosmosBulkExecutionOptions);
        if (customOptions != null) {
            customOptions.forEach((headerName, headerValue) -> options.setHeader(headerName, headerValue));
        }

        options.setOperationContextAndListenerTuple(this.operationListener);

        if (!this.docClientWrapper.isContentResponseOnWriteEnabled() && serverRequest.getOperations().size() > 0) {
            for (CosmosItemOperation itemOperation : serverRequest.getOperations()) {
                if (!(itemOperation instanceof ItemBulkOperation)) {
                    continue;
                }
                ItemBulkOperation<?, ?> bulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
                boolean needsContent = bulkOperation.getOperationType() == CosmosItemOperationType.READ
                    || (bulkOperation.getRequestOptions() != null
                        && Boolean.TRUE.equals(bulkOperation.getRequestOptions().isContentResponseOnWriteEnabled()));
                if (needsContent) {
                    // One such operation is enough to switch the whole batch.
                    options.setContentResponseOnWriteEnabled(true);
                    break;
                }
            }
        }

        return FluxUtil.withContext(context -> {
            Mono<CosmosBatchResponse> responseMono = this.docClientWrapper.executeBatchRequest(
                BridgeInternal.getLink(this.container), serverRequest, options, false);
            return clientAccessor.getDiagnosticsProvider(this.cosmosClient).traceEnabledBatchResponsePublisher(
                responseMono,
                context,
                this.bulkSpanName,
                this.container.getDatabase().getId(),
                this.container.getId(),
                this.cosmosClient,
                options.getConsistencyLevel(),
                OperationType.Batch,
                ResourceType.Document,
                options,
                this.cosmosBulkExecutionOptions.getMaxMicroBatchSize());
        });
    }

    /**
     * Logs that the sinks are being closed, emits completion on the main sink and then
     * calls {@code shutdown()}.
     */
    private void completeAllSinks() {
        this.logInfoOrWarning("Closing all sinks, Context: {}", this.operationContextText);
        // NOTE(review): this "shut down" message is written before shutdown() is invoked below.
        logger.debug("Executor service shut down, Context: {}", (Object)this.operationContextText);
        // Emit failures are delegated to serializedCompleteEmitFailureHandler
        // (presumably a SerializedCompleteEmitFailureHandler instance - confirm at its declaration).
        this.mainSink.emitComplete(serializedCompleteEmitFailureHandler);
        this.shutdown();
    }

    /**
     * Sends the flush marker operation to every registered group sink.
     *
     * <p>Any {@code Throwable} raised while doing so is logged and not propagated.
     */
    private void onFlush() {
        try {
            this.groupSinks.forEach(groupSink -> {
                groupSink.next(FlushBuffersItemOperation.singleton());
            });
        }
        catch (Throwable failure) {
            logger.error("Callback invocation 'onFlush' failed. Context: {}", (Object)this.operationContextText, (Object)failure);
        }
    }

    /**
     * Renders a short, log-friendly description of an item operation.
     *
     * @param operation the operation to describe; the flush marker singleton gets a fixed text
     * @return {@code "ItemOperation[Type: Flush]"} for the flush marker, otherwise the operation
     *         type, partition key value ({@code "n/a"} when it is {@code null}) and id
     */
    private static String getItemOperationDiagnostics(CosmosItemOperation operation) {
        if (operation == FlushBuffersItemOperation.singleton()) {
            return "ItemOperation[Type: Flush]";
        }
        final String typeText = operation.getOperationType().toString();
        final Object partitionKeyValue = operation.getPartitionKeyValue();
        final String partitionKeyText = partitionKeyValue == null ? "n/a" : partitionKeyValue.toString();
        return "ItemOperation[Type: " + typeText + ", PK: " + partitionKeyText + ", id: " + operation.getId() + "]";
    }

    /**
     * Describes the calling thread for diagnostic output.
     *
     * @return a string of the form
     *         {@code Thread[Name: <name>,Group: <group>, isDaemon: <bool>, Id: <id>]}, where the
     *         group is {@code "n/a"} when the thread has no thread group
     */
    private static String getThreadInfo() {
        final Thread current = Thread.currentThread();
        final ThreadGroup group = current.getThreadGroup();
        final String groupName = group == null ? "n/a" : group.getName();
        return "Thread[" + "Name: " + current.getName()
            + ",Group: " + groupName
            + ", isDaemon: " + current.isDaemon()
            + ", Id: " + current.getId()
            + "]";
    }

    /**
     * Emit failure handler that asks for a retry only when the emission failed with
     * {@code FAIL_NON_SERIALIZED}; every other result is logged as an error and not retried.
     */
    private static class SerializedEmitFailureHandler
    implements Sinks.EmitFailureHandler {
        private SerializedEmitFailureHandler() {
        }

        /**
         * @param signalType the signal whose emission failed
         * @param emitResult the failure result reported for the emission
         * @return {@code true} for {@code FAIL_NON_SERIALIZED}, {@code false} otherwise
         */
        public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
            final boolean retryEmission = emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);
            if (retryEmission) {
                logger.debug("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", (Object)signalType, (Object)emitResult);
            } else {
                logger.error("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", (Object)signalType, (Object)emitResult);
            }
            return retryEmission;
        }
    }

    /**
     * Emit failure handler used when completing a sink. It asks for a retry on
     * {@code FAIL_NON_SERIALIZED}, quietly gives up when the sink is already cancelled or
     * terminated, and logs any other result as an error without retrying.
     */
    private static class SerializedCompleteEmitFailureHandler
    implements Sinks.EmitFailureHandler {
        private SerializedCompleteEmitFailureHandler() {
        }

        /**
         * @param signalType the signal whose emission failed
         * @param emitResult the failure result reported for the emission
         * @return {@code true} only for {@code FAIL_NON_SERIALIZED}
         */
        public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
            switch (emitResult) {
                case FAIL_NON_SERIALIZED:
                    logger.debug("SerializedCompleteEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", (Object)signalType, (Object)emitResult);
                    return true;
                case FAIL_CANCELLED:
                case FAIL_TERMINATED:
                    // Already cancelled/terminated: only worth a debug line, no retry.
                    logger.debug("SerializedCompleteEmitFailureHandler.onEmitFailure - Main sink already completed, Signal:{}, Result: {}", (Object)signalType, (Object)emitResult);
                    return false;
                default:
                    logger.error("SerializedCompleteEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", (Object)signalType, (Object)emitResult);
                    return false;
            }
        }
    }
}

