/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client.transport;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Minimal Server-Sent Events (SSE) client built on {@link HttpClient} and the
 * JDK line-oriented {@link Flow.Subscriber} body subscriber.
 *
 * <p>Each call to {@link #subscribe(String, SseEventHandler)} issues one GET request
 * and feeds the response body, line by line, into a small parser that recognises the
 * {@code data:}, {@code id:} and {@code event:} fields. A blank line (or the end of
 * the stream) terminates the current event and hands it to the supplied handler.
 */
public class FlowSseClient {
    private static final String DATA_FIELD = "data:";
    private static final String ID_FIELD = "id:";
    private static final String EVENT_FIELD = "event:";

    /** Event type reported when the stream did not name one for the current event. */
    private static final String DEFAULT_EVENT_TYPE = "message";

    private final HttpClient httpClient;
    private final HttpRequest.Builder requestBuilder;

    /**
     * Creates a client that builds its requests from a fresh {@link HttpRequest.Builder}.
     *
     * @param httpClient the HTTP client used to send requests
     */
    public FlowSseClient(HttpClient httpClient) {
        this(httpClient, HttpRequest.newBuilder());
    }

    /**
     * Creates a client that builds its requests from the given builder template.
     *
     * @param httpClient the HTTP client used to send requests
     * @param requestBuilder template carrying any pre-configured request settings;
     *        it is copied on every {@code subscribe} call and never modified
     */
    public FlowSseClient(HttpClient httpClient, HttpRequest.Builder requestBuilder) {
        this.httpClient = httpClient;
        this.requestBuilder = requestBuilder;
    }

    /**
     * Sends an asynchronous GET request to {@code url} and streams the parsed events
     * to {@code eventHandler}.
     *
     * <p>Only responses with status 200, 201, 202 or 206 are parsed. For any other
     * status the body is discarded and {@link SseEventHandler#onError(Throwable)}
     * receives the failure once the response completes. Failures of the request
     * itself are reported through the same callback.
     *
     * @param url the SSE endpoint; must be a syntactically valid URI
     * @param eventHandler callback receiving events and errors
     * @throws IllegalArgumentException if {@code url} is not a valid URI
     */
    public void subscribe(String url, final SseEventHandler eventHandler) {
        // Work on a copy: header() appends, so reusing the shared builder would stack
        // another Accept/Cache-Control value onto every subsequent request.
        HttpRequest request = this.requestBuilder.copy()
            .uri(URI.create(url))
            .header("Accept", "text/event-stream")
            .header("Cache-Control", "no-cache")
            .GET()
            .build();

        // Decide per response whether the body is an event stream at all, so that the
        // body of a rejected response is never handed to the handler as events.
        HttpResponse.BodyHandler<Void> bodyHandler = responseInfo -> {
            if (isAcceptedStatus(responseInfo.statusCode())) {
                return HttpResponse.BodySubscribers.fromLineSubscriber(new SseLineSubscriber(eventHandler));
            }
            return HttpResponse.BodySubscribers.discarding();
        };

        this.httpClient.sendAsync(request, bodyHandler)
            .thenAccept(response -> {
                int status = response.statusCode();
                if (!isAcceptedStatus(status)) {
                    throw new RuntimeException("Failed to connect to SSE stream. Unexpected status code: " + status);
                }
            })
            .exceptionally(throwable -> {
                eventHandler.onError(throwable);
                return null;
            });
    }

    /** Returns whether {@code status} is one of the response codes treated as a usable stream. */
    private static boolean isAcceptedStatus(int status) {
        return status == 200 || status == 201 || status == 202 || status == 206;
    }

    /**
     * Line-level SSE parser. State is kept in plain fields: the {@link Flow} contract
     * delivers subscriber signals for one subscription sequentially.
     */
    private static final class SseLineSubscriber implements Flow.Subscriber<String> {
        private final SseEventHandler eventHandler;
        private final StringBuilder dataBuffer = new StringBuilder();
        /** Most recent {@code id:} value; kept across events until overwritten. */
        private String lastEventId;
        /** Type of the event currently being assembled. */
        private String eventType = DEFAULT_EVENT_TYPE;
        private Flow.Subscription subscription;

        SseLineSubscriber(SseEventHandler eventHandler) {
            this.eventHandler = eventHandler;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String line) {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                finishEvent();
            } else if (line.startsWith(DATA_FIELD)) {
                String value = fieldValue(line, DATA_FIELD);
                if (value != null) {
                    this.dataBuffer.append(value).append("\n");
                }
            } else if (line.startsWith(ID_FIELD)) {
                String value = fieldValue(line, ID_FIELD);
                if (value != null) {
                    this.lastEventId = value;
                }
            } else if (line.startsWith(EVENT_FIELD)) {
                String value = fieldValue(line, EVENT_FIELD);
                if (value != null) {
                    this.eventType = value;
                }
            }
            this.subscription.request(1L);
        }

        @Override
        public void onError(Throwable throwable) {
            this.eventHandler.onError(throwable);
        }

        @Override
        public void onComplete() {
            // Deliver an event whose terminating blank line never arrived.
            finishEvent();
        }

        /**
         * Dispatches the buffered event, if it has any data, and resets the per-event
         * state. The event type falls back to the default so that a type named by one
         * event does not leak into the next; the last event id is intentionally kept.
         */
        private void finishEvent() {
            SseEvent pending = null;
            if (this.dataBuffer.length() > 0) {
                pending = new SseEvent(this.lastEventId, this.eventType, this.dataBuffer.toString().trim());
                this.dataBuffer.setLength(0);
            }
            this.eventType = DEFAULT_EVENT_TYPE;
            if (pending != null) {
                this.eventHandler.onEvent(pending);
            }
        }

        /**
         * Returns the trimmed text following {@code prefix}, or {@code null} when nothing
         * follows it (such a line is ignored). The whole remainder of the line is used,
         * so characters like U+2028 inside a payload do not truncate it.
         */
        private static String fieldValue(String line, String prefix) {
            String remainder = line.substring(prefix.length());
            return remainder.isEmpty() ? null : remainder.trim();
        }
    }

    /** Callback interface through which a subscription reports events and failures. */
    public static interface SseEventHandler {
        /**
         * Called once for every complete event read from the stream.
         *
         * @param event the parsed event
         */
        public void onEvent(SseEvent event);

        /**
         * Called when the request, the response status check, or the stream fails.
         *
         * @param error the failure
         */
        public void onError(Throwable error);
    }

    /**
     * A parsed SSE event.
     *
     * @param id the most recent {@code id:} value seen on the stream, or {@code null} if none
     * @param type the {@code event:} value of this event, or {@code "message"} if it had none
     * @param data the {@code data:} lines joined with {@code '\n'}, trimmed
     */
    public record SseEvent(String id, String type, String data) {
    }
}

