/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.flink.runtime.executiongraph.failover.SchedulingPipelinedRegionComputeUtil;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SchedulingPipelinedRegionComputeUtilTest {
    // Explicit package-private no-argument constructor; the body is intentionally empty.
    SchedulingPipelinedRegionComputeUtilTest() {
    }

    /**
     * Builds three vertices without any connection between them and asserts that the region
     * sets looked up for them are pairwise distinct instances.
     *
     * <pre>
     *     (v1)
     *
     *     (v2)
     *
     *     (v3)
     * </pre>
     */
    @Test
    void testIndividualVertices() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex first = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex second = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex third = graph.newExecutionVertex();

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        assertDistinctRegions(
                regionOf.get(first.getId()),
                regionOf.get(second.getId()),
                regionOf.get(third.getId()));
    }

    /**
     * Three independent producer/consumer pairs, each linked by one PIPELINED edge. Asserts that
     * every pair shares a region and that the three producers' regions are pairwise distinct.
     *
     * <pre>
     *     (a1) --> (b1)
     *
     *     (a2) --> (b2)
     *
     *     (a3) --> (b3)
     * </pre>
     */
    @Test
    void testEmbarrassinglyParallelCase() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex producer1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex producer2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex producer3 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex consumer1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex consumer2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex consumer3 = graph.newExecutionVertex();

        graph.connect(producer1, consumer1, ResultPartitionType.PIPELINED)
                .connect(producer2, consumer2, ResultPartitionType.PIPELINED)
                .connect(producer3, consumer3, ResultPartitionType.PIPELINED);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        // The producer regions are needed twice (pair check and distinctness check).
        final Set<SchedulingExecutionVertex> producerRegion1 = regionOf.get(producer1.getId());
        final Set<SchedulingExecutionVertex> producerRegion2 = regionOf.get(producer2.getId());
        final Set<SchedulingExecutionVertex> producerRegion3 = regionOf.get(producer3.getId());

        assertSameRegion(producerRegion1, regionOf.get(consumer1.getId()));
        assertSameRegion(producerRegion2, regionOf.get(consumer2.getId()));
        assertSameRegion(producerRegion3, regionOf.get(consumer3.getId()));

        assertDistinctRegions(producerRegion1, producerRegion2, producerRegion3);
    }

    /**
     * Two consecutive all-to-all PIPELINED exchanges (a to b, then b to c). Asserts that all six
     * vertices are mapped to the same region instance.
     *
     * <pre>
     *     (a1) -+-> (b1) -+-> (c1)
     *           X         X
     *     (a2) -+-> (b2) -+-> (c2)
     * </pre>
     */
    @Test
    void testOneComponentViaTwoExchanges() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex a1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex a2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex b1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex b2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex c1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex c2 = graph.newExecutionVertex();

        graph.connect(a1, b1, ResultPartitionType.PIPELINED)
                .connect(a1, b2, ResultPartitionType.PIPELINED)
                .connect(a2, b1, ResultPartitionType.PIPELINED)
                .connect(a2, b2, ResultPartitionType.PIPELINED)
                .connect(b1, c1, ResultPartitionType.PIPELINED)
                .connect(b1, c2, ResultPartitionType.PIPELINED)
                .connect(b2, c1, ResultPartitionType.PIPELINED)
                .connect(b2, c2, ResultPartitionType.PIPELINED);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        assertSameRegion(
                regionOf.get(a1.getId()),
                regionOf.get(a2.getId()),
                regionOf.get(b1.getId()),
                regionOf.get(b2.getId()),
                regionOf.get(c1.getId()),
                regionOf.get(c2.getId()));
    }

    /**
     * A two-level join built only from PIPELINED edges. Asserts that all seven vertices are
     * mapped to the same region instance.
     *
     * <pre>
     *     (v1)--+
     *           +--(v5)-+
     *     (v2)--+       |
     *                   +--(v7)
     *     (v3)--+       |
     *           +--(v6)-+
     *     (v4)--+
     * </pre>
     */
    @Test
    void testOneComponentViaCascadeOfJoins() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex leaf1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leaf2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leaf3 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leaf4 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex joinA = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex joinB = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex finalJoin = graph.newExecutionVertex();

        graph.connect(leaf1, joinA, ResultPartitionType.PIPELINED)
                .connect(leaf2, joinA, ResultPartitionType.PIPELINED)
                .connect(leaf3, joinB, ResultPartitionType.PIPELINED)
                .connect(leaf4, joinB, ResultPartitionType.PIPELINED)
                .connect(joinA, finalJoin, ResultPartitionType.PIPELINED)
                .connect(joinB, finalJoin, ResultPartitionType.PIPELINED);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        assertSameRegion(
                regionOf.get(leaf1.getId()),
                regionOf.get(leaf2.getId()),
                regionOf.get(leaf3.getId()),
                regionOf.get(leaf4.getId()),
                regionOf.get(joinA.getId()),
                regionOf.get(joinB.getId()),
                regionOf.get(finalJoin.getId()));
    }

    /**
     * A two-level fan-out from a single root, built only from PIPELINED edges. Asserts that all
     * seven vertices are mapped to the same region instance.
     *
     * <pre>
     *                 +--(v4)
     *         +--(v2)-+
     *         |       +--(v5)
     *     (v1)+
     *         |       +--(v6)
     *         +--(v3)-+
     *                 +--(v7)
     * </pre>
     */
    @Test
    void testOneComponentInstanceFromOneSource() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex root = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex midA = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex midB = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leafA1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leafA2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leafB1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leafB2 = graph.newExecutionVertex();

        graph.connect(root, midA, ResultPartitionType.PIPELINED)
                .connect(root, midB, ResultPartitionType.PIPELINED)
                .connect(midA, leafA1, ResultPartitionType.PIPELINED)
                .connect(midA, leafA2, ResultPartitionType.PIPELINED)
                .connect(midB, leafB1, ResultPartitionType.PIPELINED)
                .connect(midB, leafB2, ResultPartitionType.PIPELINED);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        assertSameRegion(
                regionOf.get(root.getId()),
                regionOf.get(midA.getId()),
                regionOf.get(midB.getId()),
                regionOf.get(leafA1.getId()),
                regionOf.get(leafA2.getId()),
                regionOf.get(leafB1.getId()),
                regionOf.get(leafB2.getId()));
    }

    /**
     * An all-to-all PIPELINED exchange (a to b) followed by a point-wise BLOCKING exchange
     * (b to c). Asserts that the a and b vertices share one region and that this region and the
     * regions of c1 and c2 are pairwise distinct.
     *
     * <pre>
     *     (a1) -+-> (b1) -[blocking]-> (c1)
     *           X
     *     (a2) -+-> (b2) -[blocking]-> (c2)
     * </pre>
     */
    @Test
    void testTwoComponentsViaBlockingExchange() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex a1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex a2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex b1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex b2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex c1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex c2 = graph.newExecutionVertex();

        graph.connect(a1, b1, ResultPartitionType.PIPELINED)
                .connect(a1, b2, ResultPartitionType.PIPELINED)
                .connect(a2, b1, ResultPartitionType.PIPELINED)
                .connect(a2, b2, ResultPartitionType.PIPELINED)
                .connect(b1, c1, ResultPartitionType.BLOCKING)
                .connect(b2, c2, ResultPartitionType.BLOCKING);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        final Set<SchedulingExecutionVertex> regionA1 = regionOf.get(a1.getId());
        final Set<SchedulingExecutionVertex> regionA2 = regionOf.get(a2.getId());
        final Set<SchedulingExecutionVertex> regionB1 = regionOf.get(b1.getId());
        final Set<SchedulingExecutionVertex> regionB2 = regionOf.get(b2.getId());
        final Set<SchedulingExecutionVertex> regionC1 = regionOf.get(c1.getId());
        final Set<SchedulingExecutionVertex> regionC2 = regionOf.get(c2.getId());

        assertSameRegion(regionA1, regionA2, regionB1, regionB2);
        assertDistinctRegions(regionA1, regionC1, regionC2);
    }

    /**
     * An all-to-all PIPELINED exchange (a to b) followed by an all-to-all BLOCKING exchange
     * (b to c). Asserts that the a and b vertices share one region and that this region and the
     * regions of c1 and c2 are pairwise distinct.
     *
     * <pre>
     *     (a1) -+-> (b1) -+[blocking]-> (c1)
     *           X         X
     *     (a2) -+-> (b2) -+[blocking]-> (c2)
     * </pre>
     */
    @Test
    void testTwoComponentsViaBlockingExchange2() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex a1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex a2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex b1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex b2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex c1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex c2 = graph.newExecutionVertex();

        graph.connect(a1, b1, ResultPartitionType.PIPELINED)
                .connect(a1, b2, ResultPartitionType.PIPELINED)
                .connect(a2, b1, ResultPartitionType.PIPELINED)
                .connect(a2, b2, ResultPartitionType.PIPELINED)
                .connect(b1, c1, ResultPartitionType.BLOCKING)
                .connect(b1, c2, ResultPartitionType.BLOCKING)
                .connect(b2, c1, ResultPartitionType.BLOCKING)
                .connect(b2, c2, ResultPartitionType.BLOCKING);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        final Set<SchedulingExecutionVertex> regionA1 = regionOf.get(a1.getId());
        final Set<SchedulingExecutionVertex> regionA2 = regionOf.get(a2.getId());
        final Set<SchedulingExecutionVertex> regionB1 = regionOf.get(b1.getId());
        final Set<SchedulingExecutionVertex> regionB2 = regionOf.get(b2.getId());
        final Set<SchedulingExecutionVertex> regionC1 = regionOf.get(c1.getId());
        final Set<SchedulingExecutionVertex> regionC2 = regionOf.get(c2.getId());

        assertSameRegion(regionA1, regionA2, regionB1, regionB2);
        assertDistinctRegions(regionA1, regionC1, regionC2);
    }

    /**
     * A two-level join whose first level uses PIPELINED edges and whose final join uses BLOCKING
     * edges. Asserts that {v1, v2, v5} share a region, that {v3, v4, v6} share a region, and that
     * the regions of v1, v3 and v7 are pairwise distinct.
     *
     * <pre>
     *     (v1)--+
     *           +--(v5)-+
     *     (v2)--+       |
     *              [blocking]
     *                   +--(v7)
     *              [blocking]
     *     (v3)--+       |
     *           +--(v6)-+
     *     (v4)--+
     * </pre>
     */
    @Test
    void testMultipleComponentsViaCascadeOfJoins() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex leaf1 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leaf2 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leaf3 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex leaf4 = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex joinA = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex joinB = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex finalJoin = graph.newExecutionVertex();

        graph.connect(leaf1, joinA, ResultPartitionType.PIPELINED)
                .connect(leaf2, joinA, ResultPartitionType.PIPELINED)
                .connect(leaf3, joinB, ResultPartitionType.PIPELINED)
                .connect(leaf4, joinB, ResultPartitionType.PIPELINED)
                .connect(joinA, finalJoin, ResultPartitionType.BLOCKING)
                .connect(joinB, finalJoin, ResultPartitionType.BLOCKING);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        final Set<SchedulingExecutionVertex> regionLeaf1 = regionOf.get(leaf1.getId());
        final Set<SchedulingExecutionVertex> regionLeaf2 = regionOf.get(leaf2.getId());
        final Set<SchedulingExecutionVertex> regionLeaf3 = regionOf.get(leaf3.getId());
        final Set<SchedulingExecutionVertex> regionLeaf4 = regionOf.get(leaf4.getId());
        final Set<SchedulingExecutionVertex> regionJoinA = regionOf.get(joinA.getId());
        final Set<SchedulingExecutionVertex> regionJoinB = regionOf.get(joinB.getId());
        final Set<SchedulingExecutionVertex> regionFinalJoin = regionOf.get(finalJoin.getId());

        assertSameRegion(regionLeaf1, regionLeaf2, regionJoinA);
        assertSameRegion(regionLeaf3, regionLeaf4, regionJoinB);

        assertDistinctRegions(regionLeaf1, regionLeaf3, regionFinalJoin);
    }

    /**
     * A diamond in which only the v1 to v2 edge is BLOCKING; the other three edges are
     * PIPELINED. Asserts that all four vertices are mapped to the same region instance.
     *
     * <pre>
     *          +-[blocking]-> (v2) -+
     *     (v1)-+                    +-> (v4)
     *          +------------> (v3) -+
     * </pre>
     */
    @Test
    void testDiamondWithMixedPipelinedAndBlockingExchanges() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex top = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex left = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex right = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex bottom = graph.newExecutionVertex();

        graph.connect(top, left, ResultPartitionType.BLOCKING)
                .connect(top, right, ResultPartitionType.PIPELINED)
                .connect(left, bottom, ResultPartitionType.PIPELINED)
                .connect(right, bottom, ResultPartitionType.PIPELINED);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        assertSameRegion(
                regionOf.get(top.getId()),
                regionOf.get(left.getId()),
                regionOf.get(right.getId()),
                regionOf.get(bottom.getId()));
    }

    /**
     * A diamond in which v2 is attached only through BLOCKING edges (v1 to v2 and v2 to v4) while
     * the v1 to v3 to v4 path is PIPELINED. Asserts that all four vertices are nevertheless
     * mapped to the same region instance; going by the test name, this covers the merging of
     * regions that would otherwise depend on each other cyclically.
     *
     * <pre>
     *          +-[blocking]-> (v2) -[blocking]-+
     *     (v1)-+                               +-> (v4)
     *          +------------> (v3) ------------+
     * </pre>
     */
    @Test
    void testCyclicDependentRegionsAreMerged() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex top = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex left = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex right = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex bottom = graph.newExecutionVertex();

        graph.connect(top, left, ResultPartitionType.BLOCKING)
                .connect(top, right, ResultPartitionType.PIPELINED)
                .connect(left, bottom, ResultPartitionType.BLOCKING)
                .connect(right, bottom, ResultPartitionType.PIPELINED);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        assertSameRegion(
                regionOf.get(top.getId()),
                regionOf.get(left.getId()),
                regionOf.get(right.getId()),
                regionOf.get(bottom.getId()));
    }

    /**
     * A diamond in which every edge is PIPELINED_APPROXIMATE. Asserts that the four vertices are
     * mapped to pairwise distinct region instances.
     *
     * <pre>
     *          +-> (v2) -+
     *     (v1)-+         +-> (v4)
     *          +-> (v3) -+
     * </pre>
     */
    @Test
    void testPipelinedApproximateDifferentRegions() {
        final TestingSchedulingTopology graph = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex top = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex left = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex right = graph.newExecutionVertex();
        final TestingSchedulingExecutionVertex bottom = graph.newExecutionVertex();

        graph.connect(top, left, ResultPartitionType.PIPELINED_APPROXIMATE)
                .connect(top, right, ResultPartitionType.PIPELINED_APPROXIMATE)
                .connect(left, bottom, ResultPartitionType.PIPELINED_APPROXIMATE)
                .connect(right, bottom, ResultPartitionType.PIPELINED_APPROXIMATE);

        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> regionOf =
                computePipelinedRegionByVertex(graph);

        assertDistinctRegions(
                regionOf.get(top.getId()),
                regionOf.get(left.getId()),
                regionOf.get(right.getId()),
                regionOf.get(bottom.getId()));
    }

    /**
     * Computes the pipelined regions of the given topology and indexes them by vertex id.
     *
     * <p>The topology's vertices, its vertex lookup and its result-partition lookup are handed to
     * {@link SchedulingPipelinedRegionComputeUtil#computePipelinedRegions}; the resulting regions
     * are then turned into a per-vertex lookup by the {@code Set}-based overload of this method.
     *
     * @param topology the testing topology whose regions should be computed
     * @return a map from each vertex id to the region set containing that vertex
     */
    private static Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> computePipelinedRegionByVertex(TestingSchedulingTopology topology) {
        // Fully parameterized instead of a raw Set so the call below is type-checked.
        final Set<Set<SchedulingExecutionVertex>> regions =
                SchedulingPipelinedRegionComputeUtil.computePipelinedRegions(
                        topology.getVertices(), topology::getVertex, topology::getResultPartition);
        return SchedulingPipelinedRegionComputeUtilTest.computePipelinedRegionByVertex(regions);
    }

    /**
     * Builds a lookup from vertex id to the region that contains the vertex.
     *
     * <p>The map values are the very {@code Set} instances passed in (no copies are made), so
     * callers can compare regions by identity. If a vertex id occurs in more than one region,
     * the region iterated last wins.
     *
     * @param regions the regions to index
     * @return a map from each contained vertex's id to its region set
     */
    private static Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> computePipelinedRegionByVertex(Set<Set<SchedulingExecutionVertex>> regions) {
        final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> index = new HashMap<>();
        regions.forEach(
                members ->
                        members.forEach(
                                member -> index.put((ExecutionVertexID) member.getId(), members)));
        return index;
    }

    /**
     * Asserts that all given regions are one and the same {@code Set} instance.
     *
     * <p>Every unordered pair of arguments is compared by identity. Zero or one argument is
     * trivially accepted.
     *
     * @param regions the region sets expected to be identical; the array itself must not be null
     */
    @SafeVarargs
    private static void assertSameRegion(Set<SchedulingExecutionVertex> ... regions) {
        Preconditions.checkNotNull(regions);
        for (int i = 0; i < regions.length; ++i) {
            // The inner loop must be bounded by and advance j (not i); otherwise j stays at
            // i + 1 and a single-element call reads past the end of the array.
            for (int j = i + 1; j < regions.length; ++j) {
                Assertions.assertThat(regions[j]).isSameAs(regions[i]);
            }
        }
    }

    /**
     * Asserts that no two of the given regions are the same {@code Set} instance.
     *
     * <p>Every unordered pair of arguments is compared by identity, not by equality.
     *
     * @param regions the region sets expected to be pairwise distinct; the array itself must not
     *     be null
     */
    @SafeVarargs
    private static void assertDistinctRegions(Set<SchedulingExecutionVertex> ... regions) {
        Preconditions.checkNotNull(regions);
        final int count = regions.length;
        for (int earlier = 0; earlier < count; earlier++) {
            final Set<SchedulingExecutionVertex> reference = regions[earlier];
            for (int later = earlier + 1; later < count; later++) {
                Assertions.assertThat(regions[later]).isNotSameAs(reference);
            }
        }
    }
}

