/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.taskexecutor.IdleTestTask;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ThreadInfoRequestCoordinator}.
 *
 * <p>Every test hands the coordinator a set of stub {@link TaskExecutorThreadInfoGateway}s whose
 * response futures behave as described by a {@link CompletionType}, triggers a thread info
 * request with the default parameters and verifies how the returned future completes.
 */
class ThreadInfoRequestCoordinatorTest {
    private static final Duration REQUEST_TIMEOUT = Duration.ofMillis(100L);
    private static final String REQUEST_TIMEOUT_MESSAGE = "Request timeout.";
    private static final int DEFAULT_NUMBER_OF_SAMPLES = 1;
    private static final int DEFAULT_MAX_STACK_TRACE_DEPTH = 100;
    private static final Duration DEFAULT_DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50L);

    /** Single-threaded scheduler shared by all tests and by the coordinator under test. */
    private static ScheduledExecutorService executorService;

    /** Coordinator under test; recreated before every test method. */
    private ThreadInfoRequestCoordinator coordinator;

    @BeforeAll
    static void setUp() {
        executorService = new ScheduledThreadPoolExecutor(1);
    }

    @AfterAll
    static void tearDown() {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    @BeforeEach
    void initCoordinator() {
        coordinator = new ThreadInfoRequestCoordinator(executorService, REQUEST_TIMEOUT);
    }

    /** Verifies that the finished test left no pending request behind, then shuts down. */
    @AfterEach
    void shutdownCoordinator() {
        if (coordinator != null) {
            Assertions.assertThat(coordinator.getNumberOfPendingRequests()).isZero();
            coordinator.shutDown();
        }
    }

    /** A request whose gateways all respond successfully yields non-empty stack traces. */
    @Test
    void testSuccessfulThreadInfoRequest() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.SUCCESSFULLY);

        VertexThreadInfoStats threadInfoStats = triggerDefaultRequest(executionWithGateways).get();

        // The coordinator is fresh for every test, so this is its first request.
        Assertions.assertThat(threadInfoStats.getRequestId()).isEqualTo(0);

        for (Collection<ThreadInfoSample> samples :
                threadInfoStats.getSamplesBySubtask().values()) {
            Assertions.assertThat(samples.iterator().next().getStackTrace()).isNotEmpty();
        }
    }

    /** A single exceptionally completed gateway response fails the whole request. */
    @Test
    void testThreadInfoRequestWithException() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.EXCEPTIONALLY);

        CompletableFuture<VertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);

        Assertions.assertThatThrownBy(requestFuture::get, "The request must be failed.")
                .isInstanceOf(ExecutionException.class)
                .hasCauseInstanceOf(RuntimeException.class);
    }

    /** A gateway response that times out fails the request with the timeout message. */
    @Test
    void testThreadInfoRequestTimeout() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);

        CompletableFuture<VertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);

        try {
            Assertions.assertThatThrownBy(requestFuture::get, "The request must be failed.")
                    .satisfies(FlinkAssertions.anyCauseMatches(REQUEST_TIMEOUT_MESSAGE));
        } finally {
            // NOTE(review): presumably shut down here so that the pending-request check in
            // shutdownCoordinator() passes even if the assertion above fails - confirm.
            coordinator.shutDown();
        }
    }

    /** Shutting down fails all in-flight requests and rejects requests triggered afterwards. */
    @Test
    void testShutDown() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.NEVER_COMPLETE);

        Collection<CompletableFuture<VertexThreadInfoStats>> requestFutures = new ArrayList<>();
        requestFutures.add(triggerDefaultRequest(executionWithGateways));
        requestFutures.add(triggerDefaultRequest(executionWithGateways));

        // One gateway never answers, so both requests must still be in flight.
        for (CompletableFuture<VertexThreadInfoStats> pending : requestFutures) {
            Assertions.assertThat(pending).isNotDone();
        }

        coordinator.shutDown();

        for (CompletableFuture<VertexThreadInfoStats> cancelled : requestFutures) {
            Assertions.assertThat(cancelled).isCompletedExceptionally();
        }

        // A request triggered after the shutdown must be failed as well.
        CompletableFuture<VertexThreadInfoStats> lateRequest =
                triggerDefaultRequest(executionWithGateways);
        Assertions.assertThat(lateRequest).isCompletedExceptionally();
    }

    /**
     * Triggers a thread info request on the coordinator under test using the default number of
     * samples, delay between samples and maximum stack trace depth.
     *
     * @param executionWithGateways execution attempts mapped to the gateway futures to query
     * @return the future returned by the coordinator for this request
     */
    private CompletableFuture<VertexThreadInfoStats> triggerDefaultRequest(
            Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                    executionWithGateways) {
        return coordinator.triggerThreadInfoRequest(
                executionWithGateways,
                DEFAULT_NUMBER_OF_SAMPLES,
                DEFAULT_DELAY_BETWEEN_SAMPLES,
                DEFAULT_MAX_STACK_TRACE_DEPTH);
    }

    /**
     * Creates an already-completed future holding a stub gateway. The stub ignores its arguments
     * and always returns the same response future, which is completed according to
     * {@code completionType}.
     *
     * @param completionType how the gateway's response future should complete
     * @return a completed future containing the stub gateway
     * @throws Exception if preparing the successful response fails
     */
    private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskManagerGateway(
            CompletionType completionType) throws Exception {
        CompletableFuture<TaskThreadInfoResponse> responseFuture = new CompletableFuture<>();

        switch (completionType) {
            case SUCCESSFULLY: {
                HashSet<IdleTestTask> tasks = new HashSet<>();
                IdleTestTask.executeWithTerminationGuarantee(() -> {
                    tasks.add(new IdleTestTask());
                    tasks.add(new IdleTestTask());

                    // Thread id -> execution attempt of the task running on that thread.
                    Map<Long, ExecutionAttemptID> threads =
                            tasks.stream()
                                    .collect(
                                            Collectors.toMap(
                                                    task -> task.getExecutingThread().getId(),
                                                    IdleTestTask::getExecutionId));

                    // Sample the task threads and re-key the samples by execution attempt,
                    // one sample per attempt.
                    Map<ExecutionAttemptID, Collection<ThreadInfoSample>> threadInfoSample =
                            JvmUtils.createThreadInfoSample(
                                            threads.keySet(), DEFAULT_MAX_STACK_TRACE_DEPTH)
                                    .entrySet().stream()
                                    .collect(
                                            Collectors.toMap(
                                                    entry -> threads.get(entry.getKey()),
                                                    entry ->
                                                            Collections.singletonList(
                                                                    entry.getValue())));

                    responseFuture.complete(new TaskThreadInfoResponse(threadInfoSample));
                }, tasks);
                break;
            }
            case EXCEPTIONALLY: {
                responseFuture.completeExceptionally(new RuntimeException("Request failed."));
                break;
            }
            case TIMEOUT: {
                // Fail the response only after the coordinator's request timeout has elapsed.
                executorService.schedule(
                        () ->
                                responseFuture.completeExceptionally(
                                        new TimeoutException(REQUEST_TIMEOUT_MESSAGE)),
                        REQUEST_TIMEOUT.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            }
            case NEVER_COMPLETE: {
                // Intentionally leave the response future incomplete.
                break;
            }
            default: {
                throw new RuntimeException("Unknown completion type.");
            }
        }

        TaskExecutorThreadInfoGateway executorGateway =
                (taskExecutionAttemptId, requestParams, timeout) -> responseFuture;
        return CompletableFuture.completedFuture(executorGateway);
    }

    /**
     * Builds the coordinator input: one entry per completion type, each keyed by a set of two
     * freshly created execution attempt ids and mapped to a stub gateway future.
     *
     * @param completionTypes completion behaviour of each gateway to create
     * @return execution attempt id sets mapped to their gateway futures
     * @throws Exception if creating one of the stub gateways fails
     */
    private static Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
            createMockSubtaskWithGateways(CompletionType... completionTypes) throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                result = new HashMap<>();
        for (CompletionType completionType : completionTypes) {
            ImmutableSet<ExecutionAttemptID> ids =
                    ImmutableSet.of(
                            ExecutionGraphTestUtils.createExecutionAttemptId(),
                            ExecutionGraphTestUtils.createExecutionAttemptId());
            result.put(ids, createMockTaskManagerGateway(completionType));
        }
        return result;
    }

    /** Ways in which a stub gateway's response future can complete. */
    private enum CompletionType {
        /** The response future completes with a thread info response. */
        SUCCESSFULLY,
        /** The response future completes immediately with a {@link RuntimeException}. */
        EXCEPTIONALLY,
        /** The response future fails with a {@link TimeoutException} after the request timeout. */
        TIMEOUT,
        /** The response future is never completed. */
        NEVER_COMPLETE
    }
}

