/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ReadConsistencyStrategy;
import com.azure.cosmos.SessionRetryOptions;
import com.azure.cosmos.implementation.BackoffRetryUtility;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.Integers;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SessionTokenHelper;
import com.azure.cosmos.implementation.SessionTokenMismatchRetryPolicy;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.collections.ComparatorUtils;
import com.azure.cosmos.implementation.directconnectivity.AddressInformation;
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.BarrierRequestHelper;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.ReadMode;
import com.azure.cosmos.implementation.directconnectivity.ReplicatedResourceClient;
import com.azure.cosmos.implementation.directconnectivity.RequestHelper;
import com.azure.cosmos.implementation.directconnectivity.StoreReader;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper;
import com.azure.cosmos.implementation.directconnectivity.TransportClient;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ConsistencyWriter {
    private static final int MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES = 30;
    private static final int DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS = 30;
    private static final int MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION = 4;
    private static final int SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION = 10;
    private final DiagnosticsClientContext diagnosticsClientContext;
    private final Logger logger = LoggerFactory.getLogger(ConsistencyWriter.class);
    private final TransportClient transportClient;
    private final AddressSelector addressSelector;
    private final ISessionContainer sessionContainer;
    private final IAuthorizationTokenProvider authorizationTokenProvider;
    private final boolean useMultipleWriteLocations;
    private final GatewayServiceConfigurationReader serviceConfigReader;
    private final StoreReader storeReader;
    private final SessionRetryOptions sessionRetryOptions;

    public ConsistencyWriter(DiagnosticsClientContext diagnosticsClientContext, AddressSelector addressSelector, ISessionContainer sessionContainer, TransportClient transportClient, IAuthorizationTokenProvider authorizationTokenProvider, GatewayServiceConfigurationReader serviceConfigReader, boolean useMultipleWriteLocations, SessionRetryOptions sessionRetryOptions) {
        this.diagnosticsClientContext = diagnosticsClientContext;
        this.transportClient = transportClient;
        this.addressSelector = addressSelector;
        this.sessionContainer = sessionContainer;
        this.authorizationTokenProvider = authorizationTokenProvider;
        this.useMultipleWriteLocations = useMultipleWriteLocations;
        this.serviceConfigReader = serviceConfigReader;
        this.storeReader = new StoreReader(transportClient, addressSelector, null);
        this.sessionRetryOptions = sessionRetryOptions;
    }

    public Mono<StoreResponse> writeAsync(RxDocumentServiceRequest entity, TimeoutHelper timeout, boolean forceRefresh) {
        if (timeout.isElapsed() && BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics) != null && BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics).getRetryCount() > 1) {
            return Mono.error((Throwable)((Object)new RequestTimeoutException()));
        }
        String sessionToken = entity.getHeaders().get("x-ms-session-token");
        return BackoffRetryUtility.executeRetry(() -> this.writePrivateAsync(entity, timeout, forceRefresh), new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), this.sessionRetryOptions)).doOnEach(arg -> {
            block2: {
                try {
                    SessionTokenHelper.setOriginalSessionToken(entity, sessionToken);
                }
                catch (Throwable throwable) {
                    this.logger.error("Unexpected failure in handling orig [{}]: new [{}]", new Object[]{arg, throwable.getMessage(), throwable});
                    if (!(throwable instanceof Error)) break block2;
                    throw (Error)throwable;
                }
            }
        });
    }

    Mono<StoreResponse> writePrivateAsync(RxDocumentServiceRequest request, TimeoutHelper timeout, boolean forceRefresh) {
        if (timeout.isElapsed() && BridgeInternal.getRetryContext(request.requestContext.cosmosDiagnostics) != null && BridgeInternal.getRetryContext(request.requestContext.cosmosDiagnostics).getRetryCount() > 1) {
            return Mono.error((Throwable)((Object)new RequestTimeoutException()));
        }
        request.requestContext.timeoutHelper = timeout;
        if (request.requestContext.requestChargeTracker == null) {
            request.requestContext.requestChargeTracker = new RequestChargeTracker();
        }
        if (request.requestContext.cosmosDiagnostics == null) {
            request.requestContext.cosmosDiagnostics = request.createCosmosDiagnostics();
        }
        request.requestContext.forceRefreshAddressCache = forceRefresh;
        if (request.requestContext.globalStrongWriteResponse == null) {
            Mono<List<AddressInformation>> replicaAddressesObs = this.addressSelector.resolveAddressesAsync(request, forceRefresh);
            AtomicReference primaryURI = new AtomicReference();
            ConcurrentHashMap replicaStatusList = new ConcurrentHashMap();
            Set replicaStatuses = Collections.newSetFromMap(new ConcurrentHashMap());
            return replicaAddressesObs.flatMap(replicaAddresses -> {
                try {
                    ArrayList<URI> contactedReplicas = new ArrayList<URI>();
                    replicaAddresses.forEach(replicaAddress -> {
                        Uri uri = replicaAddress.getPhysicalUri();
                        contactedReplicas.add(uri.getURI());
                        if (!uri.isPrimary()) {
                            replicaStatuses.add(uri.getHealthStatusDiagnosticString());
                        }
                    });
                    BridgeInternal.setContactedReplicas(request.requestContext.cosmosDiagnostics, contactedReplicas);
                    return Mono.just((Object)AddressSelector.getPrimaryUri(request, replicaAddresses));
                }
                catch (GoneException e) {
                    return Mono.error((Throwable)((Object)e));
                }
            }).flatMap(primaryUri -> {
                try {
                    primaryURI.set(primaryUri);
                    if ((this.useMultipleWriteLocations || request.getOperationType() == OperationType.Batch) && RequestHelper.getReadConsistencyStrategyToUse(this.serviceConfigReader, request) == ReadConsistencyStrategy.SESSION) {
                        SessionTokenHelper.setPartitionLocalSessionToken(request, this.sessionContainer);
                    } else {
                        SessionTokenHelper.validateAndRemoveSessionToken(request);
                    }
                }
                catch (Exception e) {
                    return Mono.error((Throwable)e);
                }
                replicaStatusList.put("Ignoring", replicaStatuses);
                replicaStatusList.put("Attempting", new HashSet<String>(Arrays.asList(primaryUri.getHealthStatusDiagnosticString())));
                return Mono.defer(() -> this.transportClient.invokeResourceOperationAsync((Uri)primaryUri, request).doOnError(t -> {
                    try {
                        Integer result;
                        String value;
                        Throwable unwrappedException = reactor.core.Exceptions.unwrap((Throwable)t);
                        CosmosException ex = Utils.as(unwrappedException, CosmosException.class);
                        Exception rawException = null;
                        if (ex == null && (rawException = Utils.as(unwrappedException, Exception.class)) == null) {
                            throw unwrappedException;
                        }
                        this.storeReader.createAndRecordStoreResult(request, null, (Exception)(ex != null ? ex : rawException), false, false, (Uri)primaryUri, replicaStatusList);
                        String string = value = ex != null ? ex.getResponseHeaders().get("x-ms-write-request-trigger-refresh") : null;
                        if (!Strings.isNullOrWhiteSpace(value) && (result = Integers.tryParse(value)) != null && result == 1) {
                            this.startBackgroundAddressRefresh(request);
                        }
                    }
                    catch (Throwable throwable) {
                        if (throwable instanceof Error) {
                            this.logger.error("Unexpected failure in handling orig [{}] : new [{}]", new Object[]{t.getMessage(), throwable.getMessage(), throwable});
                            throw (Error)throwable;
                        }
                        this.logger.info("Unexpected failure in handling orig [{}] : new [{}]", new Object[]{t.getMessage(), throwable.getMessage(), throwable});
                    }
                })).doOnError(throwable -> {
                    CosmosException cosmosException = Utils.as(throwable, CosmosException.class);
                    if (cosmosException != null) {
                        throw cosmosException;
                    }
                    String errorMessage = "Unexpected exception " + throwable.getMessage() + " received while reading from store.";
                    throw new InternalServerErrorException(Exceptions.getInternalServerErrorMessage(errorMessage), (Exception)throwable, 20910);
                });
            }).flatMap(response -> {
                this.storeReader.createAndRecordStoreResult(request, (StoreResponse)response, null, false, false, (Uri)primaryURI.get(), replicaStatusList);
                return this.barrierForGlobalStrong(request, (StoreResponse)response);
            }).doFinally(signalType -> {
                if (signalType != SignalType.CANCEL) {
                    return;
                }
                this.storeReader.createAndRecordStoreResultForCancelledRequest(request, false, false, replicaStatusList);
            });
        }
        Mono<RxDocumentServiceRequest> barrierRequestObs = BarrierRequestHelper.createAsync(this.diagnosticsClientContext, request, this.authorizationTokenProvider, null, request.requestContext.globalCommittedSelectedLSN);
        return barrierRequestObs.flatMap(barrierRequest -> this.waitForWriteBarrierAsync((RxDocumentServiceRequest)barrierRequest, request.requestContext.globalCommittedSelectedLSN).flatMap(v -> {
            if (!v.booleanValue()) {
                this.logger.info("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", (Object)request.requestContext.globalCommittedSelectedLSN);
                return Mono.error((Throwable)((Object)new GoneException("Global STRONG write barrier has not been met for the request.", 21006)));
            }
            return Mono.just((Object)request);
        })).map(req -> req.requestContext.globalStrongWriteResponse);
    }

    boolean isGlobalStrongRequest(RxDocumentServiceRequest request, StoreResponse response) {
        if (this.serviceConfigReader.getDefaultConsistencyLevel() == ConsistencyLevel.STRONG) {
            int numberOfReadRegions = -1;
            String headerValue = response.getHeaderValue("x-ms-number-of-read-regions");
            if (headerValue != null) {
                numberOfReadRegions = Integer.parseInt(headerValue);
            }
            return numberOfReadRegions > 0 && this.serviceConfigReader.getDefaultConsistencyLevel() == ConsistencyLevel.STRONG;
        }
        return false;
    }

    Mono<StoreResponse> barrierForGlobalStrong(RxDocumentServiceRequest request, StoreResponse response) {
        try {
            if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) {
                Utils.ValueHolder<Long> lsn = Utils.ValueHolder.initialize(-1L);
                Utils.ValueHolder<Long> globalCommittedLsn = Utils.ValueHolder.initialize(-1L);
                ConsistencyWriter.getLsnAndGlobalCommittedLsn(response, lsn, globalCommittedLsn);
                if ((Long)lsn.v == -1L || (Long)globalCommittedLsn.v == -1L) {
                    this.logger.error("ConsistencyWriter: lsn {} or GlobalCommittedLsn {} is not set for global strong request", lsn, globalCommittedLsn);
                    throw new GoneException("The requested resource is no longer available at the server.", 21005);
                }
                request.requestContext.globalStrongWriteResponse = response;
                request.requestContext.globalCommittedSelectedLSN = (Long)lsn.v;
                request.requestContext.forceRefreshAddressCache = false;
                this.logger.debug("ConsistencyWriter: globalCommittedLsn {}, lsn {}", globalCommittedLsn, lsn);
                if ((Long)globalCommittedLsn.v < (Long)lsn.v) {
                    Mono<RxDocumentServiceRequest> barrierRequestObs = BarrierRequestHelper.createAsync(this.diagnosticsClientContext, request, this.authorizationTokenProvider, null, request.requestContext.globalCommittedSelectedLSN);
                    return barrierRequestObs.flatMap(barrierRequest -> {
                        Mono<Boolean> barrierWait = this.waitForWriteBarrierAsync((RxDocumentServiceRequest)barrierRequest, request.requestContext.globalCommittedSelectedLSN);
                        return barrierWait.flatMap(res -> {
                            if (!res.booleanValue()) {
                                this.logger.error("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", (Object)request.requestContext.globalCommittedSelectedLSN);
                                return Mono.error((Throwable)((Object)new GoneException("Global STRONG write barrier has not been met for the request.", 21006)));
                            }
                            return Mono.just((Object)request.requestContext.globalStrongWriteResponse);
                        });
                    });
                }
                return Mono.just((Object)request.requestContext.globalStrongWriteResponse);
            }
            return Mono.just((Object)response);
        }
        catch (CosmosException e) {
            return Mono.error((Throwable)((Object)e));
        }
    }

    private Mono<Boolean> waitForWriteBarrierAsync(RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) {
        AtomicInteger writeBarrierRetryCount = new AtomicInteger(30);
        AtomicLong maxGlobalCommittedLsnReceived = new AtomicLong(0L);
        return Flux.defer(() -> {
            if (barrierRequest.requestContext.timeoutHelper.isElapsed()) {
                return Flux.error((Throwable)((Object)new RequestTimeoutException()));
            }
            Mono<List<StoreResult>> storeResultListObs = this.storeReader.readMultipleReplicaAsync(barrierRequest, true, 1, false, false, ReadMode.Strong, false, false);
            return storeResultListObs.flatMap(responses -> {
                if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
                    return Mono.just((Object)Boolean.TRUE);
                }
                long maxGlobalCommittedLsn = responses != null ? responses.stream().map(s -> s.globalCommittedLSN).max(ComparatorUtils.naturalComparator()).orElse(0L) : 0L;
                maxGlobalCommittedLsnReceived.set(Math.max(maxGlobalCommittedLsnReceived.get(), maxGlobalCommittedLsn));
                barrierRequest.requestContext.forceRefreshAddressCache = false;
                if (writeBarrierRetryCount.getAndDecrement() == 0) {
                    if (this.logger.isDebugEnabled() && responses != null) {
                        this.logger.debug("ConsistencyWriter: WaitForWriteBarrierAsync - Last barrier multi-region strong. Responses: {}", (Object)responses.stream().map(Object::toString).collect(Collectors.joining("; ")));
                        this.logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", (Object)maxGlobalCommittedLsnReceived);
                    }
                    return Mono.just((Object)Boolean.FALSE);
                }
                return Mono.empty();
            }).flux();
        }).repeatWhen(s -> s.flatMap(x -> {
            if (30 - writeBarrierRetryCount.get() > 4) {
                return Mono.delay((Duration)Duration.ofMillis(30L), (Scheduler)CosmosSchedulers.COSMOS_PARALLEL).flux();
            }
            return Mono.delay((Duration)Duration.ofMillis(10L), (Scheduler)CosmosSchedulers.COSMOS_PARALLEL).flux();
        })).take(1L).single();
    }

    static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolder<Long> lsn, Utils.ValueHolder<Long> globalCommittedLsn) {
        lsn.v = -1L;
        globalCommittedLsn.v = -1L;
        String headerValue = response.getHeaderValue("lsn");
        if (headerValue != null) {
            lsn.v = Long.parseLong(headerValue);
        }
        if ((headerValue = response.getHeaderValue("x-ms-global-Committed-lsn")) != null) {
            globalCommittedLsn.v = Long.parseLong(headerValue);
        }
    }

    void startBackgroundAddressRefresh(RxDocumentServiceRequest request) {
        this.addressSelector.resolvePrimaryUriAsync(request, true).publishOn(Schedulers.boundedElastic()).subscribe(r -> {}, e -> this.logger.warn("Background refresh of the primary address failed with {}", (Object)e.getMessage(), e));
    }
}

