/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.partitioner;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitionerTest;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class RescalePartitionerTest
extends StreamPartitionerTest {
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    @Override
    public StreamPartitioner<Tuple> createPartitioner() {
        RescalePartitioner partitioner = new RescalePartitioner();
        Assert.assertFalse((boolean)partitioner.isBroadcast());
        return partitioner;
    }

    @Test
    public void testSelectChannelsInterval() {
        this.streamPartitioner.setup(3);
        this.assertSelectedChannel(0);
        this.assertSelectedChannel(1);
        this.assertSelectedChannel(2);
        this.assertSelectedChannel(0);
    }

    @Test
    public void testExecutionGraphGeneration() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource text = env.addSource((SourceFunction)new ParallelSourceFunction<String>(){
            private static final long serialVersionUID = 7772338606389180774L;

            public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            }

            public void cancel() {
            }
        }).setParallelism(2);
        SingleOutputStreamOperator counts = text.rescale().flatMap((FlatMapFunction)new FlatMapFunction<String, Tuple2<String, Integer>>(){
            private static final long serialVersionUID = -5255930322161596829L;

            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            }
        });
        counts.rescale().print().setParallelism(2);
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        List jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)jobVertices.get(0);
        JobVertex mapVertex = (JobVertex)jobVertices.get(1);
        JobVertex sinkVertex = (JobVertex)jobVertices.get(2);
        Assert.assertEquals((long)2L, (long)sourceVertex.getParallelism());
        Assert.assertEquals((long)4L, (long)mapVertex.getParallelism());
        Assert.assertEquals((long)2L, (long)sinkVertex.getParallelism());
        DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore((JobGraph)jobGraph)).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        try {
            eg.attachJobGraph(jobVertices);
        }
        catch (JobException e) {
            e.printStackTrace();
            Assert.fail((String)("Building ExecutionGraph failed: " + e.getMessage()));
        }
        ExecutionJobVertex execSourceVertex = eg.getJobVertex(sourceVertex.getID());
        ExecutionJobVertex execMapVertex = eg.getJobVertex(mapVertex.getID());
        ExecutionJobVertex execSinkVertex = eg.getJobVertex(sinkVertex.getID());
        Assert.assertEquals((long)0L, (long)execSourceVertex.getInputs().size());
        Assert.assertEquals((long)1L, (long)execMapVertex.getInputs().size());
        Assert.assertEquals((long)4L, (long)execMapVertex.getParallelism());
        ExecutionVertex[] mapTaskVertices = execMapVertex.getTaskVertices();
        HashMap<Integer, Integer> mapInputPartitionCounts = new HashMap<Integer, Integer>();
        for (ExecutionVertex mapTaskVertex : mapTaskVertices) {
            Assert.assertEquals((long)1L, (long)mapTaskVertex.getNumberOfInputs());
            Assert.assertEquals((long)1L, (long)mapTaskVertex.getConsumedPartitionGroup(0).size());
            IntermediateResultPartitionID consumedPartitionId = mapTaskVertex.getConsumedPartitionGroup(0).getFirst();
            Assert.assertEquals((Object)sourceVertex.getID(), (Object)mapTaskVertex.getExecutionGraphAccessor().getResultPartitionOrThrow(consumedPartitionId).getProducer().getJobvertexId());
            int inputPartition = consumedPartitionId.getPartitionNumber();
            if (!mapInputPartitionCounts.containsKey(inputPartition)) {
                mapInputPartitionCounts.put(inputPartition, 1);
                continue;
            }
            mapInputPartitionCounts.put(inputPartition, (Integer)mapInputPartitionCounts.get(inputPartition) + 1);
        }
        Assert.assertEquals((long)2L, (long)mapInputPartitionCounts.size());
        Iterator iterator = mapInputPartitionCounts.values().iterator();
        while (iterator.hasNext()) {
            int count = (Integer)iterator.next();
            Assert.assertEquals((long)2L, (long)count);
        }
        Assert.assertEquals((long)1L, (long)execSinkVertex.getInputs().size());
        Assert.assertEquals((long)2L, (long)execSinkVertex.getParallelism());
        ExecutionVertex[] sinkTaskVertices = execSinkVertex.getTaskVertices();
        InternalExecutionGraphAccessor executionGraphAccessor = execSinkVertex.getGraph();
        HashSet<Integer> mapSubpartitions = new HashSet<Integer>();
        for (ExecutionVertex sinkTaskVertex : sinkTaskVertices) {
            Assert.assertEquals((long)1L, (long)sinkTaskVertex.getNumberOfInputs());
            Assert.assertEquals((long)2L, (long)sinkTaskVertex.getConsumedPartitionGroup(0).size());
            for (IntermediateResultPartitionID consumedPartitionId : sinkTaskVertex.getConsumedPartitionGroup(0)) {
                IntermediateResultPartition consumedPartition = executionGraphAccessor.getResultPartitionOrThrow(consumedPartitionId);
                Assert.assertEquals((Object)mapVertex.getID(), (Object)consumedPartition.getProducer().getJobvertexId());
                int partitionNumber = consumedPartition.getPartitionNumber();
                Assert.assertFalse((boolean)mapSubpartitions.contains(partitionNumber));
                mapSubpartitions.add(partitionNumber);
            }
        }
        Assert.assertEquals((long)4L, (long)mapSubpartitions.size());
    }
}

