/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.IDocumentQueryClient;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.implementation.query.TriFunction;
import com.azure.cosmos.implementation.query.metrics.ClientSideMetrics;
import com.azure.cosmos.implementation.query.metrics.FetchExecutionRangeAccumulator;
import com.azure.cosmos.implementation.query.metrics.SchedulingStopwatch;
import com.azure.cosmos.implementation.query.metrics.SchedulingTimeSpan;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Produces the pages of a query for one feed range and transparently replaces itself when the
 * service reports that the underlying partition was split or merged.
 *
 * <p>Pages are fetched through {@link Paginator}; every page is wrapped in a
 * {@link DocumentProducerFeedResponse}. When the page stream fails with a {@link CosmosException}
 * that {@link Exceptions#isPartitionSplitOrMerge} recognises, the overlapping partition key ranges
 * are looked up again and the query continues either on this producer (one range returned) or on
 * freshly created child producers (several ranges returned).
 *
 * <p>NOTE(review): {@code retries} is a plain {@code int} written from reactive callbacks; the class
 * does not synchronise access to it. Confirm the threading assumptions with the callers.
 *
 * @param <T> type of the items contained in each {@link FeedResponse} page
 */
class DocumentProducer<T> {
    private static final ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.CosmosQueryRequestOptionsAccessor qryOptionsAccessor = ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.getCosmosQueryRequestOptionsAccessor();
    private static final Logger logger = LoggerFactory.getLogger(DocumentProducer.class);

    // Reset to -1 when a request is handed to executeRequestFuncWithRetries and incremented each
    // time the request callable runs, so a single execution leaves it at 0.
    private int retries;
    protected final IDocumentQueryClient client;
    protected final Supplier<String> operationContextTextProvider;
    protected final String collectionRid;
    protected final CosmosQueryRequestOptions cosmosQueryRequestOptions;
    protected final Class<T> resourceType;
    protected final String collectionLink;
    protected final TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc;
    protected final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeRequestFuncWithRetries;
    protected final Supplier<DocumentClientRetryPolicy> createRetryPolicyFunc;
    protected final int pageSize;
    protected final UUID correlatedActivityId;
    // Public and mutable in the original class; kept as-is because external code may assign it.
    public int top;
    // Continuation token of the most recent page; seeds re-execution and child producers.
    private volatile String lastResponseContinuationToken;
    private final SchedulingStopwatch fetchSchedulingMetrics;
    private final FetchExecutionRangeAccumulator fetchExecutionRangeAccumulator;
    protected FeedRangeEpkImpl feedRange;

    /**
     * Creates a producer for {@code feedRange}.
     *
     * @param client client used to execute feed operations and to reach the partition key range cache
     * @param collectionResourceId resource id of the collection, used when resolving replacement ranges
     * @param cosmosQueryRequestOptions query options; cloned when non-null, otherwise a new instance
     *        is created. {@code initialContinuationToken} is written into the resulting copy
     * @param createRequestFunc builds a request from (feed range, continuation token, max item count)
     * @param executeRequestFunc executes one request and yields its page
     * @param collectionLink link of the collection; stored and forwarded to child producers
     * @param createRetryPolicyFunc factory for the retry policy; may be {@code null}, in which case
     *        a {@code null} policy is supplied to the feed operation
     * @param resourceType class of the page items
     * @param correlatedActivityId activity id stored and forwarded to child producers
     * @param initialPageSize stored as {@code pageSize} and passed to {@link Paginator}
     * @param initialContinuationToken token from which the first request continues; may be {@code null}
     * @param top stored in {@link #top} and passed to {@link Paginator}
     * @param feedRange range served by this producer; must be non-null because its range is read here
     * @param operationContextTextProvider supplies the context text appended to log messages
     */
    public DocumentProducer(IDocumentQueryClient client, String collectionResourceId, CosmosQueryRequestOptions cosmosQueryRequestOptions, TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc, Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeRequestFunc, String collectionLink, Supplier<DocumentClientRetryPolicy> createRetryPolicyFunc, Class<T> resourceType, UUID correlatedActivityId, int initialPageSize, String initialContinuationToken, int top, FeedRangeEpkImpl feedRange, Supplier<String> operationContextTextProvider) {
        this.client = client;
        this.collectionRid = collectionResourceId;
        this.createRequestFunc = createRequestFunc;
        this.fetchSchedulingMetrics = new SchedulingStopwatch();
        this.fetchSchedulingMetrics.ready();
        this.fetchExecutionRangeAccumulator = new FetchExecutionRangeAccumulator(feedRange.getRange().toString());
        this.operationContextTextProvider = operationContextTextProvider;

        // Core of one feed operation: obtain the retry policy, let it observe the request, count
        // the execution and delegate to the caller-supplied executor.
        BiFunction<Supplier<DocumentClientRetryPolicy>, RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFeedOperationCore = (clientRetryPolicyFactory, req) -> {
            DocumentClientRetryPolicy finalRetryPolicy = clientRetryPolicyFactory.get();
            return ObservableHelper.inlineIfPossibleAsObs(() -> {
                if (finalRetryPolicy != null) {
                    finalRetryPolicy.onBeforeSendRequest(req);
                }
                ++this.retries;
                return executeRequestFunc.apply(req);
            }, finalRetryPolicy);
        };

        this.correlatedActivityId = correlatedActivityId;
        this.cosmosQueryRequestOptions = cosmosQueryRequestOptions != null ? qryOptionsAccessor.clone(cosmosQueryRequestOptions) : new CosmosQueryRequestOptions();
        ModelBridgeInternal.setQueryRequestOptionsContinuationToken(this.cosmosQueryRequestOptions, initialContinuationToken);

        this.executeRequestFuncWithRetries = request -> {
            this.retries = -1;
            this.fetchSchedulingMetrics.start();
            this.fetchExecutionRangeAccumulator.beginFetchRange();
            return this.client.executeFeedOperationWithAvailabilityStrategy(
                ResourceType.Document,
                OperationType.Query,
                // A missing factory yields a null policy, which executeFeedOperationCore tolerates.
                () -> createRetryPolicyFunc != null ? createRetryPolicyFunc.get() : null,
                request,
                executeFeedOperationCore);
        };

        this.lastResponseContinuationToken = initialContinuationToken;
        this.resourceType = resourceType;
        this.collectionLink = collectionLink;
        this.createRetryPolicyFunc = createRetryPolicyFunc;
        this.pageSize = initialPageSize;
        this.top = top;
        this.feedRange = feedRange;
    }

    /**
     * Starts (or resumes) paging over this producer's feed range from
     * {@code lastResponseContinuationToken}.
     *
     * <p>For every page received the continuation token is remembered, the current fetch range is
     * closed in the accumulator and the scheduling stopwatch is stopped. The resulting stream is
     * guarded by {@link #feedRangeGoneProof(Flux)}.
     *
     * @return the pages of this feed range wrapped in {@link DocumentProducerFeedResponse}
     */
    public Flux<DocumentProducerFeedResponse> produceAsync() {
        BiFunction<String, Integer, RxDocumentServiceRequest> sourcePartitionCreateRequestFunc =
            (token, maxItemCount) -> this.createRequestFunc.apply(this.feedRange, token, maxItemCount);

        Flux<FeedResponse<T>> obs = Paginator.getPaginatedQueryResultAsObservable(
                this.lastResponseContinuationToken,
                sourcePartitionCreateRequestFunc,
                this.executeRequestFuncWithRetries,
                this.top,
                this.pageSize,
                Paginator.getPreFetchCount(this.cosmosQueryRequestOptions, this.top, this.pageSize),
                qryOptionsAccessor.getImpl(this.cosmosQueryRequestOptions).getOperationContextAndListenerTuple(),
                qryOptionsAccessor.getCancelledRequestDiagnosticsTracker(this.cosmosQueryRequestOptions))
            .map(rsp -> {
                this.lastResponseContinuationToken = rsp.getContinuationToken();
                this.fetchExecutionRangeAccumulator.endFetchRange(rsp.getActivityId(), rsp.getResults().size(), this.retries);
                this.fetchSchedulingMetrics.stop();
                return rsp;
            });

        return this.feedRangeGoneProof(obs.map(DocumentProducerFeedResponse::new));
    }

    /**
     * Adds split/merge recovery to a page stream.
     *
     * <p>Errors that are not a {@link CosmosException} classified as split-or-merge are logged and
     * re-emitted unchanged. Otherwise the ranges overlapping this producer's range are resolved
     * again: no range is an {@link IllegalStateException}; exactly one range is logged as a merge
     * and this same producer is re-run; more than one range creates one child producer per range.
     *
     * @param sourceFeedResponseObservable the page stream to protect
     * @return the protected page stream
     */
    private Flux<DocumentProducerFeedResponse> feedRangeGoneProof(Flux<DocumentProducerFeedResponse> sourceFeedResponseObservable) {
        return sourceFeedResponseObservable.onErrorResume(t -> {
            CosmosException dce = Utils.as(t, CosmosException.class);
            if (dce == null || !this.isSplitOrMerge(dce)) {
                logger.error("Unexpected failure, Context: {}", this.operationContextTextProvider.get(), t);
                return Flux.error(t);
            }

            logger.debug("DocumentProducer handling a partition gone in [{}], detail:[{}], Context: {}", this.feedRange, dce, this.operationContextTextProvider.get());
            Mono<Utils.ValueHolder<List<PartitionKeyRange>>> replacementRangesObs = this.getReplacementRanges(this.feedRange.getRange());

            Flux<DocumentProducer<T>> replacementProducers = replacementRangesObs.flux().flatMap(partitionKeyRangesValueHolder -> {
                if (partitionKeyRangesValueHolder == null || partitionKeyRangesValueHolder.v == null || partitionKeyRangesValueHolder.v.isEmpty()) {
                    logger.error("Failed to find at least one child range");
                    return Mono.error(new IllegalStateException("Failed to find at least one child range"));
                }

                List<PartitionKeyRange> replacementRanges = partitionKeyRangesValueHolder.v;
                if (replacementRanges.size() == 1) {
                    // A single overlapping range is treated as a merge: keep using this producer,
                    // which resumes from lastResponseContinuationToken when produceAsync() runs again.
                    if (logger.isInfoEnabled()) {
                        logger.info("Cross Partition Query Execution detected partition gone due to merge for feedRange [{}] with continuationToken [{}]", this.feedRange, this.lastResponseContinuationToken);
                    }
                    return Mono.just(this);
                }

                if (logger.isInfoEnabled()) {
                    logger.info("Cross Partition Query Execution detected partition [{}] split into [{}] partitions, last continuation token is [{}]. - Context: {}", this.feedRange, replacementRanges.stream().map(JsonSerializable::toJson).collect(Collectors.joining(", ")), this.lastResponseContinuationToken, this.operationContextTextProvider.get());
                }
                return Flux.fromIterable(this.createReplacingDocumentProducersOnSplit(replacementRanges));
            });

            return this.produceOnFeedRangeGone(replacementProducers);
        });
    }

    /**
     * Drains the replacement producers with a flatMap concurrency of 1.
     *
     * @param replacingDocumentProducers producers that take over after a split or merge
     * @return the pages emitted by the replacement producers
     */
    protected Flux<DocumentProducerFeedResponse> produceOnFeedRangeGone(Flux<DocumentProducer<T>> replacingDocumentProducers) {
        return replacingDocumentProducers.flatMap(DocumentProducer::produceAsync, 1);
    }

    /**
     * Creates one child producer per replacement range, each starting from this producer's last
     * continuation token.
     *
     * @param partitionKeyRanges the ranges that replaced this producer's range
     * @return child producers in the same order as {@code partitionKeyRanges}
     */
    private List<DocumentProducer<T>> createReplacingDocumentProducersOnSplit(List<PartitionKeyRange> partitionKeyRanges) {
        List<DocumentProducer<T>> replacingDocumentProducers = new ArrayList<>(partitionKeyRanges.size());
        for (PartitionKeyRange pkr : partitionKeyRanges) {
            replacingDocumentProducers.add(this.createChildDocumentProducerOnSplit(pkr, this.lastResponseContinuationToken));
        }
        return replacingDocumentProducers;
    }

    /**
     * Builds the producer that serves {@code targetRange} after a split.
     *
     * <p>The child receives this producer's {@code executeRequestFuncWithRetries} as its executor
     * and a {@code null} retry-policy factory.
     * NOTE(review): because the executor is this producer's wrapper, running a child request also
     * runs this producer's wrapper (retry counter reset, stopwatch start) - confirm this is intended.
     *
     * @param targetRange partition key range the child is responsible for
     * @param initialContinuationToken token from which the child starts
     * @return the child producer
     */
    protected DocumentProducer<T> createChildDocumentProducerOnSplit(PartitionKeyRange targetRange, String initialContinuationToken) {
        return new DocumentProducer<T>(this.client, this.collectionRid, this.cosmosQueryRequestOptions, this.createRequestFunc, this.executeRequestFuncWithRetries, this.collectionLink, null, this.resourceType, this.correlatedActivityId, this.pageSize, initialContinuationToken, this.top, new FeedRangeEpkImpl(targetRange.toRange()), this.operationContextTextProvider);
    }

    /**
     * Asks the partition key range cache for the ranges overlapping {@code range}, passing
     * {@code true} for the cache's fourth argument (presumably a force-refresh flag - confirm
     * against the cache API).
     */
    private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> getReplacementRanges(Range<String> range) {
        return this.client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(null, this.collectionRid, range, true, ModelBridgeInternal.getPropertiesFromQueryRequestOptions(this.cosmosQueryRequestOptions));
    }

    /** Delegates to {@link Exceptions#isPartitionSplitOrMerge}. */
    private boolean isSplitOrMerge(CosmosException e) {
        return Exceptions.isPartitionSplitOrMerge(e);
    }

    /**
     * One page of results together with the feed range it is attributed to. Constructing an
     * instance also records partition-level query metrics on the page when the response carries
     * them.
     */
    class DocumentProducerFeedResponse {
        FeedResponse<T> pageResult;
        FeedRangeEpkImpl sourceFeedRange;

        /**
         * Wraps a page and attributes it to the enclosing producer's current feed range.
         *
         * @param pageResult the page to wrap
         */
        DocumentProducerFeedResponse(FeedResponse<T> pageResult) {
            this.pageResult = pageResult;
            this.sourceFeedRange = DocumentProducer.this.feedRange;
            this.populatePartitionedQueryMetrics();
        }

        /**
         * Wraps a page and attributes it to an explicit feed range.
         *
         * @param pageResult the page to wrap
         * @param feedRange the source range; must be a {@link FeedRangeEpkImpl}, otherwise the cast
         *        throws {@link ClassCastException}
         */
        DocumentProducerFeedResponse(FeedResponse<T> pageResult, FeedRange feedRange) {
            this.pageResult = pageResult;
            this.sourceFeedRange = (FeedRangeEpkImpl)feedRange;
            this.populatePartitionedQueryMetrics();
        }

        /**
         * Records query metrics on the page when the {@code x-ms-documentdb-query-metrics}
         * response header is non-empty; does nothing otherwise.
         *
         * <p>The header value is extended with {@code ;requestCharge=<charge>} (two decimals),
         * combined with client-side metrics and stored under the key
         * {@code <range>,pkrId:<x-ms-documentdb-partitionkeyrangeid header>}. The range used for
         * the key and the scheduling entry is the enclosing producer's {@code feedRange}, not
         * {@code sourceFeedRange}.
         */
        void populatePartitionedQueryMetrics() {
            String queryMetricsDelimitedString = this.pageResult.getResponseHeaders().get("x-ms-documentdb-query-metrics");
            if (!StringUtils.isEmpty(queryMetricsDelimitedString)) {
                queryMetricsDelimitedString = queryMetricsDelimitedString + String.format(Locale.ROOT, ";%s=%.2f", "requestCharge", this.pageResult.getRequestCharge());
                ImmutablePair<String, SchedulingTimeSpan> schedulingTimeSpanMap = new ImmutablePair<String, SchedulingTimeSpan>(DocumentProducer.this.feedRange.getRange().toString(), DocumentProducer.this.fetchSchedulingMetrics.getElapsedTime());
                QueryMetrics qm = BridgeInternal.createQueryMetricsFromDelimitedStringAndClientSideMetrics(queryMetricsDelimitedString, new ClientSideMetrics(DocumentProducer.this.retries, this.pageResult.getRequestCharge(), DocumentProducer.this.fetchExecutionRangeAccumulator.getExecutionRanges(), Collections.singletonList(schedulingTimeSpanMap)), this.pageResult.getActivityId(), this.pageResult.getResponseHeaders().getOrDefault("x-ms-cosmos-index-utilization", null));
                String pkrId = this.pageResult.getResponseHeaders().get("x-ms-documentdb-partitionkeyrangeid");
                String queryMetricKey = DocumentProducer.this.feedRange.getRange().toString() + ",pkrId:" + pkrId;
                BridgeInternal.putQueryMetricsIntoMap(this.pageResult, queryMetricKey, qm);
            }
        }
    }
}

