/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class BlockingResultPartitionReleaseTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private ScheduledExecutorService scheduledExecutorService;
    private ComponentMainThreadExecutor mainThreadExecutor;
    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    BlockingResultPartitionReleaseTest() {
    }

    @BeforeEach
    void setup() {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(this.scheduledExecutorService);
        this.ioExecutor = new ManuallyTriggeredScheduledExecutorService();
    }

    @AfterEach
    void teardown() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    @Test
    void testMultipleConsumersForAdaptiveBatchScheduler() throws Exception {
        this.testResultPartitionConsumedByMultiConsumers(true);
    }

    @Test
    void testMultipleConsumersForDefaultScheduler() throws Exception {
        this.testResultPartitionConsumedByMultiConsumers(false);
    }

    private void testResultPartitionConsumedByMultiConsumers(boolean isAdaptive) throws Exception {
        int parallelism = 2;
        JobID jobId = new JobID();
        JobVertex producer = ExecutionGraphTestUtils.createNoOpVertex("producer", parallelism);
        JobVertex consumer1 = ExecutionGraphTestUtils.createNoOpVertex("consumer1", parallelism);
        JobVertex consumer2 = ExecutionGraphTestUtils.createNoOpVertex("consumer2", parallelism);
        TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
        SchedulerBase scheduler = SchedulerTestingUtils.createSchedulerAndDeploy(isAdaptive, jobId, producer, new JobVertex[]{consumer1, consumer2}, DistributionPattern.ALL_TO_ALL, new TestingBlobWriter(Integer.MAX_VALUE), this.mainThreadExecutor, this.ioExecutor, partitionTracker, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor(), new Configuration());
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        Assertions.assertThat((List)partitionTracker.releasedPartitions).isEmpty();
        CompletableFuture.runAsync(() -> ExecutionGraphTestUtils.finishJobVertex(executionGraph, consumer1.getID()), (Executor)this.mainThreadExecutor).join();
        this.ioExecutor.triggerAll();
        Assertions.assertThat((List)partitionTracker.releasedPartitions).isEmpty();
        CompletableFuture.runAsync(() -> ExecutionGraphTestUtils.finishJobVertex(executionGraph, consumer2.getID()), (Executor)this.mainThreadExecutor).join();
        this.ioExecutor.triggerAll();
        Assertions.assertThat((List)partitionTracker.releasedPartitions).hasSize(parallelism);
        for (int i = 0; i < parallelism; ++i) {
            ExecutionJobVertex ejv = (ExecutionJobVertex)Preconditions.checkNotNull((Object)executionGraph.getJobVertex(producer.getID()));
            Assertions.assertThat(partitionTracker.releasedPartitions.stream().map(ResultPartitionID::getPartitionId)).containsExactlyInAnyOrder((Object[])Arrays.stream(ejv.getProducedDataSets()[0].getPartitions()).map(IntermediateResultPartition::getPartitionId).toArray(IntermediateResultPartitionID[]::new));
        }
    }

    private static class TestingPartitionTracker
    extends NoOpJobMasterPartitionTracker {
        private final List<ResultPartitionID> releasedPartitions = new ArrayList<ResultPartitionID>();

        private TestingPartitionTracker() {
        }

        @Override
        public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster) {
            this.releasedPartitions.addAll((Collection)Preconditions.checkNotNull(resultPartitionIds));
        }
    }
}

