/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.migration;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.stream.Collectors;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TypeSerializerSnapshotMigrationITCase
extends SnapshotMigrationTestBase {
    // Element count used by the test pipeline: testSnapshot hands this value (4) to the
    // source constructor and expects the counting sink's accumulator to report the same number.
    private static final int NUM_SOURCE_ELEMENTS = 4;
    // Upper bound of the version ranges built in parameters(); in CREATE_SNAPSHOT mode the
    // parameter list is additionally filtered down to specs of exactly this version.
    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
    // Selects what testSnapshot does: CREATE_SNAPSHOT writes a snapshot under
    // src/test/resources/, VERIFY_SNAPSHOT restores from an existing snapshot resource.
    private static final SnapshotMigrationTestBase.ExecutionMode executionMode = SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT;
    // Snapshot specification of the current parameterized run; supplies the state backend
    // type, the snapshot type and (via string concatenation) the snapshot resource name.
    private final SnapshotMigrationTestBase.SnapshotSpec snapshotSpec;

    /**
     * Builds the snapshot specifications this test is parameterized over.
     *
     * <p>Specs are registered per state backend name, snapshot type and inclusive Flink
     * version range. When {@code executionMode} is {@code CREATE_SNAPSHOT}, only specs whose
     * Flink version equals {@code currentVersion} are returned.
     *
     * @return the snapshot specs to run {@link #testSnapshot()} against
     */
    @Parameterized.Parameters(name="Test snapshot: {0}")
    public static Collection<SnapshotMigrationTestBase.SnapshotSpec> parameters() {
        final Collection<SnapshotMigrationTestBase.SnapshotSpec> specs = new LinkedList<>();

        // Canonical savepoints.
        addSpecsForVersionRange(specs, "jobmanager", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL, FlinkVersion.v1_3, FlinkVersion.v1_14);
        addSpecsForVersionRange(specs, "hashmap", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL, FlinkVersion.v1_15, currentVersion);
        addSpecsForVersionRange(specs, "rocksdb", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL, FlinkVersion.v1_3, currentVersion);

        // Native savepoints.
        addSpecsForVersionRange(specs, "hashmap", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_NATIVE, FlinkVersion.v1_15, currentVersion);
        addSpecsForVersionRange(specs, "rocksdb", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_NATIVE, FlinkVersion.v1_15, currentVersion);

        // Checkpoints.
        addSpecsForVersionRange(specs, "hashmap", SnapshotMigrationTestBase.SnapshotType.CHECKPOINT, FlinkVersion.v1_15, currentVersion);
        addSpecsForVersionRange(specs, "rocksdb", SnapshotMigrationTestBase.SnapshotType.CHECKPOINT, FlinkVersion.v1_15, currentVersion);

        if (executionMode != SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            return specs;
        }
        // When creating snapshots, keep only the specs of the current version.
        return specs.stream()
                .filter(spec -> spec.getFlinkVersion().equals(currentVersion))
                .collect(Collectors.toList());
    }

    /** Appends the specs for one backend/snapshot-type pair over the version range {@code from}..{@code to}. */
    private static void addSpecsForVersionRange(
            Collection<SnapshotMigrationTestBase.SnapshotSpec> target,
            String stateBackendName,
            SnapshotMigrationTestBase.SnapshotType snapshotType,
            FlinkVersion from,
            FlinkVersion to) {
        target.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions(stateBackendName, snapshotType, FlinkVersion.rangeOf(from, to)));
    }

    /**
     * Creates the test instance for one parameterized run.
     *
     * @param snapshotSpec the snapshot specification this run operates on; stored as-is
     */
    public TypeSerializerSnapshotMigrationITCase(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) throws Exception {
        this.snapshotSpec = snapshotSpec;
    }

    /**
     * Runs the migration job for the current {@code snapshotSpec}.
     *
     * <p>The job is a pipeline of source, {@code keyBy} on tuple field 0, {@link TestMapFunction}
     * and a counting sink, executed with parallelism 1 on the state backend named by the spec.
     * Depending on {@code executionMode} the job either writes a new snapshot below
     * {@code src/test/resources/} or is restored from the existing snapshot resource. In both
     * cases the sink accumulator is expected to reach {@code NUM_SOURCE_ELEMENTS}.
     *
     * @throws UnsupportedOperationException if the spec names a state backend other than
     *     {@code rocksdb}, {@code jobmanager} or {@code hashmap}
     * @throws IllegalStateException if {@code executionMode} is neither {@code CREATE_SNAPSHOT}
     *     nor {@code VERIFY_SNAPSHOT}
     * @throws Exception if executing, snapshotting or restoring the job fails
     */
    @Test
    public void testSnapshot() throws Exception {
        final int parallelism = 1;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.noRestart());

        final String stateBackendType = this.snapshotSpec.getStateBackendType();
        switch (stateBackendType) {
            case "rocksdb": {
                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                break;
            }
            case "jobmanager": {
                env.setStateBackend(new MemoryStateBackend());
                break;
            }
            case "hashmap": {
                env.setStateBackend(new HashMapStateBackend());
                break;
            }
            default: {
                // Name the offending backend so a misconfigured spec is easy to diagnose.
                throw new UnsupportedOperationException("Unsupported state backend type: " + stateBackendType);
            }
        }

        env.enableCheckpointing(500L);
        env.setParallelism(parallelism);
        env.setMaxParallelism(parallelism);

        final MigrationTestUtils.CheckpointingNonParallelSourceWithListState nonParallelSource =
                new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);

        env.addSource(nonParallelSource)
                .keyBy(0)
                .map(new TestMapFunction())
                .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

        final String snapshotPath = this.getSnapshotPath(this.snapshotSpec);

        if (executionMode == SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            this.executeAndSnapshot(
                    env,
                    "src/test/resources/" + snapshotPath,
                    this.snapshotSpec.getSnapshotType(),
                    Tuple2.of(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
        } else if (executionMode == SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT) {
            this.restoreAndExecute(
                    env,
                    TypeSerializerSnapshotMigrationITCase.getResourceFilename(snapshotPath),
                    Tuple2.of(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
        } else {
            throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
        }
    }

    /**
     * Derives the snapshot resource name for a spec by appending the spec's string form to a
     * fixed prefix.
     *
     * @param snapshotSpec the spec whose {@code toString()} result becomes the name suffix
     * @return the prefixed snapshot name
     */
    private String getSnapshotPath(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        final String prefix = "type-serializer-snapshot-migration-itcase-";
        return prefix + snapshotSpec;
    }

    public static class TestSerializerSnapshot
    implements TypeSerializerSnapshot<Long> {
        private String configPayload;

        public TestSerializerSnapshot() {
        }

        public TestSerializerSnapshot(String configPayload) {
            this.configPayload = configPayload;
        }

        public int getCurrentVersion() {
            return 1;
        }

        public TypeSerializerSchemaCompatibility<Long> resolveSchemaCompatibility(TypeSerializer<Long> newSerializer) {
            return newSerializer instanceof TestSerializer ? TypeSerializerSchemaCompatibility.compatibleAsIs() : TypeSerializerSchemaCompatibility.incompatible();
        }

        public TypeSerializer<Long> restoreSerializer() {
            return new TestSerializer();
        }

        public void writeSnapshot(DataOutputView out) throws IOException {
            out.writeUTF(this.configPayload);
        }

        public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
            if (readVersion != 1) {
                throw new IllegalStateException("Can not recognize read version: " + readVersion);
            }
            this.configPayload = in.readUTF();
        }
    }

    private static class TestSerializer
    extends TypeSerializer<Long> {
        private static final long serialVersionUID = 1L;
        private LongSerializer serializer = new LongSerializer();
        private String configPayload = "configPayload";

        private TestSerializer() {
        }

        public TypeSerializerSnapshot<Long> snapshotConfiguration() {
            return new TestSerializerSnapshot(this.configPayload);
        }

        public TypeSerializer<Long> duplicate() {
            return this;
        }

        public void serialize(Long record, DataOutputView target) throws IOException {
            this.serializer.serialize(record, target);
        }

        public Long deserialize(Long reuse, DataInputView source) throws IOException {
            return this.serializer.deserialize(reuse, source);
        }

        public Long deserialize(DataInputView source) throws IOException {
            return this.serializer.deserialize(source);
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            this.serializer.copy(source, target);
        }

        public Long copy(Long from) {
            return this.serializer.copy(from);
        }

        public Long copy(Long from, Long reuse) {
            return this.serializer.copy(from, reuse);
        }

        public Long createInstance() {
            return this.serializer.createInstance();
        }

        public boolean isImmutableType() {
            return this.serializer.isImmutableType();
        }

        public int getLength() {
            return this.serializer.getLength();
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public boolean equals(Object obj) {
            return obj instanceof TestSerializer;
        }
    }

    private static class TestMapFunction
    extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        private TestMapFunction() {
        }

        public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
            ValueState state = this.getRuntimeContext().getState(new ValueStateDescriptor("testState", (TypeSerializer)new TestSerializer()));
            state.update(value.f1);
            return value;
        }
    }
}

