/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Stream;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={FailsWithAdaptiveScheduler.class})
public class UnalignedCheckpointITCase
extends UnalignedCheckpointTestBase {
    private final UnalignedCheckpointTestBase.UnalignedSettings settings;

    /**
     * Builds the argument rows handed to the parameterized constructor.
     *
     * <p>An entry of {@code runs} may leave out trailing values; the gaps are filled from
     * {@code defaults}, whose positions are topology, parallelism, channel type and timeout.
     * Each completed entry is then emitted twice in a row: first with {@code 0} and then with
     * {@code 1} appended as the fifth (buffersPerChannel) argument.
     *
     * @return two consecutive five-element rows per entry of {@code runs}, in declaration order
     */
    @Parameterized.Parameters(name="{0} with {2} channels, p = {1}, timeout = {3}, buffersPerChannel = {4}")
    public static Object[][] parameters() {
        Object[] defaults = new Object[]{Topology.PIPELINE, 1, UnalignedCheckpointTestBase.ChannelType.MIXED, 0};
        Object[][] runs = new Object[][]{
            {Topology.PIPELINE, 1, UnalignedCheckpointTestBase.ChannelType.LOCAL},
            {Topology.PIPELINE, 1, UnalignedCheckpointTestBase.ChannelType.REMOTE},
            {Topology.PIPELINE, 5, UnalignedCheckpointTestBase.ChannelType.LOCAL},
            {Topology.PIPELINE, 5, UnalignedCheckpointTestBase.ChannelType.REMOTE},
            {Topology.PIPELINE, 20},
            {Topology.PIPELINE, 20, UnalignedCheckpointTestBase.ChannelType.MIXED, 1},
            {Topology.PIPELINE, 20, UnalignedCheckpointTestBase.ChannelType.MIXED, 5},
            {Topology.MULTI_INPUT, 5},
            {Topology.MULTI_INPUT, 10},
            {Topology.UNION, 5},
            {Topology.UNION, 10}
        };

        // Two output rows per run: one for each buffersPerChannel value.
        Object[][] expanded = new Object[runs.length * 2][];
        int next = 0;
        for (Object[] run : runs) {
            Object[] completed = addDefaults(run, defaults);
            expanded[next++] = ArrayUtils.add(completed, (Object) 0);
            expanded[next++] = ArrayUtils.add(completed, (Object) 1);
        }
        return expanded;
    }

    /**
     * Completes a partially specified parameter row with default values.
     *
     * @param params the leading values that were given explicitly
     * @param defaults the full-length row of fallback values
     * @return a new array holding {@code params} followed by the entries of {@code defaults}
     *     from index {@code params.length} onwards
     */
    private static Object[] addDefaults(Object[] params, Object[] defaults) {
        Object[] missingTail = ArrayUtils.subarray(defaults, params.length, defaults.length);
        return ArrayUtils.addAll(params, missingTail);
    }

    /**
     * Creates one parameterized test instance and derives its settings.
     *
     * <p>Besides the values taken from the parameters, every instance uses 5 expected failures,
     * 1 failure after the source finishes, a 30 second checkpoint timeout and 3 tolerable
     * checkpoint failures.
     *
     * @param topology the job graph shape to build
     * @param parallelism the parallelism passed to the settings
     * @param channelType the channel type passed to {@code setChannelTypes}
     * @param timeout the value passed to {@code setAlignmentTimeout}
     * @param buffersPerChannel the value passed to {@code setBuffersPerChannel}
     */
    public UnalignedCheckpointITCase(Topology topology, int parallelism, UnalignedCheckpointTestBase.ChannelType channelType, int timeout, int buffersPerChannel) {
        this.settings = new UnalignedCheckpointTestBase.UnalignedSettings(topology)
                .setParallelism(parallelism)
                .setChannelTypes(channelType)
                .setExpectedFailures(5)
                .setFailuresAfterSourceFinishes(1)
                .setCheckpointTimeout(Duration.ofSeconds(30L))
                .setTolerableCheckpointFailures(3)
                .setAlignmentTimeout(timeout)
                .setBuffersPerChannel(buffersPerChannel);
    }

    /**
     * Runs the inherited test driver with the settings built for this parameter combination.
     *
     * @throws Exception if the inherited {@code execute(settings)} call fails
     */
    @Test
    public void execute() throws Exception {
        execute(settings);
    }

    /**
     * Checks the accumulators reported by the finished job.
     *
     * <p>The {@code outOfOrder}, {@code duplicates} and {@code lost} accumulators must all be
     * zero, and {@code failures} must equal the number of failures configured in the settings.
     *
     * @param result the execution result whose accumulators are inspected
     */
    @Override
    protected void checkCounters(JobExecutionResult result) {
        collector.checkThat(
                "NUM_OUT_OF_ORDER",
                result.getAccumulatorResult("outOfOrder"),
                Matchers.equalTo((Object)0L));
        collector.checkThat(
                "NUM_DUPLICATES",
                result.getAccumulatorResult("duplicates"),
                Matchers.equalTo((Object)0L));
        collector.checkThat(
                "NUM_LOST",
                result.getAccumulatorResult("lost"),
                Matchers.equalTo((Object)0L));
        collector.checkThat(
                "NUM_FAILURES",
                result.getAccumulatorResult("failures"),
                Matchers.equalTo((Object)settings.expectedFailures));
    }

    /**
     * Appends the shared tail of every topology: a failing mapper followed by the verifying sink.
     *
     * <p>The input is custom-partitioned with a {@code ShiftingPartitioner}, mapped by a
     * {@code FailingMapper} ("failing-map"), custom-partitioned again with a
     * {@code ChunkDistributingPartitioner} and written to a {@link StrictOrderVerifyingSink}
     * ("sink"). Both partitioners use the record itself as the key.
     *
     * <p>The four predicates given to {@code FailingMapper} are, in argument order:
     * <ol>
     *   <li>run 0 with at least {@code minCheckpoints / 4} completed checkpoints, or run 2 with
     *       at least {@code minCheckpoints * 3 / 4};
     *   <li>run 1 with at least {@code minCheckpoints / 2} completed checkpoints;
     *   <li>run 3;
     *   <li>run 4.
     * </ol>
     * Which failure point each predicate controls is decided by {@code FailingMapper}, which is
     * not visible in this file.
     *
     * @param minCheckpoints checkpoint count used in the failure predicates and passed to the sink
     * @param slotSharing if {@code true} both operators use slot sharing group "default",
     *     otherwise each uses a group named after itself
     * @param combinedSource the stream to attach the pipeline to
     * @return the sink that terminates the pipeline
     */
    private static DataStreamSink<Long> addFailingPipeline(long minCheckpoints, boolean slotSharing, DataStream<Long> combinedSource) {
        return combinedSource
                .partitionCustom((Partitioner)new UnalignedCheckpointTestBase.ShiftingPartitioner(), (KeySelector & Serializable)l -> l)
                .map((MapFunction)new UnalignedCheckpointTestBase.FailingMapper(
                        (FilterFunction<UnalignedCheckpointTestBase.FailingMapper.FailingMapperState>)(FilterFunction & Serializable)state -> state.completedCheckpoints >= minCheckpoints / 4L && state.runNumber == 0L || state.completedCheckpoints >= minCheckpoints * 3L / 4L && state.runNumber == 2L,
                        (FilterFunction<UnalignedCheckpointTestBase.FailingMapper.FailingMapperState>)(FilterFunction & Serializable)state -> state.completedCheckpoints >= minCheckpoints / 2L && state.runNumber == 1L,
                        (FilterFunction<UnalignedCheckpointTestBase.FailingMapper.FailingMapperState>)(FilterFunction & Serializable)state -> state.runNumber == 3L,
                        (FilterFunction<UnalignedCheckpointTestBase.FailingMapper.FailingMapperState>)(FilterFunction & Serializable)state -> state.runNumber == 4L))
                .name("failing-map")
                .uid("failing-map")
                .slotSharingGroup(slotSharing ? "default" : "failing-map")
                .partitionCustom((Partitioner)new UnalignedCheckpointTestBase.ChunkDistributingPartitioner(), (KeySelector & Serializable)l -> l)
                .addSink((SinkFunction)new StrictOrderVerifyingSink(minCheckpoints, combinedSource.getExecutionEnvironment().getCheckpointInterval()))
                .name("sink")
                .uid("sink")
                .slotSharingGroup(slotSharing ? "default" : "sink");
    }

    private static class SourceAwareMinEmittingFunction
    extends RichFlatMapFunction<Tuple2<Integer, Long>, Long>
    implements CheckpointedFunction {
        private final int numSources;
        private State state;
        private ListState<State> stateList;

        public SourceAwareMinEmittingFunction(int numSources) {
            this.numSources = numSources;
        }

        public void flatMap(Tuple2<Integer, Long> sourceValue, Collector<Long> out) throws Exception {
            int source = (Integer)sourceValue.f0;
            long value = UnalignedCheckpointTestBase.withoutHeader((Long)sourceValue.f1);
            int partition = (int)(value % (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
            ((State)this.state).lastValues[source][partition] = value;
            for (int index = 0; index < this.numSources; ++index) {
                if (this.state.lastValues[index][partition] >= value) continue;
                return;
            }
            out.collect(sourceValue.f1);
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.stateList.clear();
            this.stateList.add((Object)this.state);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.stateList = context.getOperatorStateStore().getListState(new ListStateDescriptor("state", State.class));
            this.state = (State)Iterables.getOnlyElement((Iterable)((Iterable)this.stateList.get()), (Object)new State(this.numSources, this.getRuntimeContext().getNumberOfParallelSubtasks()));
        }

        private static class State {
            private final long[][] lastValues;

            public State(int numSources, int numberOfParallelSubtasks) {
                for (long[] lastValue : this.lastValues = new long[numSources][numberOfParallelSubtasks]) {
                    Arrays.fill(lastValue, Long.MIN_VALUE);
                }
            }
        }
    }

    private static class KeyedIdentityFunction
    extends KeyedProcessFunction<Long, Long, Long> {
        ValueState<Long> state;

        private KeyedIdentityFunction() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.state = this.getRuntimeContext().getState(new ValueStateDescriptor("keyedState", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO));
        }

        public void processElement(Long value, KeyedProcessFunction.Context ctx, Collector<Long> out) {
            UnalignedCheckpointTestBase.checkHeader(value);
            out.collect((Object)value);
        }
    }

    /**
     * Sink that checks, per partition, that records arrive strictly increasing and without gaps.
     *
     * <p>The partition of a record is its header-stripped value modulo the number of tracked
     * partitions. Compared with the last record of the same partition, a smaller value counts as
     * out of order, an equal value as a duplicate, and a larger value that is not exactly
     * {@code last + parallelism * parallelism} as a lost value. Only the first occurrence of each
     * anomaly kind is logged per sink instance.
     */
    protected static class StrictOrderVerifyingSink
    extends UnalignedCheckpointTestBase.VerifyingSinkBase<StrictOrderVerifyingSink.State> {
        // Each flag is cleared after the first log line of its kind to keep the log short.
        private boolean firstOutOfOrder = true;
        private boolean firstDuplicate = true;
        private boolean firstLostValue = true;

        /**
         * @param minCheckpoints forwarded to the base sink
         * @param checkpointingInterval forwarded to the base sink
         */
        protected StrictOrderVerifyingSink(long minCheckpoints, long checkpointingInterval) {
            super(minCheckpoints, checkpointingInterval);
        }

        /** Creates empty state with one slot per parallel subtask of this sink. */
        @Override
        protected State createState() {
            return new State(getRuntimeContext().getNumberOfParallelSubtasks());
        }

        /** Delegates to the base class without additional work. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            super.initializeState(context);
        }

        /**
         * Classifies the record against the last one of its partition, updates the counters,
         * remembers it as the new last record and finally calls {@code induceBackpressure()}.
         *
         * @param value the incoming record, still carrying its header
         * @param context the sink context (unused)
         */
        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            final long record = UnalignedCheckpointTestBase.withoutHeader(value);
            final State current = this.state;
            final int parallelism = current.lastRecordInPartitions.length;
            final int partition = (int)(record % (long)parallelism);
            final long lastRecord = current.lastRecordInPartitions[partition];
            if (record < lastRecord) {
                ++current.numOutOfOrderness;
                if (this.firstOutOfOrder) {
                    UnalignedCheckpointTestBase.LOG.info("Out of order records current={} and last={} @ {} subtask ({} attempt)", new Object[]{record, lastRecord, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber()});
                    this.firstOutOfOrder = false;
                }
            } else if (record == lastRecord) {
                ++current.numDuplicates;
                if (this.firstDuplicate) {
                    UnalignedCheckpointTestBase.LOG.info("Duplicate record {} @ {} subtask ({} attempt)", new Object[]{record, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber()});
                    this.firstDuplicate = false;
                }
            } else if (lastRecord != -1L) {
                // -1 means no record has been seen for this partition yet, so there is no gap to
                // check. Widen before multiplying so the stride cannot overflow int.
                long expectedValue = lastRecord + (long)parallelism * parallelism;
                if (record != expectedValue) {
                    ++current.numLostValues;
                    if (this.firstLostValue) {
                        UnalignedCheckpointTestBase.LOG.info("Lost records current={}, expected={}, and last={} @ {} subtask ({} attempt)", new Object[]{record, expectedValue, lastRecord, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber()});
                        this.firstLostValue = false;
                    }
                }
            }
            current.lastRecordInPartitions[partition] = record;
            ++current.numOutput;
            induceBackpressure();
        }

        /** Sink state extended with the last record seen in each partition ({@code -1} if none). */
        static class State
        extends UnalignedCheckpointTestBase.VerifyingSinkStateBase {
            private final long[] lastRecordInPartitions;

            private State(int numberOfParallelSubtasks) {
                this.lastRecordInPartitions = new long[numberOfParallelSubtasks];
                Arrays.fill(this.lastRecordInPartitions, -1L);
            }
        }
    }

    /**
     * Job graph shapes exercised by this test. Each constant builds its sources and intermediate
     * operators and then attaches the shared failing pipeline via
     * {@link UnalignedCheckpointITCase#addFailingPipeline}.
     *
     * <p>In every {@code create} implementation, {@code slotSharing == true} puts each operator
     * that sets a slot sharing group into the group "default"; otherwise each such operator gets
     * a group named after itself.
     */
    static enum Topology implements UnalignedCheckpointTestBase.DagCreator
    {
        /** A single source, a header-checking "forward" map and a keyed identity function. */
        PIPELINE{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                int parallelism = env.getParallelism();
                SingleOutputStreamOperator stream = env.fromSource((Source)new UnalignedCheckpointTestBase.LongSource(minCheckpoints, parallelism, expectedRestarts, env.getCheckpointInterval(), sourceSleepMs), WatermarkStrategy.noWatermarks(), "source")
                        .slotSharingGroup(slotSharing ? "default" : "source")
                        .disableChaining()
                        .map((MapFunction & Serializable)i -> UnalignedCheckpointTestBase.checkHeader(i))
                        .name("forward")
                        .uid("forward")
                        .slotSharingGroup(slotSharing ? "default" : "forward")
                        // Key is (value % parallelism) * parallelism: % and * bind left to right.
                        .keyBy((KeySelector & Serializable)i -> UnalignedCheckpointTestBase.withoutHeader(i) % (long)parallelism * (long)parallelism)
                        .process((KeyedProcessFunction)new KeyedIdentityFunction())
                        .name("keyed")
                        .uid("keyed");
                UnalignedCheckpointITCase.addFailingPipeline(minCheckpoints, slotSharing, (DataStream<Long>)((DataStream)stream));
            }
        }
        ,
        /**
         * Three sources combined pairwise: the running result is connected with the next source
         * and passed through a {@code MinEmittingFunction} named "min1", then "min2".
         */
        MULTI_INPUT{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                int parallelism = env.getParallelism();
                SingleOutputStreamOperator combinedSource = null;
                for (int inputIndex = 0; inputIndex < 3; ++inputIndex) {
                    SingleOutputStreamOperator source = env.fromSource((Source)new UnalignedCheckpointTestBase.LongSource(minCheckpoints, parallelism, expectedRestarts, env.getCheckpointInterval(), sourceSleepMs), WatermarkStrategy.noWatermarks(), "source" + inputIndex)
                            .slotSharingGroup(slotSharing ? "default" : "source" + inputIndex)
                            .disableChaining();
                    // The first source starts the chain; later ones are merged into it.
                    combinedSource = combinedSource == null
                            ? source
                            : combinedSource.connect((DataStream)source)
                                    .flatMap((CoFlatMapFunction)new UnalignedCheckpointTestBase.MinEmittingFunction())
                                    .name("min" + inputIndex)
                                    .uid("min" + inputIndex)
                                    .slotSharingGroup(slotSharing ? "default" : "min" + inputIndex);
                }
                UnalignedCheckpointITCase.addFailingPipeline(minCheckpoints, slotSharing, (DataStream<Long>)combinedSource);
            }
        }
        ,
        /**
         * Three sources, each tagged with its index, unioned into one stream and then reduced by a
         * single {@link SourceAwareMinEmittingFunction} named "min".
         */
        UNION{

            @Override
            public void create(StreamExecutionEnvironment env, int minCheckpoints, boolean slotSharing, int expectedRestarts, long sourceSleepMs) {
                int parallelism = env.getParallelism();
                SingleOutputStreamOperator combinedSource = null;
                for (int inputIndex = 0; inputIndex < 3; ++inputIndex) {
                    // Effectively final copy for capture by the lambda below.
                    int finalInputIndex = inputIndex;
                    SingleOutputStreamOperator source = env.fromSource((Source)new UnalignedCheckpointTestBase.LongSource(minCheckpoints, parallelism, expectedRestarts, env.getCheckpointInterval(), sourceSleepMs), WatermarkStrategy.noWatermarks(), "source" + inputIndex)
                            .slotSharingGroup(slotSharing ? "default" : "source" + inputIndex)
                            .map((MapFunction & Serializable)i -> new Tuple2((Object)finalInputIndex, (Object)UnalignedCheckpointTestBase.checkHeader(i)))
                            .returns(TypeInformation.of((TypeHint)new TypeHint<Tuple2<Integer, Long>>(){}))
                            .slotSharingGroup(slotSharing ? "default" : "source" + inputIndex)
                            .disableChaining();
                    combinedSource = combinedSource == null ? source : combinedSource.union(new DataStream[]{source});
                }
                SingleOutputStreamOperator deduplicated = combinedSource
                        .flatMap((FlatMapFunction)new SourceAwareMinEmittingFunction(3))
                        .name("min")
                        .uid("min")
                        .slotSharingGroup(slotSharing ? "default" : "min");
                UnalignedCheckpointITCase.addFailingPipeline(minCheckpoints, slotSharing, (DataStream<Long>)((DataStream)deduplicated));
            }
        };


        /**
         * Returns the constant name in lower case. The conversion uses {@link Locale#ROOT} so the
         * result does not depend on the JVM's default locale.
         */
        @Override
        public String toString() {
            return this.name().toLowerCase(Locale.ROOT);
        }
    }
}

