/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.pkversion;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer;
import com.azure.cosmos.implementation.changefeed.ProcessorSettings;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedObserverContextImpl;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ExceptionClassifier;
import com.azure.cosmos.implementation.changefeed.common.StatusCodeErrorType;
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.PartitionNotFoundException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessor;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link PartitionProcessor} that repeatedly queries the change feed using the state held in
 * {@link ProcessorSettings}, hands every non-empty page to the {@link ChangeFeedObserver}, and keeps
 * polling until cancellation is requested or an unrecoverable error is recorded.
 *
 * <p>Failures are not propagated through the {@link Mono} returned by {@link #run(CancellationToken)};
 * they are stored and exposed through {@link #getResultException()}.
 */
class PartitionProcessorImpl implements PartitionProcessor {
    private static final Logger logger = LoggerFactory.getLogger(PartitionProcessorImpl.class);
    // Not referenced anywhere in this class; kept so the class layout stays unchanged.
    private static final int DefaultMaxItemCount = 100;
    /** Granularity at which the waiting loops re-check the deadline and the cancellation token. */
    private static final Duration CANCELLATION_CHECK_INTERVAL = Duration.ofMillis(100L);

    private final ProcessorSettings settings;
    private final PartitionCheckpointer checkpointer;
    private final ChangeFeedObserver<JsonNode> observer;
    /** Options for the next query; replaced after every page from within the reactive callbacks. */
    private volatile CosmosChangeFeedRequestOptions options;
    private final ChangeFeedContextClient documentClient;
    private final Lease lease;
    /** Reason the processing loop stopped; {@code null} while no failure has been recorded. */
    private volatile RuntimeException resultException;
    /** Continuation token taken from the most recent response; attached to partition-gone exceptions. */
    private volatile String lastServerContinuationToken;
    /** {@code true} until the first poll of a run, which is issued without the poll delay. */
    private volatile boolean isFirstQueryForChangeFeeds;

    /**
     * Creates a processor whose first query starts from {@code settings.getStartState()} and uses
     * {@code settings.getMaxItemCount()} as page size.
     *
     * @param observer       receives each non-empty page of changes
     * @param documentClient client used to issue the change feed queries
     * @param settings       start state, collection link, poll delay and page size
     * @param checkpointer   passed to the observer context; also receives the cancellation token on run
     * @param lease          lease whose token and owner are used in log messages
     */
    public PartitionProcessorImpl(ChangeFeedObserver<JsonNode> observer, ChangeFeedContextClient documentClient, ProcessorSettings settings, PartitionCheckpointer checkpointer, Lease lease) {
        this.observer = observer;
        this.documentClient = documentClient;
        this.settings = settings;
        this.checkpointer = checkpointer;
        this.lease = lease;
        ChangeFeedState state = settings.getStartState();
        this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state);
        this.options.setMaxItemCount(settings.getMaxItemCount());
    }

    /**
     * Runs the polling loop.
     *
     * <p>Each iteration: (1) the first iteration proceeds immediately, later ones wait until the
     * configured feed poll delay has elapsed or cancellation is requested; (2) one page is requested
     * from the change feed; (3) the continuation is validated and recorded, a non-empty page is
     * dispatched to the observer, and the request options are replaced by ones built from the
     * returned continuation; (4) on normal completion the page size is restored to the configured
     * value. Errors are classified by {@link #handleProcessingError}; recoverable ones let the loop
     * continue, all others end it.
     *
     * @param cancellationToken checked between steps; once cancellation is requested the loop stops
     *                          and a {@link TaskCancelledException} is recorded
     * @return a {@link Mono} that completes when the loop has ended; errors raised inside the
     *         pipeline are recorded for {@link #getResultException()} instead of being emitted
     */
    @Override
    public Mono<Void> run(CancellationToken cancellationToken) {
        logger.info("Partition {}: processing task started with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner());
        this.isFirstQueryForChangeFeeds = true;
        this.checkpointer.setCancellationToken(cancellationToken);

        return Flux.just(this)
            .flatMap(value -> {
                if (cancellationToken.isCancellationRequested()) {
                    return Flux.empty();
                }
                if (this.isFirstQueryForChangeFeeds) {
                    // Very first poll of this run: query right away.
                    this.isFirstQueryForChangeFeeds = false;
                    return Flux.just(value);
                }
                // Subsequent polls: hold the value back until the poll delay is over or we are cancelled.
                Instant stopTimer = Instant.now().plus(this.settings.getFeedPollDelay());
                return emitUntil(value, stopTimer, cancellationToken).last();
            })
            .flatMap(value -> this.documentClient
                .createDocumentChangeFeedQuery(this.settings.getCollectionSelfLink(), this.options, JsonNode.class)
                .limitRequest(1L))
            .flatMap(documentFeedResponse -> {
                if (cancellationToken.isCancellationRequested()) {
                    return Flux.error(new TaskCancelledException());
                }
                String continuationToken = documentFeedResponse.getContinuationToken();
                ChangeFeedState continuationState = this.parseAndRecordContinuation(continuationToken);

                if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) {
                    logger.info("Partition {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner());
                    return this.dispatchChanges((FeedResponse<JsonNode>) documentFeedResponse, continuationState)
                        .doOnError(throwable -> logger.debug("Exception was thrown from thread {}", Thread.currentThread().getId(), throwable))
                        .doOnSuccess(ignored -> {
                            // Only advance the continuation once the observer finished successfully.
                            this.options = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(continuationToken);
                            if (cancellationToken.isCancellationRequested()) {
                                throw new TaskCancelledException();
                            }
                        });
                }

                // Empty page: nothing to dispatch, just move the continuation forward.
                this.options = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(continuationToken);
                if (cancellationToken.isCancellationRequested()) {
                    return Flux.error(new TaskCancelledException());
                }
                return Flux.empty();
            })
            .doOnComplete(() -> {
                // Restore the configured page size (it may differ after the options were replaced
                // or after it was halved by the error handler).
                if (this.options.getMaxItemCount() != this.settings.getMaxItemCount()) {
                    this.options.setMaxItemCount(this.settings.getMaxItemCount());
                }
            })
            .onErrorResume(throwable -> this.handleProcessingError(throwable, cancellationToken))
            .repeat(() -> {
                if (cancellationToken.isCancellationRequested()) {
                    this.resultException = new TaskCancelledException();
                    return false;
                }
                return true;
            })
            .onErrorResume(throwable -> {
                // Anything that was re-raised without a recorded reason is wrapped here.
                if (this.resultException == null) {
                    this.resultException = new RuntimeException(throwable);
                }
                return Flux.empty();
            })
            .then()
            .doFinally(any -> logger.info("Partition {}: processing task exited with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()));
    }

    /**
     * Classifies an error raised by one polling iteration.
     *
     * <p>Returns an empty {@link Flux} when the loop should simply try again (page size was halved,
     * or the server-provided retry-after interval has been waited out). Otherwise records the reason
     * in {@code resultException} where applicable and re-raises {@code throwable}, which ends the loop.
     *
     * @param throwable         the error raised by the iteration
     * @param cancellationToken consulted while waiting for a retry-after interval
     * @return an empty {@link Flux} to continue polling, or a failing {@link Flux} to stop
     */
    private Flux<Void> handleProcessingError(Throwable throwable, CancellationToken cancellationToken) {
        if (throwable instanceof CosmosException) {
            CosmosException clientException = (CosmosException) throwable;
            logger.warn("CosmosException: Partition {} from thread {} with owner {}", this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), clientException);
            StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException);
            switch (docDbError) {
                case PARTITION_NOT_FOUND: {
                    this.resultException = new PartitionNotFoundException("Partition not found.", this.lastServerContinuationToken);
                    break;
                }
                case PARTITION_SPLIT_OR_MERGE: {
                    this.resultException = new FeedRangeGoneException("Partition split.", this.lastServerContinuationToken);
                    break;
                }
                case UNDEFINED: {
                    this.resultException = new RuntimeException(clientException);
                    break;
                }
                case MAX_ITEM_COUNT_TOO_LARGE: {
                    if (this.options.getMaxItemCount() <= 1) {
                        logger.error("Cannot reduce maxItemCount further as it's already at {}", this.options.getMaxItemCount(), clientException);
                        this.resultException = new RuntimeException(clientException);
                        // The page size cannot shrink any further: stop instead of halving to 0
                        // and polling again with a recorded failure.
                        break;
                    }
                    this.options.setMaxItemCount(this.options.getMaxItemCount() / 2);
                    logger.warn("Reducing maxItemCount, new value: {}", this.options.getMaxItemCount());
                    return Flux.empty();
                }
                case TRANSIENT_ERROR: {
                    long retryAfterMillis = clientException.getRetryAfterDuration().toMillis();
                    if (retryAfterMillis > 0L) {
                        // Wait out the retry-after interval (or until cancelled), then poll again.
                        Instant stopTimer = Instant.now().plus(retryAfterMillis, ChronoUnit.MILLIS);
                        return emitUntil(retryAfterMillis, stopTimer, cancellationToken).flatMap(values -> Flux.empty());
                    }
                    // No retry-after hint: re-raise; the caller's final handler wraps and records it.
                    break;
                }
                default: {
                    logger.error("Unrecognized Cosmos exception returned error code {}", docDbError, clientException);
                    this.resultException = new RuntimeException(clientException);
                    break;
                }
            }
        } else if (throwable instanceof LeaseLostException) {
            logger.info("LeaseLoseException with Partition {} from thread {} with owner {}", this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner());
            this.resultException = (LeaseLostException) throwable;
        } else if (throwable instanceof TaskCancelledException) {
            logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}", this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable);
            this.resultException = (TaskCancelledException) throwable;
        } else {
            logger.warn("Unexpected exception: Partition {} from thread {} with owner {}", this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable);
            this.resultException = new RuntimeException(throwable);
        }
        return Flux.error(throwable);
    }

    /**
     * Emits {@code value} once per {@link #CANCELLATION_CHECK_INTERVAL} and keeps re-emitting it
     * while cancellation has not been requested and {@code deadline} has not been reached.
     *
     * @param value             element to emit on every tick
     * @param deadline          instant after which no further repetition is started
     * @param cancellationToken stops the repetition as soon as cancellation is requested
     * @return a {@link Flux} emitting {@code value} at least once
     */
    private static <T> Flux<T> emitUntil(T value, Instant deadline, CancellationToken cancellationToken) {
        return Mono.just(value)
            .delayElement(CANCELLATION_CHECK_INTERVAL, CosmosSchedulers.COSMOS_PARALLEL)
            .repeat(() -> !cancellationToken.isCancellationRequested() && Instant.now().isBefore(deadline));
    }

    /**
     * Parses the continuation returned with a page, checks that it holds exactly one
     * range/continuation, and remembers its token in {@code lastServerContinuationToken}.
     *
     * @param continuationToken continuation token string taken from the feed response
     * @return the parsed continuation state
     */
    private ChangeFeedState parseAndRecordContinuation(String continuationToken) {
        ChangeFeedState continuationState = ChangeFeedState.fromString(continuationToken);
        Preconditions.checkNotNull(continuationState, "Argument 'continuationState' must not be null.");
        Preconditions.checkArgument(continuationState.getContinuation().getContinuationTokenCount() == 1, "For ChangeFeedProcessor the continuation state should always have one range/continuation");
        this.lastServerContinuationToken = continuationState.getContinuation().getCurrentContinuationToken().getToken();
        return continuationState;
    }

    /**
     * Returns the feed range of the configured start state, which must be a
     * {@link FeedRangePartitionKeyRangeImpl}.
     *
     * @return the start state's feed range, cast to its partition-key-range form
     */
    private FeedRangePartitionKeyRangeImpl getPkRangeFeedRangeFromStartState() {
        FeedRangeInternal feedRange = this.settings.getStartState().getFeedRange();
        Preconditions.checkNotNull(feedRange, "FeedRange must not be null here.");
        Preconditions.checkArgument(feedRange instanceof FeedRangePartitionKeyRangeImpl, "FeedRange must be a PkRangeId FeedRange when using Lease V1 contract.");
        return (FeedRangePartitionKeyRangeImpl) feedRange;
    }

    /**
     * @return the exception recorded when the processing loop stopped, or {@code null} if none was
     *         recorded
     */
    @Override
    public RuntimeException getResultException() {
        return this.resultException;
    }

    /**
     * Wraps a page in an observer context and passes its results to the observer.
     *
     * @param response          the page of changes to deliver
     * @param continuationState continuation state that accompanies the page
     * @return the {@link Mono} returned by the observer's {@code processChanges}
     */
    private Mono<Void> dispatchChanges(FeedResponse<JsonNode> response, ChangeFeedState continuationState) {
        ChangeFeedObserverContextImpl<JsonNode> context = new ChangeFeedObserverContextImpl<>(this.getPkRangeFeedRangeFromStartState().getPartitionKeyRangeId(), response, continuationState, this.checkpointer);
        return this.observer.processChanges(context, response.getResults());
    }
}

