/*
 * Decompiled with CFR 0.152.
 */
package dev.langchain4j.model;

import dev.langchain4j.model.LambdaStreamingResponseHandler;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.WithAssertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Tests for the lambda-based factory and blocking helper methods of
 * {@link LambdaStreamingResponseHandler}.
 *
 * <p>Each test drives the handler through one of three scripted
 * {@link StreamingChatModel} test doubles defined at the bottom of this class:
 * a synchronous one, an asynchronous one, and an asynchronous one that never
 * signals completion.
 */
class LambdaStreamingResponseHandlerTest
implements WithAssertions {

    /** Upper bound, in seconds, for tests that block on an asynchronous model. */
    private static final long BLOCKING_TEST_TIMEOUT_SECONDS = 5L;

    /** Partial responses must reach the consumer exactly as scripted, in order. */
    @Test
    void testOnPartialResponse() {
        List<Object> tokens = tokensOf(
                "The sky ", "is blue because of ", "a phenomenon called ", "Rayleigh scattering.");
        StreamingChatModel model = new DummyModel(tokens);

        List<Object> receivedTokens = new ArrayList<>();
        model.chat("Why is the sky blue?", LambdaStreamingResponseHandler.onPartialResponse(receivedTokens::add));

        assertThat(receivedTokens).containsExactlyElementsOf(tokens);
    }

    /** Tokens emitted before the error reach the token consumer; the error reaches the error consumer. */
    @Test
    void testOnPartialResponseAndError() {
        StreamingChatModel model =
                new DummyModel(tokensOf("Three ", "Two ", "One ", new RuntimeException("BOOM")));

        List<Object> receivedTokens = new ArrayList<>();
        Throwable[] thrown = new Throwable[]{null};
        model.chat("Create a countdown", LambdaStreamingResponseHandler.onPartialResponseAndError(receivedTokens::add, t -> {
            thrown[0] = t;
        }));

        // Assert on what was actually received, not on the script itself.
        assertThat(receivedTokens).containsExactly("Three ", "Two ", "One ");
        assertThat(thrown[0]).isInstanceOf(RuntimeException.class).hasMessage("BOOM");
    }

    /**
     * The blocking variant must not return before every token from an
     * asynchronous model has been delivered.
     */
    @Test
    @Timeout(value = BLOCKING_TEST_TIMEOUT_SECONDS, unit = TimeUnit.SECONDS)
    void testOnPartialResponseBlocking() throws InterruptedException {
        List<Object> tokens = tokensOf("Hello ", "streaming ", "world!");
        StreamingChatModel model = new AsyncDummyModel(tokens);

        List<Object> receivedTokens = new ArrayList<>();
        AtomicBoolean lastTokenSeen = new AtomicBoolean(false);
        LambdaStreamingResponseHandler.onPartialResponseBlocking(model, "Test message", token -> {
            receivedTokens.add(token);
            if ("world!".equals(token)) {
                lastTokenSeen.set(true);
            }
        });

        assertThat(receivedTokens).containsExactlyElementsOf(tokens);
        assertThat(lastTokenSeen.get()).isTrue();
    }

    /** Same as above for the token-and-error variant; on success the error consumer stays untouched. */
    @Test
    @Timeout(value = BLOCKING_TEST_TIMEOUT_SECONDS, unit = TimeUnit.SECONDS)
    void testOnPartialResponseAndErrorBlocking() throws InterruptedException {
        List<Object> tokens = tokensOf("Processing ", "request ", "successfully");
        StreamingChatModel model = new AsyncDummyModel(tokens);

        List<Object> receivedTokens = new ArrayList<>();
        Throwable[] thrown = new Throwable[]{null};
        AtomicBoolean lastTokenSeen = new AtomicBoolean(false);
        LambdaStreamingResponseHandler.onPartialResponseAndErrorBlocking(model, "Test message", token -> {
            receivedTokens.add(token);
            if ("successfully".equals(token)) {
                lastTokenSeen.set(true);
            }
        }, t -> {
            thrown[0] = t;
        });

        assertThat(receivedTokens).containsExactlyElementsOf(tokens);
        assertThat(thrown[0]).isNull();
        assertThat(lastTokenSeen.get()).isTrue();
    }

    /**
     * When the model reports an error, the blocking call must still return
     * (the timeout guards against a hang) after delivering the tokens that
     * preceded the error.
     */
    @Test
    @Timeout(value = BLOCKING_TEST_TIMEOUT_SECONDS, unit = TimeUnit.SECONDS)
    void onPartialResponseBlockingWithError() throws InterruptedException {
        StreamingChatModel model = new AsyncDummyModel(
                tokensOf("Never ", "ending ", new RuntimeException("Something went wrong")));

        List<Object> receivedTokens = new ArrayList<>();
        AtomicBoolean bothTokensReceived = new AtomicBoolean(false);
        LambdaStreamingResponseHandler.onPartialResponseBlocking(model, "Test message", token -> {
            receivedTokens.add(token);
            if (receivedTokens.size() == 2) {
                bothTokensReceived.set(true);
            }
        });

        assertThat(receivedTokens).containsExactly("Never ", "ending ");
        assertThat(bothTokensReceived.get()).isTrue();
    }

    /** The error reported by the model must be handed to the error consumer before the blocking call returns. */
    @Test
    @Timeout(value = BLOCKING_TEST_TIMEOUT_SECONDS, unit = TimeUnit.SECONDS)
    void onPartialResponseAndErrorBlockingWithError() throws InterruptedException {
        StreamingChatModel model = new AsyncDummyModel(
                tokensOf("Never ", "ending ", new RuntimeException("Something went wrong")));

        List<Object> receivedTokens = new ArrayList<>();
        Throwable[] thrown = new Throwable[]{null};
        AtomicBoolean errorConsumerCalled = new AtomicBoolean(false);
        LambdaStreamingResponseHandler.onPartialResponseAndErrorBlocking(model, "Test message", receivedTokens::add, t -> {
            thrown[0] = t;
            errorConsumerCalled.set(true);
        });

        assertThat(receivedTokens).containsExactly("Never ", "ending ");
        assertThat(thrown[0]).isInstanceOf(RuntimeException.class).hasMessage("Something went wrong");
        assertThat(errorConsumerCalled.get()).isTrue();
    }

    /**
     * Interrupting the calling thread while it waits on a model that never
     * completes must surface as an {@link InterruptedException}.
     */
    @Test
    @Timeout(value = BLOCKING_TEST_TIMEOUT_SECONDS, unit = TimeUnit.SECONDS)
    void onPartialResponseBlockingWithInterruption() {
        StreamingChatModel model = new NonCompletingAsyncDummyModel(tokensOf("Never ", "ending "));

        Thread testThread = Thread.currentThread();
        Thread interrupter = new Thread(() -> {
            try {
                Thread.sleep(100L);
                testThread.interrupt();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        interrupter.start();

        try {
            assertThatThrownBy(() -> LambdaStreamingResponseHandler.onPartialResponseBlocking(model, "Test message", System.out::print))
                    .isInstanceOf(InterruptedException.class);
        }
        finally {
            // Make sure the interrupter cannot fire after this test has finished:
            // cancel its pending sleep, wait for it to exit, then drop any
            // interrupt status that may still be set on the runner thread.
            interrupter.interrupt();
            try {
                interrupter.join();
            }
            catch (InterruptedException ignored) {
                // The late interrupt landed on join(); it has been consumed here.
            }
            Thread.interrupted();
        }
    }

    /**
     * Builds a mutable script of items for the dummy models.
     *
     * @param items {@link String} tokens and/or {@link Throwable} errors, in emission order
     * @return a new list holding {@code items} in the given order
     */
    private static List<Object> tokensOf(Object... items) {
        List<Object> script = new ArrayList<>(items.length);
        for (Object item : items) {
            script.add(item);
        }
        return script;
    }

    /**
     * Forwards one scripted item to {@code handler}: a {@link String} as a
     * partial response, a {@link Throwable} as an error. Any other item is ignored.
     *
     * @return {@code false} if the item was reported as an error and streaming
     *         should stop, {@code true} otherwise
     */
    private static boolean deliver(Object item, StreamingChatResponseHandler handler) {
        if (item instanceof String) {
            handler.onPartialResponse((String) item);
            return true;
        }
        if (item instanceof Throwable) {
            handler.onError((Throwable) item);
            return false;
        }
        return true;
    }

    /**
     * Synchronous test double: replays the script on the caller's thread.
     * Streaming stops at the first error; if the script contains no error,
     * completion is signalled with a {@code null} response.
     */
    static class DummyModel
    implements StreamingChatModel {
        private final List<Object> stringsAndError;

        public DummyModel(List<Object> stringsAndError) {
            this.stringsAndError = stringsAndError;
        }

        public void doChat(ChatRequest chatRequest, StreamingChatResponseHandler handler) {
            for (Object item : this.stringsAndError) {
                if (!deliver(item, handler)) {
                    // Nothing is emitted after an error, matching AsyncDummyModel.
                    return;
                }
            }
            handler.onCompleteResponse(null);
        }
    }

    /**
     * Asynchronous test double: replays the script on a new thread, pausing
     * 50 ms before each item. Streaming stops at the first error; if the
     * script contains no error, completion is signalled with a {@code null}
     * response. An interruption of the worker thread is reported via
     * {@code onError}.
     */
    static class AsyncDummyModel
    implements StreamingChatModel {
        private final List<Object> stringsAndError;

        public AsyncDummyModel(List<Object> stringsAndError) {
            this.stringsAndError = stringsAndError;
        }

        public void doChat(ChatRequest chatRequest, StreamingChatResponseHandler handler) {
            new Thread(() -> {
                try {
                    for (Object item : this.stringsAndError) {
                        Thread.sleep(50L);
                        if (!deliver(item, handler)) {
                            return;
                        }
                    }
                    // Reaching this point means no error was scripted.
                    handler.onCompleteResponse(null);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    handler.onError(e);
                }
            }).start();
        }
    }

    /**
     * Asynchronous test double that emits the scripted {@link String} items
     * (pausing 50 ms before each, skipping anything else) and then simply
     * stops: it never signals completion, and reports an error only if its
     * worker thread is interrupted.
     */
    static class NonCompletingAsyncDummyModel
    implements StreamingChatModel {
        private final List<Object> stringsAndError;

        public NonCompletingAsyncDummyModel(List<Object> stringsAndError) {
            this.stringsAndError = stringsAndError;
        }

        public void doChat(ChatRequest chatRequest, StreamingChatResponseHandler handler) {
            new Thread(() -> {
                try {
                    for (Object item : this.stringsAndError) {
                        Thread.sleep(50L);
                        if (item instanceof String) {
                            handler.onPartialResponse((String) item);
                        }
                    }
                    // Deliberately no onCompleteResponse: callers stay blocked.
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    handler.onError(e);
                }
            }).start();
        }
    }
}

