/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.util;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosDiagnosticsContext;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.DiagnosticsProvider;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.models.FeedResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.util.context.ContextView;

public final class CosmosPagedFlux<T>
extends ContinuablePagedFlux<String, T, FeedResponse<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPagedFlux.class);
    private static final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();
    private static final ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.CosmosDiagnosticsContextAccessor ctxAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.getCosmosDiagnosticsContextAccessor();
    private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
    private final Consumer<FeedResponse<T>> feedResponseConsumer;
    private final int defaultPageSize;

    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
        this(optionsFluxFunction, null, -1);
    }

    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction, Consumer<FeedResponse<T>> feedResponseConsumer) {
        this(optionsFluxFunction, feedResponseConsumer, -1);
    }

    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction, Consumer<FeedResponse<T>> feedResponseConsumer, int defaultPageSize) {
        this.optionsFluxFunction = optionsFluxFunction;
        this.feedResponseConsumer = feedResponseConsumer;
        this.defaultPageSize = defaultPageSize;
    }

    public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> newFeedResponseConsumer) {
        if (this.feedResponseConsumer != null) {
            return new CosmosPagedFlux<T>(this.optionsFluxFunction, this.feedResponseConsumer.andThen(newFeedResponseConsumer));
        }
        return new CosmosPagedFlux<T>(this.optionsFluxFunction, newFeedResponseConsumer);
    }

    public Flux<FeedResponse<T>> byPage() {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    public Flux<FeedResponse<T>> byPage(String continuationToken) {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    public Flux<FeedResponse<T>> byPage(int preferredPageSize) {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageSize) {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
        cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        Flux<FeedResponse<T>> pagedResponse = this.byPage();
        pagedResponse.flatMap(tFeedResponse -> {
            IterableStream elements = tFeedResponse.getElements();
            if (elements == null) {
                return Flux.empty();
            }
            return Flux.fromIterable(elements);
        }).subscribe(coreSubscriber);
    }

    CosmosPagedFlux<T> withDefaultPageSize(int pageSize) {
        return new CosmosPagedFlux<T>(this.optionsFluxFunction, this.feedResponseConsumer, pageSize);
    }

    private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
        if (this.defaultPageSize > 0) {
            cosmosPagedFluxOptions.setMaxItemCount(this.defaultPageSize);
        }
        return cosmosPagedFluxOptions;
    }

    private <TOutput> Flux<TOutput> wrapWithTracingIfEnabled(CosmosPagedFluxOptions pagedFluxOptions, Flux<TOutput> publisher) {
        DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
        if (tracerProvider == null || !tracerProvider.isEnabled()) {
            return publisher;
        }
        return tracerProvider.runUnderSpanInContext(publisher, pagedFluxOptions);
    }

    private void recordFeedResponse(CosmosPagedFluxOptions pagedFluxOptions, Context traceCtx, DiagnosticsProvider tracerProvider, FeedResponse<T> response, AtomicLong feedResponseConsumerLatencyInNanos) {
        Integer actualItemCount;
        CosmosDiagnostics diagnostics = response != null ? response.getCosmosDiagnostics() : null;
        Integer n = actualItemCount = response != null && response.getResults() != null ? Integer.valueOf(response.getResults().size()) : null;
        if (diagnostics != null && cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(diagnostics).compareAndSet(false, true)) {
            if (pagedFluxOptions.getSamplingRateSnapshot() < 1.0) {
                cosmosDiagnosticsAccessor.setSamplingRateSnapshot(diagnostics, pagedFluxOptions.getSamplingRateSnapshot());
            }
            if (this.isTracerEnabled(tracerProvider)) {
                tracerProvider.recordPage(traceCtx, response != null ? response.getCosmosDiagnostics() : null, actualItemCount, response != null ? Double.valueOf(response.getRequestCharge()) : null);
            }
            if (this.feedResponseConsumer != null) {
                Instant feedResponseConsumerStart = Instant.now();
                this.feedResponseConsumer.accept(response);
                feedResponseConsumerLatencyInNanos.addAndGet(Duration.between(Instant.now(), feedResponseConsumerStart).toNanos());
            }
        }
    }

    private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions pagedFluxOptions, Context context) {
        AtomicReference startTime = new AtomicReference();
        AtomicLong feedResponseConsumerLatencyInNanos = new AtomicLong(0L);
        Flux result = this.wrapWithTracingIfEnabled(pagedFluxOptions, this.optionsFluxFunction.apply(pagedFluxOptions)).doOnSubscribe(ignoredValue -> {
            startTime.set(Instant.now());
            feedResponseConsumerLatencyInNanos.set(0L);
        }).doOnEach(signal -> {
            FeedResponse response = (FeedResponse)signal.get();
            Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(signal.getContextView());
            DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
            switch (signal.getType()) {
                case ON_COMPLETE: {
                    this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos);
                    if (!this.isTracerEnabled(tracerProvider)) break;
                    tracerProvider.recordFeedResponseConsumerLatency(signal, Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));
                    tracerProvider.endSpan(traceCtx);
                    break;
                }
                case ON_NEXT: {
                    this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos);
                    break;
                }
                case ON_ERROR: {
                    if (!this.isTracerEnabled(tracerProvider)) break;
                    tracerProvider.recordFeedResponseConsumerLatency(signal, Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));
                    tracerProvider.endSpan(traceCtx, signal.getThrowable());
                    break;
                }
            }
        });
        DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
        if (this.isTracerEnabled(tracerProvider)) {
            CosmosDiagnosticsContext cosmosCtx = ctxAccessor.create(pagedFluxOptions.getSpanName(), pagedFluxOptions.getAccountTag(), BridgeInternal.getServiceEndpoint(pagedFluxOptions.getCosmosAsyncClient()), pagedFluxOptions.getDatabaseId(), pagedFluxOptions.getContainerId(), pagedFluxOptions.getResourceType(), pagedFluxOptions.getOperationType(), pagedFluxOptions.getOperationId(), pagedFluxOptions.getEffectiveConsistencyLevel(), pagedFluxOptions.getMaxItemCount(), pagedFluxOptions.getDiagnosticsThresholds(), null, pagedFluxOptions.getConnectionMode(), pagedFluxOptions.getUserAgent());
            ctxAccessor.setSamplingRateSnapshot(cosmosCtx, pagedFluxOptions.getSamplingRateSnapshot());
            return Flux.deferContextual(reactorCtx -> result.doOnCancel(() -> {
                Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
                tracerProvider.endSpan(traceCtx);
            }).doOnComplete(() -> {
                Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
                tracerProvider.endSpan(traceCtx);
            })).contextWrite((ContextView)DiagnosticsProvider.setContextInReactor(pagedFluxOptions.getDiagnosticsProvider().startSpan(pagedFluxOptions.getSpanName(), cosmosCtx, context)));
        }
        return result;
    }

    private boolean isTracerEnabled(DiagnosticsProvider tracerProvider) {
        return tracerProvider != null;
    }

    static void initialize() {
        ImplementationBridgeHelpers.CosmosPageFluxHelper.setCosmosPageFluxAccessor(CosmosPagedFlux::new);
    }

    static {
        CosmosPagedFlux.initialize();
    }
}

