/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FiniteTestSource;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.SerializableRowData;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class FileStoreITCase
extends AbstractTestBase {
    /** Schema of every test table: {@code v INT}, {@code p VARCHAR(10)}, {@code _k INT}. */
    public static final RowType TABLE_TYPE =
            new RowType(
                    Arrays.asList(
                            new RowType.RowField("v", new IntType()),
                            new RowType.RowField("p", new VarCharType(10)),
                            new RowType.RowField("_k", new IntType())));

    /** Object identifier {@code catalog.db.t}. */
    public static final ObjectIdentifier IDENTIFIER = ObjectIdentifier.of("catalog", "db", "t");

    /**
     * Converter between internal {@link RowData} and external {@link Row} for {@link #TABLE_TYPE}.
     * The raw cast narrows the factory's converter to the row types used by this test.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static final DataStructureConverter<RowData, Row> CONVERTER =
            (DataStructureConverter)
                    DataStructureConverters.getConverter(
                            TypeConversions.fromLogicalToDataType(TABLE_TYPE));

    /** Bucket count (3); the same value is configured for {@code CoreOptions.BUCKET}. */
    private static final int NUM_BUCKET = 3;

    /**
     * Seven input rows in {@code (v, p, _k)} order. Several rows share the same {@code (p, _k)}
     * or {@code _k} values, so tables with primary keys end up with fewer rows than were written.
     */
    public static final List<RowData> SOURCE_DATA =
            Arrays.asList(
                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
                    wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
                    wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
                    wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
                    wrap(GenericRowData.of(5, StringData.fromString("p2"), 1)));

    /** Whether this instance runs against the batch environment. */
    private final boolean isBatch;

    /** Execution environment chosen in the constructor according to {@link #isBatch}. */
    private final StreamExecutionEnvironment env;

    /**
     * Creates the test instance for one execution mode and builds the matching environment.
     *
     * @param isBatch {@code true} to use {@link #buildBatchEnv()}, {@code false} to use {@link
     *     #buildStreamEnv()}
     */
    public FileStoreITCase(boolean isBatch) {
        this.isBatch = isBatch;
        if (isBatch) {
            this.env = buildBatchEnv();
        } else {
            this.env = buildStreamEnv();
        }
    }

    /**
     * Parameter values for the parameterized runs, labelled {@code isBatch-{0}}.
     *
     * @return {@code [true, false]}
     */
    @Parameters(name = "isBatch-{0}")
    public static List<Boolean> getVarSeg() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * Wraps a row in a {@link SerializableRowData} together with a serializer created for {@link
     * #TABLE_TYPE}.
     *
     * @param row the row to wrap
     * @return the wrapped row
     */
    private static SerializableRowData wrap(RowData row) {
        TypeSerializer<RowData> serializer = InternalSerializers.create(TABLE_TYPE);
        return new SerializableRowData(row, serializer);
    }

    /**
     * Writes the test data through the {@link Row}-based sink API and reads it back through the
     * {@link Row}-based source API. The table is partitioned by {@code p} with primary key {@code
     * (p, _k)}; the expected result is the last row of {@link #SOURCE_DATA} for each key.
     */
    @TestTemplate
    public void testRowSourceSink() throws Exception {
        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});

        // Convert internal RowData to external Row. The explicit MapFunction<RowData, Row> target
        // type gives the lambda parameter its RowData type.
        DataStreamSource<RowData> source = buildTestSource(env, isBatch);
        SingleOutputStreamOperator<Row> input =
                source.map(
                                (MapFunction<RowData, Row>)
                                        r ->
                                                Row.of(
                                                        r.getInt(0),
                                                        r.getString(1).toString(),
                                                        r.getInt(2)))
                        .setParallelism(source.getParallelism());
        DataType inputType =
                DataTypes.ROW(
                        DataTypes.FIELD("v", DataTypes.INT()),
                        DataTypes.FIELD("p", DataTypes.STRING()),
                        DataTypes.FIELD("_k", DataTypes.INT()));

        // Sink job.
        new FlinkSinkBuilder(table).forRow(input, inputType).build();
        env.execute();

        // Bounded read of the written table.
        List<Row> results =
                executeAndCollectRow(
                        new FlinkSourceBuilder(table).env(env).sourceBounded(true).buildForRow());

        Row[] expected = {
            Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)
        };
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);
    }

    /**
     * Writes {@link #SOURCE_DATA} into a table partitioned by {@code p} with primary key {@code
     * (p, _k)} and expects a bounded read to return the last written row for each key.
     */
    @TestTemplate
    public void testPartitioned() throws Exception {
        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});

        // Sink job.
        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build();
        env.execute();

        // Bounded read of the written table.
        List<Row> results =
                executeAndCollect(
                        new FlinkSourceBuilder(table).sourceBounded(true).env(env).build());

        Row[] expected = {
            Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)
        };
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);
    }

    /**
     * Writes {@link #SOURCE_DATA} into an unpartitioned table with primary key {@code _k} and
     * expects a bounded read to return the last written row for each {@code _k} value.
     */
    @TestTemplate
    public void testNonPartitioned() throws Exception {
        FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});

        // Sink job.
        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build();
        env.execute();

        // Bounded read of the written table.
        List<Row> results =
                executeAndCollect(
                        new FlinkSourceBuilder(table).sourceBounded(true).env(env).build());

        Row[] expected = {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);
    }

    /**
     * Exercises three overwrite variants on a table partitioned by {@code p} with primary key
     * {@code (p, _k)}. Runs only in batch mode.
     *
     * <ol>
     *   <li>Overwrite with the partition spec {@code p=p2}: only the {@code p2} rows are replaced.
     *   <li>Overwrite without a spec: again only the partition of the written row ({@code p2}) is
     *       replaced.
     *   <li>Overwrite without a spec on a copy of the table with {@code
     *       CoreOptions.DYNAMIC_PARTITION_OVERWRITE} set to {@code "false"}: the written row is the
     *       only one left.
     * </ol>
     */
    @TestTemplate
    public void testOverwrite() throws Exception {
        Assumptions.assumeTrue(isBatch);

        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});

        // Initial load.
        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build();
        env.execute();

        // 1. Overwrite restricted to partition p=p2.
        DataStreamSource<RowData> partialData =
                env.fromCollection(
                        Collections.singletonList(
                                wrap(GenericRowData.of(9, StringData.fromString("p2"), 5))),
                        InternalTypeInfo.of(TABLE_TYPE));
        HashMap<String, String> overwrite = new HashMap<>();
        overwrite.put("p", "p2");
        new FlinkSinkBuilder(table).forRowData(partialData).overwrite(overwrite).build();
        env.execute();

        List<Row> results =
                executeAndCollect(
                        new FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
        Row[] expected = {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);

        // 2. Overwrite without a partition spec.
        partialData =
                env.fromCollection(
                        Collections.singletonList(
                                wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
                        InternalTypeInfo.of(TABLE_TYPE));
        new FlinkSinkBuilder(table).forRowData(partialData).overwrite().build();
        env.execute();

        results =
                executeAndCollect(
                        new FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
        expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);

        // 3. Overwrite without a spec, with dynamic partition overwrite switched off on the
        //    table copy used for writing.
        partialData =
                env.fromCollection(
                        Collections.singletonList(
                                wrap(GenericRowData.of(20, StringData.fromString("p2"), 3))),
                        InternalTypeInfo.of(TABLE_TYPE));
        FileStoreTable staticOverwriteTable =
                table.copy(
                        Collections.singletonMap(
                                CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
        new FlinkSinkBuilder(staticOverwriteTable).forRowData(partialData).overwrite().build();
        env.execute();

        results =
                executeAndCollect(
                        new FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
        expected = new Row[] {Row.of(20, "p2", 3)};
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);
    }

    /**
     * Writes {@link #SOURCE_DATA} into a table partitioned by {@code p} without a primary key and
     * expects every written row back: the data once in batch mode, twice in streaming mode.
     */
    @TestTemplate
    public void testPartitionedNonKey() throws Exception {
        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]);

        // Sink job.
        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build();
        env.execute();

        // Bounded read of the written table.
        List<Row> results =
                executeAndCollect(
                        new FlinkSourceBuilder(table).sourceBounded(true).env(env).build());

        // NOTE(review): the streaming expectation is doubled, presumably because the streaming
        // FiniteTestSource emits its elements twice — confirm against FiniteTestSource.
        Stream<RowData> expectedStream =
                isBatch
                        ? SOURCE_DATA.stream()
                        : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
        Row[] expected = expectedStream.map(CONVERTER::toExternal).toArray(Row[]::new);
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);
    }

    /** Runs the projection check on an unpartitioned table with primary key {@code _k}. */
    @TestTemplate
    public void testKeyedProjection() throws Exception {
        FileStoreTable keyedTable = buildFileStoreTable(new int[0], new int[] {2});
        testProjection(keyedTable);
    }

    /** Runs the projection check on an unpartitioned table without a primary key. */
    @TestTemplate
    public void testNonKeyedProjection() throws Exception {
        FileStoreTable nonKeyedTable = buildFileStoreTable(new int[0], new int[0]);
        testProjection(nonKeyedTable);
    }

    /**
     * Writes {@link #SOURCE_DATA} into {@code table}, reads back only columns {@code p} and {@code
     * _k} (field indexes 1 and 2) and compares with the expected projected rows.
     *
     * @param table the table to write to and read from
     */
    private void testProjection(FileStoreTable table) throws Exception {
        // Sink job.
        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build();
        env.execute();

        // Converter for the projected row type; the raw cast narrows the factory's converter.
        Projection projection = Projection.of(new int[] {1, 2});
        @SuppressWarnings({"unchecked", "rawtypes"})
        DataStructureConverter<RowData, Row> converter =
                (DataStructureConverter)
                        DataStructureConverters.getConverter(
                                TypeConversions.fromLogicalToDataType(
                                        projection.project(TABLE_TYPE)));

        // Bounded, projected read of the written table.
        List<Row> results =
                executeAndCollect(
                        new FlinkSourceBuilder(table)
                                .sourceBounded(true)
                                .projection(projection.toNestedIndexes())
                                .env(env)
                                .build(),
                        converter);

        // Expectation for a table with primary key _k.
        Row[] expected = {Row.of("p2", 1), Row.of("p1", 2), Row.of("p2", 5)};
        if (table.schema().trimmedPrimaryKeys().isEmpty()) {
            // Without a primary key every written row is expected, projected to (p, _k); the
            // streaming expectation contains the data twice.
            Stream<RowData> expectedStream =
                    isBatch
                            ? SOURCE_DATA.stream()
                            : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
            expected =
                    expectedStream
                            .map(CONVERTER::toExternal)
                            .map(r -> Row.of(r.getField(1), r.getField(2)))
                            .toArray(Row[]::new);
        }
        Assertions.assertThat(results).containsExactlyInAnyOrder(expected);
    }

    /** Runs the continuous-read check on an unpartitioned table with primary key {@code _k}. */
    @TestTemplate
    public void testContinuous() throws Exception {
        FileStoreTable keyedTable = buildFileStoreTable(new int[0], new int[] {2});
        innerTestContinuous(keyedTable);
    }

    /** Runs the continuous-read check on an unpartitioned table without a primary key. */
    @TestTemplate
    public void testContinuousWithoutPK() throws Exception {
        FileStoreTable nonKeyedTable = buildFileStoreTable(new int[0], new int[0]);
        innerTestContinuous(nonKeyedTable);
    }

    /**
     * Checks that a source built with {@code sourceBounded(false)} on a table copy that sets
     * {@code CoreOptions.SCAN_BOUNDED_WATERMARK} to {@code "1024"} is a {@link
     * SourceTransformation} whose source reports {@link Boundedness#BOUNDED}.
     */
    @TestTemplate
    public void testContinuousBounded() throws Exception {
        FileStoreTable boundedTable =
                buildFileStoreTable(new int[0], new int[] {2})
                        .copy(
                                Collections.singletonMap(
                                        CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024"));

        DataStream<RowData> source =
                new FlinkSourceBuilder(boundedTable).sourceBounded(false).env(env).build();
        Transformation<RowData> transformation = source.getTransformation();
        Assertions.assertThat(transformation).isInstanceOf(SourceTransformation.class);

        Boundedness boundedness =
                ((SourceTransformation<?, ?, ?>) transformation).getSource().getBoundedness();
        Assertions.assertThat(boundedness).isEqualTo(Boundedness.BOUNDED);
    }

    /**
     * Starts an unbounded read of {@code table}, then writes rows and checks that the running
     * reader receives them. Runs only in streaming mode.
     *
     * <p>The collecting iterator is closed when the method exits, normally or with an exception,
     * so it is not left open after the test.
     *
     * @param table the table to read from and write to
     */
    private void innerTestContinuous(FileStoreTable table) throws Exception {
        Assumptions.assumeFalse(isBatch);

        try (BlockingIterator<RowData, Row> iterator =
                BlockingIterator.of(
                        new FlinkSourceBuilder(table)
                                .sourceBounded(false)
                                .env(env)
                                .build()
                                .executeAndCollect(),
                        CONVERTER::toExternal)) {
            // Random pause of up to one second before the first write.
            // NOTE(review): presumably meant to vary reader/writer timing between runs — confirm.
            Thread.sleep(ThreadLocalRandom.current().nextInt(1000));

            sinkAndValidate(
                    table,
                    Arrays.asList(
                            srcRow(RowKind.INSERT, 1, "p1", 1),
                            srcRow(RowKind.INSERT, 2, "p2", 2)),
                    iterator,
                    Row.ofKind(RowKind.INSERT, 1, "p1", 1),
                    Row.ofKind(RowKind.INSERT, 2, "p2", 2));

            // The delete round is only checked for tables that have a primary key.
            if (!table.primaryKeys().isEmpty()) {
                sinkAndValidate(
                        table,
                        Arrays.asList(
                                srcRow(RowKind.DELETE, 1, "p1", 1),
                                srcRow(RowKind.INSERT, 3, "p3", 3)),
                        iterator,
                        Row.ofKind(RowKind.DELETE, 1, "p1", 1),
                        Row.ofKind(RowKind.INSERT, 3, "p3", 3));
            }
        }
    }

    /**
     * Writes {@code src} into {@code table} with a separate sink job, then takes {@code
     * expected.length} rows from {@code iterator} and compares them with {@code expected},
     * ignoring order.
     *
     * @param table the table to write to
     * @param src rows to write
     * @param iterator iterator over an already running read of the table
     * @param expected rows the reader is expected to deliver
     * @throws UnsupportedOperationException if this instance runs in batch mode
     */
    private void sinkAndValidate(
            FileStoreTable table,
            List<RowData> src,
            BlockingIterator<RowData, Row> iterator,
            Row... expected)
            throws Exception {
        if (isBatch) {
            throw new UnsupportedOperationException();
        }

        DataStreamSource<RowData> source =
                env.addSource(
                        new FiniteTestSource<RowData>(src, true), InternalTypeInfo.of(TABLE_TYPE));
        new FlinkSinkBuilder(table).forRowData(source).build();
        env.execute();

        List<Row> actual = iterator.collect(expected.length);
        Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
    }

    /**
     * Creates a table under this test's temporary directory. The instance's {@code isBatch} flag
     * is passed as the {@code noFail} argument of the static overload.
     *
     * @param partitions indexes into {@link #TABLE_TYPE} of the partition columns
     * @param primaryKey indexes into {@link #TABLE_TYPE} of the primary key columns
     * @return the created table
     */
    public FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey)
            throws Exception {
        String tempDir = getTempDirPath();
        return buildFileStoreTable(isBatch, tempDir, partitions, primaryKey);
    }

    /**
     * Builds one wrapped input row of the given kind in {@code (v, p, _k)} order.
     *
     * @param kind row kind of the record
     * @param v value of column {@code v}
     * @param p value of column {@code p}
     * @param k value of column {@code _k}
     * @return the wrapped row
     */
    private static RowData srcRow(RowKind kind, int v, String p, int k) {
        GenericRowData row = GenericRowData.ofKind(kind, v, StringData.fromString(p), k);
        return wrap(row);
    }

    /**
     * Builds the streaming environment: streaming mode, checkpoint interval 100 ms, parallelism 2.
     *
     * @return the configured environment
     */
    public StreamExecutionEnvironment buildStreamEnv() {
        return streamExecutionEnvironmentBuilder()
                .streamingMode()
                .checkpointIntervalMs(100)
                .parallelism(2)
                .build();
    }

    /**
     * Builds the batch environment: batch mode, parallelism 2.
     *
     * @return the configured environment
     */
    public StreamExecutionEnvironment buildBatchEnv() {
        return streamExecutionEnvironmentBuilder()
                .batchMode()
                .parallelism(2)
                .build();
    }

    /**
     * Creates the schema for {@link #TABLE_TYPE} at the configured path and opens the table.
     * Schema creation and table opening run inside {@code
     * FailingFileIO.retryArtificialException}.
     *
     * @param noFail passed to {@link #buildConfiguration(boolean, String)}; {@code false} makes
     *     the table path a failing path
     * @param temporaryPath base directory of the table
     * @param partitions indexes into {@link #TABLE_TYPE} of the partition columns
     * @param primaryKey indexes into {@link #TABLE_TYPE} of the primary key columns
     * @return the created table
     */
    public static FileStoreTable buildFileStoreTable(
            boolean noFail, String temporaryPath, int[] partitions, int[] primaryKey)
            throws Exception {
        Options options = buildConfiguration(noFail, temporaryPath);
        Path tablePath = new CoreOptions(options.toMap()).path();

        // Translate the column indexes into column names.
        List<String> fieldNames = TABLE_TYPE.getFieldNames();
        List<String> partitionKeys =
                Arrays.stream(partitions).mapToObj(fieldNames::get).collect(Collectors.toList());
        List<String> primaryKeys =
                Arrays.stream(primaryKey).mapToObj(fieldNames::get).collect(Collectors.toList());

        Schema schema =
                new Schema(
                        LogicalTypeConversion.toDataType(TABLE_TYPE).getFields(),
                        partitionKeys,
                        primaryKeys,
                        options.toMap(),
                        "");

        return FailingFileIO.retryArtificialException(
                () -> {
                    new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
                    return FileStoreTableFactory.create(LocalFileIO.create(), options);
                });
    }

    /**
     * Builds the table options: {@link #NUM_BUCKET} buckets, AVRO file format, and a table path
     * derived from {@code temporaryPath}.
     *
     * @param noFail if {@code true} the path is {@code temporaryPath} itself; otherwise a {@link
     *     FailingFileIO} is reset under a random name and the path is the failing path for that
     *     name
     * @param temporaryPath base directory of the table
     * @return the options
     */
    public static Options buildConfiguration(boolean noFail, String temporaryPath) {
        Options options = new Options();
        options.set(CoreOptions.BUCKET, NUM_BUCKET);
        if (noFail) {
            options.set(CoreOptions.PATH, temporaryPath);
        } else {
            // A fresh random name per call, so each table gets its own FailingFileIO setup.
            String failingName = UUID.randomUUID().toString();
            FailingFileIO.reset(failingName, 3, 100);
            options.set(CoreOptions.PATH, FailingFileIO.getFailingPath(failingName, temporaryPath));
        }
        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
        return options;
    }

    /**
     * Creates the input stream over {@link #SOURCE_DATA}.
     *
     * @param env environment the source is added to
     * @param isBatch {@code true} to use {@code fromCollection}; {@code false} to use a {@link
     *     FiniteTestSource}
     * @return the source stream
     */
    public static DataStreamSource<RowData> buildTestSource(
            StreamExecutionEnvironment env, boolean isBatch) {
        if (isBatch) {
            return env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE));
        }
        return env.addSource(
                new FiniteTestSource<RowData>(SOURCE_DATA, false),
                InternalTypeInfo.of(TABLE_TYPE));
    }

    /**
     * Executes {@code source} and collects its rows, converted with {@link #CONVERTER}.
     *
     * @param source the stream to execute
     * @return all collected rows as external {@link Row}s
     */
    public static List<Row> executeAndCollect(DataStream<RowData> source) throws Exception {
        DataStructureConverter<RowData, Row> defaultConverter = CONVERTER;
        return executeAndCollect(source, defaultConverter);
    }

    /**
     * Executes {@code source} and collects its rows, converting each with {@code converter}.
     *
     * <p>The collecting iterator is closed even if iteration or conversion throws.
     *
     * @param source the stream to execute
     * @param converter converts each internal row to an external {@link Row}
     * @return all collected rows, in the order the iterator returned them
     */
    public static List<Row> executeAndCollect(
            DataStream<RowData> source, DataStructureConverter<RowData, Row> converter)
            throws Exception {
        List<Row> results = new ArrayList<>();
        try (CloseableIterator<RowData> iterator = source.executeAndCollect()) {
            while (iterator.hasNext()) {
                results.add(converter.toExternal(iterator.next()));
            }
        }
        return results;
    }

    /**
     * Executes {@code source} and collects its rows.
     *
     * <p>The collecting iterator is closed even if iteration throws.
     *
     * @param source the stream to execute
     * @return all collected rows, in the order the iterator returned them
     */
    public static List<Row> executeAndCollectRow(DataStream<Row> source) throws Exception {
        List<Row> results = new ArrayList<>();
        try (CloseableIterator<Row> iterator = source.executeAndCollect()) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }
        return results;
    }
}

