/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ThroughputControlGroupConfig;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer;
import com.azure.cosmos.implementation.changefeed.ProcessorSettings;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedObserverContextImpl;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ExceptionClassifier;
import com.azure.cosmos.implementation.changefeed.common.StatusCodeErrorType;
import com.azure.cosmos.implementation.changefeed.epkversion.FeedRangeThroughputControlConfigManager;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionProcessor;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionProcessorHelper;
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.PartitionNotFoundException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PartitionProcessorImpl<T>
implements PartitionProcessor {
    private static final Logger logger = LoggerFactory.getLogger(PartitionProcessorImpl.class);
    private final ProcessorSettings settings;
    private final PartitionCheckpointer checkpointer;
    private final ChangeFeedObserver<T> observer;
    private volatile CosmosChangeFeedRequestOptions options;
    private final ChangeFeedContextClient documentClient;
    private final Lease lease;
    private final Class<T> itemType;
    private final ChangeFeedMode changeFeedMode;
    private volatile RuntimeException resultException;
    private volatile String lastServerContinuationToken;
    private volatile boolean hasMoreResults;
    private final FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager;

    public PartitionProcessorImpl(ChangeFeedObserver<T> observer, ChangeFeedContextClient documentClient, ProcessorSettings settings, PartitionCheckpointer checkpointer, Lease lease, Class<T> itemType, ChangeFeedMode changeFeedMode, FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager) {
        this.observer = observer;
        this.documentClient = documentClient;
        this.settings = settings;
        this.checkpointer = checkpointer;
        this.lease = lease;
        this.itemType = itemType;
        this.changeFeedMode = changeFeedMode;
        this.lastServerContinuationToken = this.lease.getContinuationToken();
        this.options = PartitionProcessorHelper.createChangeFeedRequestOptionsForChangeFeedState(settings.getStartState(), settings.getMaxItemCount(), this.changeFeedMode);
        this.feedRangeThroughputControlConfigManager = feedRangeThroughputControlConfigManager;
    }

    @Override
    public Mono<Void> run(CancellationToken cancellationToken) {
        logger.info("Lease with token {}: processing task started with owner {}.", (Object)this.lease.getLeaseToken(), (Object)this.lease.getOwner());
        this.hasMoreResults = true;
        this.checkpointer.setCancellationToken(cancellationToken);
        return Flux.just((Object)this).flatMap(value -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.empty();
            }
            if (this.hasMoreResults && this.resultException == null) {
                return Flux.just((Object)value);
            }
            Instant stopTimer = Instant.now().plus(this.settings.getFeedPollDelay());
            return Mono.just((Object)value).delayElement(Duration.ofMillis(100L), CosmosSchedulers.COSMOS_PARALLEL).repeat(() -> {
                Instant currentTime = Instant.now();
                return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
            }).last();
        }).flatMap(value -> this.tryGetThroughputControlConfigForFeedRange(this.lease)).flatMap(configValueHolder -> {
            if (configValueHolder.v != null) {
                this.options.setThroughputControlGroupName(((ThroughputControlGroupConfig)configValueHolder.v).getGroupName());
            }
            return this.documentClient.createDocumentChangeFeedQuery(this.settings.getCollectionSelfLink(), this.options, this.itemType);
        }).flatMap(documentFeedResponse -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.error((Throwable)new TaskCancelledException());
            }
            String continuationToken = documentFeedResponse.getContinuationToken();
            ChangeFeedState continuationState = ChangeFeedState.fromString(continuationToken);
            Preconditions.checkNotNull(continuationState, "Argument 'continuationState' must not be null.");
            Preconditions.checkArgument(continuationState.getContinuation().getContinuationTokenCount() == 1, "For ChangeFeedProcessor the continuation state should always have one range/continuation");
            this.lastServerContinuationToken = continuationToken;
            boolean bl = this.hasMoreResults = !ModelBridgeInternal.noChanges(documentFeedResponse);
            if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) {
                logger.info("Lease with token {}: processing {} feeds with owner {}.", new Object[]{this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner()});
                return this.dispatchChanges((FeedResponse<T>)documentFeedResponse, continuationState).doOnError(throwable -> logger.warn("Lease with token {}: Exception was thrown from thread {}", new Object[]{this.lease.getLeaseToken(), Thread.currentThread().getId(), throwable})).doOnSuccess(Void2 -> {
                    this.options = PartitionProcessorHelper.createForProcessingFromContinuation(continuationToken, this.changeFeedMode);
                    if (cancellationToken.isCancellationRequested()) {
                        throw new TaskCancelledException();
                    }
                });
            }
            return this.checkpointer.checkpointPartition(continuationState).doOnError(throwable -> logger.warn("Failed to checkpoint Lease with token {} from thread {}", new Object[]{this.lease.getLeaseToken(), Thread.currentThread().getId(), throwable})).flatMap(lease -> {
                this.options = PartitionProcessorHelper.createForProcessingFromContinuation(continuationToken, this.changeFeedMode);
                if (cancellationToken.isCancellationRequested()) {
                    return Mono.error((Throwable)new TaskCancelledException());
                }
                return Mono.empty();
            });
        }).doOnComplete(() -> {
            if (this.options.getMaxItemCount() != this.settings.getMaxItemCount()) {
                this.options.setMaxItemCount(this.settings.getMaxItemCount());
            }
        }).onErrorResume(throwable -> {
            block14: {
                block13: {
                    if (!(throwable instanceof CosmosException)) break block13;
                    CosmosException clientException = (CosmosException)((Object)((Object)throwable));
                    logger.warn("Lease with token {}: CosmosException was thrown from thread {} for lease with owner {}", new Object[]{this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), clientException});
                    StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException);
                    switch (docDbError) {
                        case PARTITION_NOT_FOUND: {
                            this.resultException = new PartitionNotFoundException("Partition not found.", this.lastServerContinuationToken);
                            break;
                        }
                        case PARTITION_SPLIT_OR_MERGE: {
                            this.resultException = new FeedRangeGoneException("Partition split or merge.", this.lastServerContinuationToken);
                            break;
                        }
                        case UNDEFINED: {
                            this.resultException = new RuntimeException((Throwable)((Object)clientException));
                            break;
                        }
                        case MAX_ITEM_COUNT_TOO_LARGE: {
                            if (this.options.getMaxItemCount() <= 1) {
                                logger.error("Cannot reduce maxItemCount further as it's already at {}", (Object)this.options.getMaxItemCount(), (Object)clientException);
                                this.resultException = new RuntimeException((Throwable)((Object)clientException));
                            }
                            this.options.setMaxItemCount(this.options.getMaxItemCount() / 2);
                            logger.warn("Reducing maxItemCount, new value: {}", (Object)this.options.getMaxItemCount());
                            return Flux.empty();
                        }
                        case TRANSIENT_ERROR: {
                            if (clientException.getRetryAfterDuration().toMillis() > 0L) {
                                Instant stopTimer = Instant.now().plus(clientException.getRetryAfterDuration().toMillis(), ChronoUnit.MILLIS);
                                return Mono.just((Object)clientException.getRetryAfterDuration().toMillis()).delayElement(Duration.ofMillis(100L), CosmosSchedulers.COSMOS_PARALLEL).repeat(() -> {
                                    Instant currentTime = Instant.now();
                                    return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
                                }).flatMap(values -> Flux.empty());
                            }
                            break block14;
                        }
                        default: {
                            logger.error("Lease with token {}: Unrecognized Cosmos exception returned error code {}", new Object[]{this.lease.getLeaseToken(), docDbError, clientException});
                            this.resultException = new RuntimeException((Throwable)((Object)clientException));
                            break;
                        }
                    }
                    break block14;
                }
                if (throwable instanceof LeaseLostException) {
                    logger.info("Lease with token {}: LeaseLoseException was thrown from thread {} for lease with owner {}", new Object[]{this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner()});
                    this.resultException = (LeaseLostException)throwable;
                } else if (throwable instanceof TaskCancelledException) {
                    logger.debug("Lease with token {}: Task cancelled exception was thrown from thread {} for lease with owner {}", new Object[]{this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable});
                    this.resultException = (TaskCancelledException)throwable;
                } else {
                    logger.warn("Lease with token {}: Unexpected exception was thrown from thread {} for lease with owner {}", new Object[]{this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable});
                    this.resultException = new RuntimeException((Throwable)throwable);
                }
            }
            return Flux.error((Throwable)throwable);
        }).repeat(() -> {
            if (cancellationToken.isCancellationRequested()) {
                this.resultException = new TaskCancelledException();
                return false;
            }
            return true;
        }).onErrorResume(throwable -> {
            if (this.resultException == null) {
                this.resultException = new RuntimeException((Throwable)throwable);
            }
            return Flux.empty();
        }).then().doFinally(any -> logger.info("Lease with token {}: processing task exited with owner {}.", (Object)this.lease.getLeaseToken(), (Object)this.lease.getOwner()));
    }

    private Mono<Utils.ValueHolder<ThroughputControlGroupConfig>> tryGetThroughputControlConfigForFeedRange(Lease lease) {
        if (this.feedRangeThroughputControlConfigManager == null) {
            return Mono.just(new Utils.ValueHolder<Object>(null));
        }
        return this.feedRangeThroughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange((FeedRangeEpkImpl)lease.getFeedRange()).map(config -> new Utils.ValueHolder<ThroughputControlGroupConfig>((ThroughputControlGroupConfig)config));
    }

    @Override
    public RuntimeException getResultException() {
        return this.resultException;
    }

    private Mono<Void> dispatchChanges(FeedResponse<T> response, ChangeFeedState continuationState) {
        ChangeFeedObserverContextImpl<T> context = new ChangeFeedObserverContextImpl<T>(this.lease.getLeaseToken(), response, continuationState, this.checkpointer);
        return this.observer.processChanges(context, response.getResults());
    }
}

