/*
 * Decompiled with CFR 0.152.
 */
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.logging.LogValues;
import com.launchdarkly.sdk.server.FDv2ChangeSetTranslator;
import com.launchdarkly.sdk.server.HeaderConstants;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.SelectorSource;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
import com.launchdarkly.sdk.server.subsystems.SerializationException;
import com.launchdarkly.shaded.com.google.gson.JsonElement;
import com.launchdarkly.shaded.com.google.gson.stream.JsonReader;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.ErrorStrategy;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.FaultEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.HttpConnectStrategy;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamClosedByCallerException;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamException;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamHttpErrorException;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.GsonHelpers;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.collections.IterableAsyncQueue;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.http.HttpErrors;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.http.HttpHelpers;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.http.HttpProperties;
import com.launchdarkly.shaded.okhttp3.Headers;
import com.launchdarkly.shaded.okhttp3.Request;
import com.launchdarkly.shaded.org.jetbrains.annotations.NotNull;
import java.io.Reader;
import java.lang.reflect.Type;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class StreamingSynchronizerImpl
implements Synchronizer {
    private static final Duration DEAD_CONNECTION_INTERVAL = Duration.ofSeconds(300L);
    private final HttpProperties httpProperties;
    private final SelectorSource selectorSource;
    final URI streamUri;
    private final LDLogger logger;
    private final String payloadFilter;
    private final IterableAsyncQueue<FDv2SourceResult> resultQueue = new IterableAsyncQueue();
    private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture();
    private boolean closed = false;
    private final Object closeLock = new Object();
    private final FDv2ProtocolHandler protocolHandler = new FDv2ProtocolHandler();
    private volatile EventSource eventSource;
    final Duration initialReconnectDelay;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final int threadPriority;

    public StreamingSynchronizerImpl(HttpProperties httpProperties, URI baseUri, String requestPath, LDLogger logger, SelectorSource selectorSource, String payloadFilter, Duration initialReconnectDelaySeconds, int threadPriority) {
        this.httpProperties = httpProperties;
        this.selectorSource = selectorSource;
        this.logger = logger.subLogger("StreamingSynchronizer");
        this.payloadFilter = payloadFilter;
        this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);
        this.initialReconnectDelay = initialReconnectDelaySeconds;
        this.threadPriority = threadPriority;
    }

    private void startStream() {
        Headers headers = this.httpProperties.toHeadersBuilder().add("Accept", "text/event-stream").build();
        HttpConnectStrategy connectStrategy = ConnectStrategy.http(this.streamUri).headers(headers).clientBuilderActions(clientBuilder -> {
            this.httpProperties.applyToHttpClientBuilder(clientBuilder);
            clientBuilder.addInterceptor(chain -> {
                URI currentUri;
                Request originalRequest = chain.request();
                Selector selector = this.selectorSource.getSelector();
                URI updatedUri = currentUri = originalRequest.url().uri();
                if (!selector.isEmpty()) {
                    updatedUri = HttpHelpers.addQueryParam(updatedUri, "basis", selector.getState());
                }
                if (this.payloadFilter != null && !this.payloadFilter.isEmpty()) {
                    updatedUri = HttpHelpers.addQueryParam(updatedUri, "filter", this.payloadFilter);
                }
                if (updatedUri.equals(currentUri)) {
                    return chain.proceed(originalRequest);
                }
                Request newRequest = originalRequest.newBuilder().url(updatedUri.toString()).build();
                return chain.proceed(newRequest);
            });
        }).readTimeout(DEAD_CONNECTION_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
        EventSource.Builder builder = new EventSource.Builder(connectStrategy).errorStrategy(ErrorStrategy.alwaysContinue()).logger(this.logger).readBufferSize(5000).streamEventData(true).expectFields("event").retryDelay(this.initialReconnectDelay.toMillis(), TimeUnit.MILLISECONDS);
        this.eventSource = builder.build();
        Thread thread2 = this.getRunThread();
        thread2.start();
    }

    @NotNull
    private Thread getRunThread() {
        Thread thread2 = new Thread(() -> {
            try {
                StreamEvent event;
                Iterator<StreamEvent> iterator2 = this.eventSource.anyEvents().iterator();
                do {
                    if (!iterator2.hasNext()) return;
                } while (this.handleEvent(event = iterator2.next()));
                return;
            }
            catch (Exception e) {
                Object event = this.closeLock;
                synchronized (event) {
                    if (this.closed) {
                        return;
                    }
                }
                this.logger.error("Stream thread ended with exception: {}", LogValues.exceptionSummary(e));
                this.logger.debug(LogValues.exceptionTrace(e));
                DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.UNKNOWN, 0, e.toString(), Instant.now());
                this.resultQueue.put(FDv2SourceResult.interrupted(errorInfo, StreamingSynchronizerImpl.getFallback(e)));
                return;
            }
            finally {
                this.eventSource.close();
            }
        });
        thread2.setName("LaunchDarkly-FDv2-streaming-synchronizer");
        thread2.setPriority(this.threadPriority);
        thread2.setDaemon(true);
        return thread2;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<FDv2SourceResult> next() {
        Object object = this.closeLock;
        synchronized (object) {
            if (!this.closed && !this.started.getAndSet(true)) {
                this.startStream();
            }
        }
        return CompletableFuture.anyOf(this.shutdownFuture, this.resultQueue.take()).thenApply(result -> (FDv2SourceResult)result);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object object = this.closeLock;
        synchronized (object) {
            this.closed = true;
            if (this.eventSource != null) {
                try {
                    this.eventSource.close();
                }
                catch (Exception e) {
                    this.logger.debug("Error closing event source during shutdown: {}", LogValues.exceptionSummary(e));
                }
            }
        }
        this.shutdownFuture.complete(FDv2SourceResult.shutdown());
    }

    private boolean handleEvent(StreamEvent event) {
        if (event instanceof MessageEvent) {
            this.handleMessage((MessageEvent)event);
            return true;
        }
        if (event instanceof FaultEvent) {
            return this.handleError(((FaultEvent)event).getCause());
        }
        return true;
    }

    private void handleMessage(MessageEvent event) {
        FDv2ProtocolHandler.IFDv2ProtocolAction action;
        FDv2Event fdv2Event;
        String eventName = event.getEventName();
        try {
            fdv2Event = this.parseFDv2Event(eventName, event.getDataReader());
        }
        catch (SerializationException e) {
            this.logger.error("Failed to parse FDv2 event: {}", LogValues.exceptionSummary(e));
            this.interruptedWithException(e, DataSourceStatusProvider.ErrorKind.INVALID_DATA, event);
            return;
        }
        try {
            action = this.protocolHandler.handleEvent(fdv2Event);
        }
        catch (Exception e) {
            this.logger.error("FDv2 protocol handler error: {}", LogValues.exceptionSummary(e));
            this.interruptedWithException(e, DataSourceStatusProvider.ErrorKind.INVALID_DATA, event);
            return;
        }
        FDv2SourceResult result = null;
        switch (action.getAction()) {
            case CHANGESET: {
                FDv2ProtocolHandler.FDv2ActionChangeset changeset = (FDv2ProtocolHandler.FDv2ActionChangeset)action;
                try {
                    DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> converted = FDv2ChangeSetTranslator.toChangeSet(changeset.getChangeset(), this.logger, event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()), true);
                    result = FDv2SourceResult.changeSet(converted, StreamingSynchronizerImpl.getFallback(event));
                }
                catch (Exception e) {
                    this.logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e));
                    this.logger.debug(LogValues.exceptionTrace(e));
                    DataSourceStatusProvider.ErrorInfo conversionError = new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.INVALID_DATA, 0, e.toString(), Instant.now());
                    result = FDv2SourceResult.interrupted(conversionError, StreamingSynchronizerImpl.getFallback(event));
                    this.restartStream();
                }
                break;
            }
            case ERROR: {
                FDv2ProtocolHandler.FDv2ActionError error = (FDv2ProtocolHandler.FDv2ActionError)action;
                this.logger.error("Received error from server: {} - {}", (Object)error.getId(), (Object)error.getReason());
                break;
            }
            case GOODBYE: {
                String reason = ((FDv2ProtocolHandler.FDv2ActionGoodbye)action).getReason();
                this.logger.info("Goodbye was received from the LaunchDarkly connection with reason: '{}'.", (Object)reason);
                result = FDv2SourceResult.goodbye(reason, StreamingSynchronizerImpl.getFallback(event));
                this.restartStream();
                break;
            }
            case INTERNAL_ERROR: {
                FDv2ProtocolHandler.FDv2ActionInternalError internalErrorAction = (FDv2ProtocolHandler.FDv2ActionInternalError)action;
                DataSourceStatusProvider.ErrorKind kind = DataSourceStatusProvider.ErrorKind.UNKNOWN;
                switch (internalErrorAction.getErrorType()) {
                    case MISSING_PAYLOAD: 
                    case JSON_ERROR: {
                        kind = DataSourceStatusProvider.ErrorKind.INVALID_DATA;
                        break;
                    }
                }
                DataSourceStatusProvider.ErrorInfo internalError = new DataSourceStatusProvider.ErrorInfo(kind, 0, "Internal error during FDv2 event processing", Instant.now());
                result = FDv2SourceResult.interrupted(internalError, StreamingSynchronizerImpl.getFallback(event));
                if (kind != DataSourceStatusProvider.ErrorKind.INVALID_DATA) break;
                this.restartStream();
                break;
            }
        }
        if (result != null) {
            this.resultQueue.put(result);
        }
    }

    private void interruptedWithException(Exception e, DataSourceStatusProvider.ErrorKind kind, MessageEvent event) {
        this.logger.debug(LogValues.exceptionTrace(e));
        DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(kind, 0, e.toString(), Instant.now());
        this.resultQueue.put(FDv2SourceResult.interrupted(errorInfo, StreamingSynchronizerImpl.getFallback(event)));
        this.restartStream();
    }

    private boolean handleError(StreamException e) {
        if (e instanceof StreamClosedByCallerException) {
            return false;
        }
        if (e instanceof StreamHttpErrorException) {
            int status = ((StreamHttpErrorException)e).getCode();
            DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(status);
            boolean recoverable = HttpErrors.checkIfErrorIsRecoverableAndLog(this.logger, "HTTP error " + status, "in FDv2 streaming connection", status, "will retry");
            if (!recoverable) {
                this.shutdownFuture.complete(FDv2SourceResult.terminalError(errorInfo, StreamingSynchronizerImpl.getFallback(e)));
                return false;
            }
            this.resultQueue.put(FDv2SourceResult.interrupted(errorInfo, StreamingSynchronizerImpl.getFallback(e)));
            return true;
        }
        this.logger.warn("Stream error: {}", LogValues.exceptionSummary(e));
        this.logger.debug(LogValues.exceptionTrace(e));
        DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 0, e.toString(), Instant.now());
        this.resultQueue.put(FDv2SourceResult.interrupted(errorInfo, StreamingSynchronizerImpl.getFallback(e)));
        return true;
    }

    private void restartStream() {
        Objects.requireNonNull(this.eventSource, "eventSource must not be null");
        this.eventSource.interrupt();
        this.protocolHandler.reset();
    }

    private FDv2Event parseFDv2Event(String eventName, Reader eventDataReader) throws SerializationException {
        try {
            JsonReader reader = new JsonReader(eventDataReader);
            FDv2Event event = new FDv2Event(eventName, (JsonElement)GsonHelpers.gsonInstance().fromJson(reader, (Type)((Object)JsonElement.class)));
            reader.close();
            return event;
        }
        catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    private static boolean getFallback(Exception ex) {
        if (ex instanceof StreamHttpErrorException) {
            String headerValue = ((StreamHttpErrorException)ex).getHeaders().value(HeaderConstants.FDV1_FALLBACK.getHeaderName());
            return headerValue != null && headerValue.equalsIgnoreCase("true");
        }
        return false;
    }

    private static boolean getFallback(StreamEvent event) {
        String headerName = HeaderConstants.FDV1_FALLBACK.getHeaderName();
        String headerValue = null;
        if (event instanceof FaultEvent) {
            headerValue = ((FaultEvent)event).getHeaders().value(headerName);
        } else if (event instanceof MessageEvent) {
            headerValue = ((MessageEvent)event).getHeaders().value(headerName);
        }
        return headerValue != null && headerValue.equalsIgnoreCase("true");
    }
}

