/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.ReplicatingInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ReplicatingDataSourceITCase
extends MultipleProgramsTestBase {
    public ReplicatingDataSourceITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testReplicatedSourceToJoin() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator source1 = env.createInput((InputFormat)new ReplicatingInputFormat((InputFormat)new ParallelIteratorInputFormat((SplittableIterator)new NumberSequenceIterator(0L, 1000L))), (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO).map((MapFunction)new ToTuple());
        MapOperator source2 = env.generateSequence(0L, 1000L).map((MapFunction)new ToTuple());
        AggregateOperator pairs = source1.join((DataSet)source2).where(new int[]{0}).equalTo(new int[]{0}).projectFirst(new int[]{0}).sum(0);
        List result = pairs.collect();
        String expectedResult = "(500500)";
        TestBaseUtils.compareResultAsText((List)result, (String)expectedResult);
    }

    @Test
    public void testReplicatedSourceToCross() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator source1 = env.createInput((InputFormat)new ReplicatingInputFormat((InputFormat)new ParallelIteratorInputFormat((SplittableIterator)new NumberSequenceIterator(0L, 1000L))), (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO).map((MapFunction)new ToTuple());
        MapOperator source2 = env.generateSequence(0L, 1000L).map((MapFunction)new ToTuple());
        AggregateOperator pairs = source1.cross((DataSet)source2).filter((FilterFunction)new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>(){

            public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
                return ((Long)((Tuple1)value.f0).f0).equals(((Tuple1)value.f1).f0);
            }
        }).map((MapFunction)new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>(){

            public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
                return (Tuple1)value.f0;
            }
        }).sum(0);
        List result = pairs.collect();
        String expectedResult = "(500500)";
        TestBaseUtils.compareResultAsText((List)result, (String)expectedResult);
    }

    private static class ToTuple
    implements MapFunction<Long, Tuple1<Long>> {
        private ToTuple() {
        }

        public Tuple1<Long> map(Long value) throws Exception {
            return new Tuple1((Object)value);
        }
    }
}

