/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.documentdb.internal.query;

import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentQueryClientInternal;
import com.microsoft.azure.documentdb.FeedOptions;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.SqlQuerySpec;
import com.microsoft.azure.documentdb.internal.DocumentServiceRequest;
import com.microsoft.azure.documentdb.internal.DocumentServiceResponse;
import com.microsoft.azure.documentdb.internal.RequestChargeTracker;
import com.microsoft.azure.documentdb.internal.ResourceType;
import com.microsoft.azure.documentdb.internal.Utils;
import com.microsoft.azure.documentdb.internal.query.AbstractQueryExecutionContext;
import com.microsoft.azure.documentdb.internal.query.DocumentProducer;
import com.microsoft.azure.documentdb.internal.query.ExceptionHelper;
import com.microsoft.azure.documentdb.internal.query.FetchScheduler;
import com.microsoft.azure.documentdb.internal.query.PartitionedQueryExecutionInfo;
import com.microsoft.azure.documentdb.internal.query.funcs.Callback3;
import com.microsoft.azure.documentdb.internal.query.funcs.Func1;
import com.microsoft.azure.documentdb.internal.query.funcs.Func2;
import com.microsoft.azure.documentdb.internal.routing.Range;
import com.microsoft.azure.documentdb.internal.routing.RoutingMapProvider;
import com.microsoft.azure.documentdb.internal.routing.RoutingMapProviderHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for query execution contexts that fan a query out over several
 * partition key ranges, using one {@link DocumentProducer} per range.
 *
 * <p>The constructor resolves the target partition key ranges and sets up the
 * shared {@link FetchScheduler}, request-charge tracker and buffering limits.
 * The producers themselves are created by
 * {@link #initializeAsync(PartitionedQueryExecutionInfo, int, Class, Collection, FeedOptions)};
 * {@link #initializationFuture} is never assigned inside this class, so whoever
 * calls {@code initializeAsync} is expected to store its result there before
 * {@link #hasNext()}, {@link #next()} or {@link #onNotifyStop()} are used.
 *
 * @param <T> the document type produced by the query
 */
abstract class ParallelDocumentQueryExecutionContextBase<T extends Document>
extends AbstractQueryExecutionContext<T> {
    /** Multiplier applied to the concurrency factor when the caller asks for automatic parallelism. */
    private static final int NUMBER_OF_NETWORK_CALLS_PER_PROCESSORS = 10;
    /** Lower bound for the number of items buffered across all producers. */
    private static final int DEFAULT_MAX_BUFFER_SIZE = 1000;
    /** Placeholder inside a rewritten query that is replaced by {@code "true"} before execution. */
    private static final String FORMAT_PLACE_HOLDER = "{documentdb-formattableorderbyquery-filter}";
    /** HTTP status "Gone". */
    private static final int STATUS_CODE_GONE = 410;
    /** Sub-status that, together with 410, signals that the partition key range cache must be refreshed. */
    private static final int SUB_STATUS_PARTITION_KEY_RANGE_GONE = 1002;
    private static final String HEADER_MAX_ITEM_COUNT = "x-ms-max-item-count";
    private static final String HEADER_CONTINUATION = "x-ms-continuation";
    private static final String HEADER_REQUEST_CHARGE = "x-ms-request-charge";

    /** Orders producers by the inclusive lower bound of their target range. */
    protected final Comparator<DocumentProducer<T>> defaultComparator = new Comparator<DocumentProducer<T>>(){

        @Override
        public int compare(DocumentProducer<T> producer1, DocumentProducer<T> producer2) {
            String min1 = producer1.getTargetRange().getMinInclusive();
            String min2 = producer2.getTargetRange().getMinInclusive();
            return min1.compareTo(min2);
        }
    };
    private final Class<T> documentProducerClassT;
    private final FetchScheduler fetchScheduler;
    protected final Logger LOGGER;
    protected final String collectionSelfLink;
    private final Func1<DocumentServiceRequest, DocumentServiceResponse> executeFunc;
    protected Vector<DocumentProducer<T>> documentProducers;
    private int maxDegreeOfParallelism;
    protected final RequestChargeTracker chargeTracker;
    protected Future<Void> initializationFuture;
    protected final ExecutorService executorService;
    protected boolean shouldPrefetch;
    private final Callback3<DocumentProducer<T>, Integer, Double> fetchCompletionCallback;
    private final int actualMaxBufferedItemCount;
    private final AtomicInteger totalBufferedItems;

    /**
     * Sets up the shared state for a parallel query.
     *
     * <p>Degree of parallelism: a non-negative {@code options.getMaxDegreeOfParallelism()}
     * is capped at the number of target ranges; a negative value selects
     * {@code 10 * Utils.getConcurrencyFactor()}, also capped at the number of
     * ranges. The result is never below 1. Prefetching is enabled whenever the
     * requested degree is non-zero.
     *
     * @param client                        client used to execute requests and to obtain the executor and range cache
     * @param collectionSelfLink            self link of the queried collection
     * @param querySpec                     the original query; replaced by the rewritten query when one exists
     * @param options                       feed options (parallelism, buffer size, ...)
     * @param resourceLink                  link passed through to the superclass
     * @param partitionedQueryExecutionInfo query plan providing the query ranges and the optional rewritten query
     * @param documentProducerClassT        runtime class of {@code T}
     */
    public ParallelDocumentQueryExecutionContextBase(DocumentQueryClientInternal client, String collectionSelfLink, SqlQuerySpec querySpec, FeedOptions options, String resourceLink, PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, Class<T> documentProducerClassT) {
        super(client, ResourceType.Document, documentProducerClassT, ParallelDocumentQueryExecutionContextBase.resolveQuerySpec(partitionedQueryExecutionInfo, querySpec), options, resourceLink);
        // Must be assigned before getTargetPartitionKeyRanges(), which reads it.
        this.collectionSelfLink = collectionSelfLink;
        Collection<PartitionKeyRange> ranges = this.getTargetPartitionKeyRanges(partitionedQueryExecutionInfo.getQueryRanges());
        this.documentProducers = new Vector<DocumentProducer<T>>(ranges.size());
        this.executorService = client.getExecutorService();
        this.documentProducerClassT = documentProducerClassT;
        this.LOGGER = LoggerFactory.getLogger(this.getClass());

        int requestedParallelism = options.getMaxDegreeOfParallelism();
        this.shouldPrefetch = requestedParallelism != 0;
        int parallelism = requestedParallelism >= 0
                ? Math.min(ranges.size(), requestedParallelism)
                : Math.min(ranges.size(), NUMBER_OF_NETWORK_CALLS_PER_PROCESSORS * Utils.getConcurrencyFactor());
        this.maxDegreeOfParallelism = Math.max(parallelism, 1);
        this.fetchScheduler = new FetchScheduler(this.executorService, this.maxDegreeOfParallelism);

        this.executeFunc = new Func1<DocumentServiceRequest, DocumentServiceResponse>(){

            @Override
            public DocumentServiceResponse apply(DocumentServiceRequest request) throws DocumentClientException {
                return ParallelDocumentQueryExecutionContextBase.this.executeRequest(request);
            }
        };
        this.chargeTracker = new RequestChargeTracker();
        this.actualMaxBufferedItemCount = Math.max(options.getMaxBufferedItemCount(), DEFAULT_MAX_BUFFER_SIZE);
        this.totalBufferedItems = new AtomicInteger(0);
        this.fetchCompletionCallback = new Callback3<DocumentProducer<T>, Integer, Double>(){

            @Override
            public void run(DocumentProducer<T> producer, Integer cnt, Double requestCharge) throws Exception {
                ParallelDocumentQueryExecutionContextBase<T> outer = ParallelDocumentQueryExecutionContextBase.this;
                outer.chargeTracker.addCharge(requestCharge);
                // Use the atomically returned total instead of a second read.
                int buffered = outer.totalBufferedItems.addAndGet(cnt);
                boolean hasBufferRoom = outer.actualMaxBufferedItemCount - buffered > 0;
                if (!producer.fetchedAll() && outer.shouldPrefetch && hasBufferRoom) {
                    producer.tryScheduleFetch();
                }
            }
        };
    }

    /**
     * Picks the query to execute: the rewritten query (with the format
     * placeholder replaced by {@code "true"}) when the plan has one,
     * otherwise the caller's original query.
     */
    private static SqlQuerySpec resolveQuerySpec(PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, SqlQuerySpec querySpec) {
        if (!partitionedQueryExecutionInfo.getQueryInfo().hasRewrittenQuery()) {
            return querySpec;
        }
        String rewritten = partitionedQueryExecutionInfo.getQueryInfo().getRewrittenQuery().replace(FORMAT_PLACE_HOLDER, "true");
        return new SqlQuerySpec(rewritten, querySpec.getParameters());
    }

    /** Stamps the page size and continuation token headers onto {@code request} and returns it. */
    private static DocumentServiceRequest withPaging(DocumentServiceRequest request, String continuationToken, Integer pageSize) {
        request.getHeaders().put(HEADER_MAX_ITEM_COUNT, Integer.toString(pageSize));
        request.getHeaders().put(HEADER_CONTINUATION, continuationToken);
        return request;
    }

    /**
     * Submits a task to the executor that creates one {@link DocumentProducer}
     * per range, schedules its first fetch when prefetching is enabled, and
     * appends it to {@link #documentProducers}.
     *
     * @param partitionedQueryExecutionInfo not used by this implementation
     * @param initialPageSize               page size handed to every producer
     * @param documentProducerClassT        runtime class of {@code T}
     * @param ranges                        partition key ranges to create producers for
     * @param options                       not used by this implementation
     * @return future that completes once all producers have been created
     */
    protected Future<Void> initializeAsync(PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, final int initialPageSize, final Class<T> documentProducerClassT, final Collection<PartitionKeyRange> ranges, FeedOptions options) throws InterruptedException, ExecutionException, Exception {
        final ParallelDocumentQueryExecutionContextBase<T> self = this;
        Callable<Void> createProducers = new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                for (final PartitionKeyRange range : ranges) {
                    Func2<String, Integer, DocumentServiceRequest> requestFactory = new Func2<String, Integer, DocumentServiceRequest>(){

                        @Override
                        public DocumentServiceRequest apply(String continuationToken, Integer pageSize) {
                            DocumentServiceRequest request = self.createRequest(self.getFeedHeaders(self.options), self.querySpec, range.getId());
                            return ParallelDocumentQueryExecutionContextBase.withPaging(request, continuationToken, pageSize);
                        }
                    };
                    DocumentProducer<T> docProducer = new DocumentProducer<T>(self.executeFunc, requestFactory, range, documentProducerClassT, self.fetchScheduler, initialPageSize, null, self.fetchCompletionCallback);
                    if (self.shouldPrefetch) {
                        docProducer.tryScheduleFetch();
                    }
                    self.documentProducers.add(docProducer);
                }
                return null;
            }
        };
        return this.executorService.submit(createProducers);
    }

    /**
     * Resolves the partition key ranges overlapping {@code providedRanges}
     * for this collection via the client's partition key range cache.
     */
    protected Collection<PartitionKeyRange> getTargetPartitionKeyRanges(List<Range<String>> providedRanges) {
        return RoutingMapProviderHelper.getOverlappingRanges(this.client.getPartitionKeyRangeCache(), this.collectionSelfLink, providedRanges);
    }

    /**
     * Best-effort cleanup: stops the scheduler, cancels initialization and
     * notifies the producers. Every field is null-checked because a finalizer
     * also runs for instances whose constructor threw or that were never
     * initialized.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (this.fetchScheduler != null) {
                this.fetchScheduler.stop();
            }
            Future<Void> pendingInitialization = this.initializationFuture;
            if (pendingInitialization != null) {
                pendingInitialization.cancel(true);
            }
            if (this.documentProducers != null) {
                this.notifyStopDocumentProducers();
            }
        }
        finally {
            super.finalize();
        }
    }

    /** Not supported by parallel contexts; always throws {@link UnsupportedOperationException}. */
    @Override
    public List<T> fetchNextBlock() throws DocumentClientException {
        throw new UnsupportedOperationException("fetchNextBlock");
    }

    /**
     * Waits for initialization to finish, then delegates to {@code hasNextInternal()}.
     *
     * @throws RuntimeException if initialization failed or the wait was interrupted
     */
    @Override
    public boolean hasNext() {
        this.awaitInitialization();
        return this.hasNextInternal();
    }

    /** Produces the next item; called by {@link #next()} after initialization has completed. */
    public abstract T nextInternal() throws Exception;

    /**
     * Returns the next item and decrements the buffered-item counter.
     *
     * @throws NoSuchElementException if there is no further item
     * @throws RuntimeException       for any other failure, converted via {@link ExceptionHelper}
     */
    @Override
    public T next() {
        if (!this.hasNext()) {
            throw new NoSuchElementException("next");
        }
        this.awaitInitialization();
        if (this.responseHeaders == null) {
            this.responseHeaders = new HashMap<String, String>();
        }
        try {
            T result = this.nextInternal();
            this.totalBufferedItems.decrementAndGet();
            return result;
        }
        catch (NoSuchElementException e) {
            // Propagate unchanged so callers see the iterator-contract exception.
            throw e;
        }
        catch (Exception e) {
            throw ExceptionHelper.toRuntimeException(ExceptionHelper.unwrap(e));
        }
    }

    /**
     * Blocks until {@link #initializationFuture} completes. Failures are
     * logged and rethrown as runtime exceptions; when the wait itself is
     * interrupted the thread's interrupt flag is restored first so callers
     * further up can still observe the interruption.
     */
    private void awaitInitialization() {
        try {
            this.initializationFuture.get();
        }
        catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            this.LOGGER.warn("Failed to initialize. ", (Throwable)e);
            throw ExceptionHelper.toRuntimeException(ExceptionHelper.unwrap(e));
        }
    }

    /**
     * Looks up (with a forced refresh) the ranges that now cover
     * {@code targetRange} and verifies that together they span exactly the
     * same [min, max) interval.
     *
     * @throws IllegalStateException if no replacement ranges are found or their outer bounds differ from the target's
     */
    protected List<PartitionKeyRange> getReplacementRanges(PartitionKeyRange targetRange, String collectionSelfLink) {
        RoutingMapProvider routingMapProvider = this.client.getPartitionKeyRangeCache();
        List<PartitionKeyRange> replacementRanges = new ArrayList<PartitionKeyRange>(routingMapProvider.getOverlappingRanges(collectionSelfLink, targetRange.toRange(), true));
        if (replacementRanges.isEmpty()) {
            throw new IllegalStateException(String.format("No replacement ranges found for target range: [%s, %s).", targetRange.getMinInclusive(), targetRange.getMaxExclusive()));
        }
        String replaceMinInclusive = replacementRanges.get(0).getMinInclusive();
        String replaceMaxExclusive = replacementRanges.get(replacementRanges.size() - 1).getMaxExclusive();
        boolean sameMin = replaceMinInclusive.equals(targetRange.getMinInclusive());
        boolean sameMax = replaceMaxExclusive.equals(targetRange.getMaxExclusive());
        if (!sameMin || !sameMax) {
            throw new IllegalStateException(String.format("Target range and Replacement range has mismatched min/max. Target range: [%s, %s). Replacement range: [%s, %s).", targetRange.getMinInclusive(), targetRange.getMaxExclusive(), replaceMinInclusive, replaceMaxExclusive));
        }
        return replacementRanges;
    }

    /** Returns true when the unwrapped failure is a {@link DocumentClientException} with status 410 and sub-status 1002. */
    private boolean needPartitionKeyRangeCacheRefresh(Exception ex) {
        Throwable t = ExceptionHelper.unwrap(ex);
        if (!(t instanceof DocumentClientException)) {
            return false;
        }
        DocumentClientException clientException = (DocumentClientException)t;
        return clientException.getStatusCode() == STATUS_CODE_GONE
                && clientException.getSubStatusCode() != null
                && clientException.getSubStatusCode() == SUB_STATUS_PARTITION_KEY_RANGE_GONE;
    }

    /**
     * Advances {@code producer}. When the move fails with an error that calls
     * for a partition key range cache refresh, the producer is replaced via
     * {@code producerRepairCallback} and the move is retried; any other
     * failure is rethrown unchanged.
     *
     * @return the result of the first successful {@code moveNext()}
     */
    protected boolean tryMoveNextProducer(DocumentProducer<T> producer, Func1<DocumentProducer<T>, DocumentProducer<T>> producerRepairCallback) throws Exception {
        DocumentProducer<T> currentProducer = producer;
        while (true) {
            try {
                return currentProducer.moveNext();
            }
            catch (Exception ex) {
                if (!this.needPartitionKeyRangeCacheRefresh(ex)) {
                    throw ex;
                }
                this.LOGGER.debug("Encountered exception when moving to the next document producer", (Throwable)ex);
            }
            currentProducer = producerRepairCallback.apply(currentProducer);
        }
    }

    /** Copies this context's feed options and sets the given continuation token on the copy. */
    private FeedOptions getFeedOptions(String continuationToken) {
        FeedOptions copy = new FeedOptions((FeedOptions)this.options);
        copy.setRequestContinuation(continuationToken);
        return copy;
    }

    /**
     * Replaces the producer at {@code currentDocumentProducerIndex} with one
     * producer per replacement range. The new producers inherit the page size
     * and backend continuation token of the replaced producer and are inserted
     * at its position in the given range order; their first fetch is scheduled
     * when prefetching is enabled.
     *
     * @param collectionRid                not used by this implementation
     * @param currentDocumentProducerIndex index of the producer to replace
     * @param produceComparer              not used by this implementation
     * @param replacementRanges            ranges that replace the producer's target range
     * @param querySpecForRepair           query executed by the replacement producers
     */
    protected void repairContext(String collectionRid, int currentDocumentProducerIndex, Comparator<DocumentProducer<T>> produceComparer, List<PartitionKeyRange> replacementRanges, final SqlQuerySpec querySpecForRepair) {
        Map<String, String> requestHeaders = this.getFeedHeaders(this.getFeedOptions(null));
        this.documentProducers.ensureCapacity(this.documentProducers.size() + replacementRanges.size() - 1);
        DocumentProducer<T> replacedDocumentProducer = this.documentProducers.get(currentDocumentProducerIndex);
        final ParallelDocumentQueryExecutionContextBase<T> context = this;
        // Insert the replacements right after the old producer, then drop the old one
        // so the replacements end up starting at currentDocumentProducerIndex.
        int insertAt = currentDocumentProducerIndex + 1;
        for (final PartitionKeyRange range : replacementRanges) {
            final Map<String, String> documentProducerRequestHeader = new HashMap<String, String>(requestHeaders);
            Func2<String, Integer, DocumentServiceRequest> requestFactory = new Func2<String, Integer, DocumentServiceRequest>(){

                @Override
                public DocumentServiceRequest apply(String continuationToken, Integer pageSize) {
                    DocumentServiceRequest request = context.createRequest(documentProducerRequestHeader, querySpecForRepair, range.getId());
                    return ParallelDocumentQueryExecutionContextBase.withPaging(request, continuationToken, pageSize);
                }
            };
            this.documentProducers.add(insertAt++, new DocumentProducer<T>(this.executeFunc, requestFactory, range, this.documentProducerClassT, this.fetchScheduler, replacedDocumentProducer.getPageSize(), replacedDocumentProducer.getCurrentBackendContinuationToken(), this.fetchCompletionCallback));
        }
        this.documentProducers.remove(currentDocumentProducerIndex);
        if (this.shouldPrefetch) {
            for (int offset = 0; offset < replacementRanges.size(); ++offset) {
                this.documentProducers.get(currentDocumentProducerIndex + offset).tryScheduleFetch();
            }
        }
    }

    /** Calls {@code notifyStop()} on every producer currently registered. */
    private void notifyStopDocumentProducers() {
        for (int i = 0; i < this.documentProducers.size(); ++i) {
            this.documentProducers.get(i).notifyStop();
        }
    }

    /**
     * Stops all producers and the fetch scheduler, waits for initialization
     * to finish, and then runs {@link #onFinish()}.
     *
     * @throws RuntimeException if stopping or waiting fails; the interrupt flag is restored when the wait was interrupted
     */
    @Override
    public void onNotifyStop() {
        this.notifyStopDocumentProducers();
        try {
            this.fetchScheduler.stop();
            this.initializationFuture.get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            this.LOGGER.warn("Failed to wait for Futures to finish.", (Throwable)e);
            throw ExceptionHelper.toRuntimeException(ExceptionHelper.unwrap(e));
        }
        this.onFinish();
    }

    /** Publishes the accumulated request charge into the response headers, if they have been created. */
    protected void onFinish() {
        if (this.responseHeaders != null) {
            this.responseHeaders.put(HEADER_REQUEST_CHARGE, String.valueOf(this.chargeTracker.getTotalRequestCharge()));
        }
    }
}

