/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adapter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionVertex;
import org.apache.flink.runtime.scheduler.adapter.DefaultResultPartition;
import org.apache.flink.runtime.scheduler.adapter.DefaultSchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IteratorAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Unit tests for {@link DefaultExecutionTopology}.
 *
 * <p>Every test starts from the fixture built in {@link #setUp()}: two no-op job vertices with
 * parallelism {@link #PARALLELISM}, connected all-to-all through a pipelined result. The tests
 * covering incremental topology updates replace that fixture with a dynamic graph whose job
 * vertices are initialized one at a time.
 */
class DefaultExecutionTopologyTest {
    /** Parallelism of every job vertex created by this test. */
    private static final int PARALLELISM = 3;

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    private DefaultExecutionGraph executionGraph;
    private DefaultExecutionTopology adapter;

    /** Builds the default fixture: a two-vertex graph joined by a pipelined all-to-all edge. */
    @BeforeEach
    void setUp() throws Exception {
        JobVertex[] jobVertices = createJobVertices(ResultPartitionType.PIPELINED);
        executionGraph = ExecutionGraphTestUtils.createExecutionGraph(EXECUTOR_RESOURCE.getExecutor(), jobVertices);
        adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph);
    }

    /** The topology built from the execution graph mirrors its vertices and partitions. */
    @Test
    void testConstructor() {
        assertGraphEquals(executionGraph, adapter);
    }

    /** Every partition produced in the execution graph can be looked up in the topology by ID. */
    @Test
    void testGetResultPartition() {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry : vertex.getProducedPartitions().entrySet()) {
                DefaultResultPartition schedulingResultPartition = adapter.getResultPartition(entry.getKey());
                assertPartitionEquals(entry.getValue(), schedulingResultPartition);
            }
        }
    }

    /** The state reported by the topology's partition follows the execution graph's partition. */
    @Test
    void testResultPartitionStateSupplier() {
        // Fail with a readable message instead of NoSuchElementException if the fixture ever
        // stops producing partitions.
        IntermediateResultPartition intermediateResultPartition = IterableUtils.toStream(executionGraph.getAllExecutionVertices())
                .flatMap(v -> v.getProducedPartitions().values().stream())
                .findAny()
                .orElseThrow(() -> new AssertionError("Execution graph does not produce any result partition."));
        DefaultResultPartition schedulingResultPartition = adapter.getResultPartition(intermediateResultPartition.getPartitionId());

        Assertions.assertThat(schedulingResultPartition.getState()).isEqualTo(ResultPartitionState.CREATED);

        intermediateResultPartition.markDataProduced();
        Assertions.assertThat(schedulingResultPartition.getState()).isEqualTo(ResultPartitionState.CONSUMABLE);
    }

    /** Looking up an unknown vertex ID is rejected with an {@link IllegalArgumentException}. */
    @Test
    void testGetVertexOrThrow() {
        Assertions.assertThatThrownBy(() -> adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0)))
                .isInstanceOf(IllegalArgumentException.class);
    }

    /** Looking up an unknown partition ID is rejected with an {@link IllegalArgumentException}. */
    @Test
    void testResultPartitionOrThrow() {
        Assertions.assertThatThrownBy(() -> adapter.getResultPartition(new IntermediateResultPartitionID()))
                .isInstanceOf(IllegalArgumentException.class);
    }

    /** The default fixture is expected to form exactly one pipelined region. */
    @Test
    void testGetAllPipelinedRegions() {
        Iterable<DefaultSchedulingPipelinedRegion> allPipelinedRegions = adapter.getAllPipelinedRegions();
        Assertions.assertThat(allPipelinedRegions).hasSize(1);
    }

    /** The region looked up for any vertex of the default fixture contains all vertices. */
    @Test
    void testGetPipelinedRegionOfVertex() {
        for (DefaultExecutionVertex vertex : adapter.getVertices()) {
            DefaultSchedulingPipelinedRegion pipelinedRegion = adapter.getPipelinedRegionOfVertex(vertex.getId());
            assertRegionContainsAllVertices(pipelinedRegion);
        }
    }

    /**
     * Building an execution graph from two unconnected job vertices that are strictly co-located
     * is expected to fail with an {@link IllegalStateException}.
     */
    @Test
    void testErrorIfCoLocatedTasksAreNotInSameRegion() throws Exception {
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);

        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        v1.setSlotSharingGroup(slotSharingGroup);
        v2.setSlotSharingGroup(slotSharingGroup);
        v1.setStrictlyCoLocatedWith(v2);

        Assertions.assertThatThrownBy(() -> ExecutionGraphTestUtils.createExecutionGraph(EXECUTOR_RESOURCE.getExecutor(), v1, v2))
                .isInstanceOf(IllegalStateException.class);
    }

    /** Initializing job vertices one at a time grows the topology by one job vertex each time. */
    @Test
    void testUpdateTopology() throws Exception {
        JobVertex[] jobVertices = useDynamicGraph(ResultPartitionType.BLOCKING);
        ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertices[0].getID());
        ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID());

        initializeAndNotify(ejv1);
        Assertions.assertThat(adapter.getVertices()).hasSize(PARALLELISM);

        initializeAndNotify(ejv2);
        Assertions.assertThat(adapter.getVertices()).hasSize(2 * PARALLELISM);

        assertGraphEquals(executionGraph, adapter);
    }

    /**
     * Notifying the topology about a newly initialized job vertex that is connected to an already
     * known one through a pipelined result is expected to fail with an
     * {@link IllegalStateException}.
     */
    @Test
    void testErrorIfUpdateTopologyWithNewVertexPipelinedConnectedToOldOnes() throws Exception {
        JobVertex[] jobVertices = useDynamicGraph(ResultPartitionType.PIPELINED);
        ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertices[0].getID());
        ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID());

        initializeAndNotify(ejv1);

        // Only the notification itself is expected to throw, so initialization stays outside.
        executionGraph.initializeJobVertex(ejv2, 0L);
        Assertions.assertThatThrownBy(() -> adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv2)))
                .isInstanceOf(IllegalStateException.class);
    }

    /** A region that existed before a topology update is the same instance after the update. */
    @Test
    void testExistingRegionsAreNotAffectedDuringTopologyUpdate() throws Exception {
        JobVertex[] jobVertices = useDynamicGraph(ResultPartitionType.BLOCKING);
        ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertices[0].getID());
        ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID());

        initializeAndNotify(ejv1);
        DefaultSchedulingPipelinedRegion regionOld = adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0));

        initializeAndNotify(ejv2);
        DefaultSchedulingPipelinedRegion regionNew = adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0));

        Assertions.assertThat(regionNew).isSameAs(regionOld);
    }

    /**
     * Creates two no-op job vertices with parallelism {@link #PARALLELISM}, the second consuming
     * the first all-to-all.
     *
     * @param resultPartitionType type of the result connecting the two vertices
     * @return the upstream vertex at index 0 and the downstream vertex at index 1
     */
    private JobVertex[] createJobVertices(ResultPartitionType resultPartitionType) {
        JobVertex upstream = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        JobVertex downstream = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        downstream.connectNewDataSetAsInput(upstream, DistributionPattern.ALL_TO_ALL, resultPartitionType);
        return new JobVertex[] {upstream, downstream};
    }

    /** Builds a dynamic execution graph named "TestJob" from the given job vertices. */
    private DefaultExecutionGraph createDynamicGraph(JobVertex ... jobVertices) throws Exception {
        JobGraph jobGraph = new JobGraph(new JobID(), "TestJob", jobVertices);
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(jobGraph)
                .buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor());
    }

    /**
     * Replaces the default fixture with a dynamic graph over two freshly created job vertices and
     * a topology built from it.
     *
     * @param resultPartitionType type of the result connecting the two vertices
     * @return the job vertices the dynamic graph was built from
     */
    private JobVertex[] useDynamicGraph(ResultPartitionType resultPartitionType) throws Exception {
        JobVertex[] jobVertices = createJobVertices(resultPartitionType);
        executionGraph = createDynamicGraph(jobVertices);
        adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph);
        return jobVertices;
    }

    /** Initializes the given job vertex in the graph, then notifies the topology about it. */
    private void initializeAndNotify(ExecutionJobVertex jobVertex) throws Exception {
        executionGraph.initializeJobVertex(jobVertex, 0L);
        adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(jobVertex));
    }

    /** Asserts that the region holds exactly the vertices of the topology under test. */
    private void assertRegionContainsAllVertices(DefaultSchedulingPipelinedRegion pipelinedRegionOfVertex) {
        HashSet<DefaultExecutionVertex> regionVertices = Sets.newHashSet(pipelinedRegionOfVertex.getVertices());
        Assertions.assertThat(regionVertices).isEqualTo(Sets.newHashSet(adapter.getVertices()));
    }

    /**
     * Walks both vertex sequences in lockstep and asserts that IDs, consumed partitions and
     * produced partitions agree, and that both sequences have the same length.
     *
     * @param originalGraph execution graph serving as the reference
     * @param adaptedTopology topology expected to mirror {@code originalGraph}
     */
    private static void assertGraphEquals(ExecutionGraph originalGraph, DefaultExecutionTopology adaptedTopology) {
        Iterator<ExecutionVertex> originalVertices = originalGraph.getAllExecutionVertices().iterator();
        Iterator<DefaultExecutionVertex> adaptedVertices = adaptedTopology.getVertices().iterator();

        while (originalVertices.hasNext()) {
            ExecutionVertex originalVertex = originalVertices.next();

            // Report a too-short topology as an assertion failure rather than letting next()
            // throw NoSuchElementException.
            Assertions.assertThat(adaptedVertices)
                    .as("Number of adapted vertices is less than number of original vertices.")
                    .hasNext();
            DefaultExecutionVertex adaptedVertex = adaptedVertices.next();

            Assertions.assertThat(adaptedVertex.getId()).isEqualTo(originalVertex.getID());

            List<IntermediateResultPartition> originalConsumedPartitions = new ArrayList<>();
            for (ConsumedPartitionGroup consumedPartitionGroup : originalVertex.getAllConsumedPartitionGroups()) {
                for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                    originalConsumedPartitions.add(originalVertex.getExecutionGraphAccessor().getResultPartitionOrThrow(partitionId));
                }
            }
            Iterable<DefaultResultPartition> adaptedConsumedPartitions = adaptedVertex.getConsumedResults();
            assertPartitionsEquals(originalConsumedPartitions, adaptedConsumedPartitions);

            Collection<IntermediateResultPartition> originalProducedPartitions = originalVertex.getProducedPartitions().values();
            Iterable<DefaultResultPartition> adaptedProducedPartitions = adaptedVertex.getProducedResults();
            assertPartitionsEquals(originalProducedPartitions, adaptedProducedPartitions);
        }

        Assertions.assertThat(adaptedVertices)
                .as("Number of adapted vertices exceeds number of original vertices.")
                .isExhausted();
    }

    /**
     * Asserts that both collections have the same size and that every original partition has an
     * adapted counterpart with the same ID, matching attributes and at least the same consumers.
     *
     * @param originalResultPartitions partitions taken from the execution graph
     * @param adaptedResultPartitions partitions taken from the topology
     */
    private static void assertPartitionsEquals(Iterable<IntermediateResultPartition> originalResultPartitions, Iterable<DefaultResultPartition> adaptedResultPartitions) {
        Assertions.assertThat(originalResultPartitions).hasSameSizeAs(adaptedResultPartitions);

        for (IntermediateResultPartition originalPartition : originalResultPartitions) {
            DefaultResultPartition adaptedPartition = IterableUtils.toStream(adaptedResultPartitions)
                    .filter(adapted -> adapted.getId().equals(originalPartition.getPartitionId()))
                    .findAny()
                    .orElseThrow(() -> new AssertionError("Could not find matching adapted partition for " + originalPartition));

            assertPartitionEquals(originalPartition, adaptedPartition);

            List<ExecutionVertexID> originalConsumerIds = new ArrayList<>();
            for (ConsumerVertexGroup consumerVertexGroup : originalPartition.getConsumerVertexGroups()) {
                for (ExecutionVertexID executionVertexId : consumerVertexGroup) {
                    originalConsumerIds.add(executionVertexId);
                }
            }

            List<ConsumerVertexGroup> adaptedConsumers = adaptedPartition.getConsumerVertexGroups();
            Assertions.assertThat(adaptedConsumers).isNotEmpty();
            // Only membership is checked: each original consumer ID must appear in some adapted
            // consumer group.
            for (ExecutionVertexID originalId : originalConsumerIds) {
                Assertions.assertThat(adaptedConsumers.stream().flatMap(IterableUtils::toStream)).contains(originalId);
            }
        }
    }

    /**
     * Asserts that partition ID, result ID, result type and producer ID of the adapted partition
     * equal those of the original partition.
     */
    private static void assertPartitionEquals(IntermediateResultPartition originalPartition, DefaultResultPartition adaptedPartition) {
        Assertions.assertThat(adaptedPartition.getId()).isEqualTo(originalPartition.getPartitionId());
        Assertions.assertThat(adaptedPartition.getResultId()).isEqualTo(originalPartition.getIntermediateResult().getId());
        Assertions.assertThat(adaptedPartition.getResultType()).isEqualTo(originalPartition.getResultType());
        Assertions.assertThat(adaptedPartition.getProducer().getId()).isEqualTo(originalPartition.getProducer().getID());
    }
}

