/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.CommonTestUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class PartialUpdateITCase
extends CatalogITCaseBase {
    /**
     * Returns the CREATE TABLE statements for the tables {@code T}, {@code dwd_orders},
     * {@code ods_orders} and {@code dim_persons} used by the tests below.
     *
     * @return the four DDL statements, in the order listed above
     */
    @Override
    protected List<String> ddl() {
        // Partial-update table keyed by (j, k).
        String partialUpdateTable =
                "CREATE TABLE IF NOT EXISTS T (j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update');";
        // Partial-update sink table that is declared with 'ignore-delete'.
        String dwdOrders =
                "CREATE TABLE IF NOT EXISTS dwd_orders (OrderID INT, OrderNumber INT, PersonID INT, LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (OrderID) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', 'ignore-delete'='true');";
        // The two source tables use the 'input' changelog producer and a 1s discovery interval.
        String odsOrders =
                "CREATE TABLE IF NOT EXISTS ods_orders (OrderID INT, OrderNumber INT, PersonID INT, PRIMARY KEY (OrderID) NOT ENFORCED)"
                        + " WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');";
        String dimPersons =
                "CREATE TABLE IF NOT EXISTS dim_persons (PersonID INT, LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (PersonID) NOT ENFORCED)"
                        + " WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');";
        return Arrays.asList(partialUpdateTable, dwdOrders, odsOrders, dimPersons);
    }

    /**
     * Writes two partial rows for the same primary key in a single INSERT and expects a batch
     * read of {@code T} to return one row carrying the non-null values of both.
     */
    @Test
    public void testMergeInMemory() {
        this.batchSql(
                "INSERT INTO T VALUES "
                        + "(1, 2, 3, CAST(NULL AS INT), '5'), "
                        + "(1, 2, CAST(NULL AS INT), 6, CAST(NULL AS STRING))");

        List<Row> rows = this.batchSql("SELECT * FROM T");
        Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1, 2, 3, 6, "5"));
    }

    /**
     * Writes three partial rows for key (1, 2) in separate INSERT statements and expects both a
     * full read and a projection on column {@code a} to return the merged values.
     */
    @Test
    public void testMergeRead() {
        this.batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))");
        this.batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))");
        this.batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')");

        // Full row read.
        List<Row> allColumns = this.batchSql("SELECT * FROM T");
        Assertions.assertThat(allColumns).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));

        // Single-column projection.
        List<Row> onlyA = this.batchSql("SELECT a FROM T");
        Assertions.assertThat(onlyA).containsExactlyInAnyOrder(Row.of(4));
    }

    /**
     * Sets {@code 'commit.force-compact'='true'} on {@code T}, writes three partial rows for each
     * of the keys (1, 2) and (1, 3) in separate INSERT statements, and expects one merged row per
     * key.
     */
    @Test
    public void testMergeCompaction() {
        this.batchSql("ALTER TABLE T SET ('commit.force-compact'='true')");

        // Each statement is executed on its own, in this order.
        List<String> inserts =
                Arrays.asList(
                        "INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))",
                        "INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))",
                        "INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')",
                        "INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 1, '1')",
                        "INSERT INTO T VALUES (1, 3, 2, 3, CAST(NULL AS STRING))",
                        "INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 4, CAST(NULL AS STRING))");
        for (String insert : inserts) {
            this.batchSql(insert);
        }

        List<Row> merged = this.batchSql("SELECT * FROM T");
        Assertions.assertThat(merged)
                .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"), Row.of(1, 3, 2, 4, "1"));
    }

    /**
     * Starts a streaming INSERT into {@code dwd_orders} that unions {@code ods_orders} with the
     * join of {@code dim_persons} and {@code ods_orders}, then writes to both source tables and
     * polls {@code dwd_orders} until the expected merged row shows up.
     *
     * <p>The streaming iterator is held in try-with-resources so it is closed even when one of
     * the polling waits fails.
     *
     * @throws Exception if a wait fails or closing the streaming iterator fails
     */
    @Test
    public void testForeignKeyJoin() throws Exception {
        this.sEnv
                .getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.NONE);

        String streamingInsert =
                "INSERT INTO dwd_orders "
                        + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
                        + "UNION ALL "
                        + "SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;";

        try (CloseableIterator<Row> streamingJob = this.streamSqlIter(streamingInsert)) {
            // First version of the order and of the person it references.
            this.batchSql("INSERT INTO ods_orders VALUES (1, 2, 3)");
            this.batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'jon', 23)");
            CommonTestUtils.waitUtil(
                    () ->
                            this.rowsToList(this.batchSql("SELECT * FROM dwd_orders"))
                                    .contains(Arrays.asList(1, 2, 3, "snow", "jon", 23)),
                    Duration.ofSeconds(5L),
                    Duration.ofMillis(200L));

            // Update both sides and wait for the new merged row.
            this.batchSql("INSERT INTO ods_orders VALUES (1, 4, 3)");
            this.batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'targaryen', 23)");
            CommonTestUtils.waitUtil(
                    () ->
                            this.rowsToList(this.batchSql("SELECT * FROM dwd_orders"))
                                    .contains(Arrays.asList(1, 4, 3, "snow", "targaryen", 23)),
                    Duration.ofSeconds(5L),
                    Duration.ofMillis(200L));
        }
    }

    /**
     * Converts every row to a list of its field values via {@link #toList(Row)}.
     *
     * @param rows rows to convert
     * @return one list of field values per input row, in the same order
     */
    private List<List<Object>> rowsToList(List<Row> rows) {
        List<List<Object>> converted = new ArrayList<>();
        for (Row row : rows) {
            converted.add(this.toList(row));
        }
        return converted;
    }

    /**
     * Asserts that the row kind is INSERT or UPDATE_AFTER and copies the row's fields, in
     * position order, into a new list.
     *
     * @param row row to convert
     * @return the field values of {@code row}
     */
    private List<Object> toList(Row row) {
        Assertions.assertThat(row.getKind()).isIn(RowKind.INSERT, RowKind.UPDATE_AFTER);

        List<Object> fields = new ArrayList<>();
        int position = 0;
        while (position < row.getArity()) {
            fields.add(row.getField(position));
            position++;
        }
        return fields;
    }

    /**
     * Expects a streaming read of {@code T} (executed and printed through {@code sEnv}) to throw.
     */
    @Test
    public void testStreamingRead() {
        // NOTE(review): the string argument looks like the AssertJ assertion *description*
        // (assertThatThrownBy(callable, description, args)), not an expected exception message,
        // so only "something was thrown" is verified here - confirm whether a message check
        // was intended.
        Assertions.assertThatThrownBy(
                () -> this.sEnv.from("T").execute().print(),
                "Partial update continuous reading is not supported");
    }

    /**
     * Creates a partial-update table with the {@code input} changelog producer, opens a streaming
     * SELECT on it, and checks that each INSERT is delivered to the stream as written.
     *
     * <p>The blocking iterator wraps a streaming query, so it is held in try-with-resources and
     * closed when the test ends, whether it passes or fails.
     *
     * @throws Exception if collecting from the stream times out or closing the iterator fails
     */
    @Test
    public void testStreamingReadChangelogInput() throws Exception {
        this.sql(
                "CREATE TABLE INPUT_T (a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', 'changelog-producer'='input');");

        try (BlockingIterator<Row, Row> changelog =
                BlockingIterator.of(this.streamSqlIter("SELECT * FROM INPUT_T"))) {
            this.sql("INSERT INTO INPUT_T VALUES (1, CAST(NULL AS INT), 1)");
            Assertions.assertThat(changelog.collect(1))
                    .containsExactlyInAnyOrder(Row.of(1, null, 1));

            this.sql("INSERT INTO INPUT_T VALUES (1, 1, CAST(NULL AS INT)), (2, 2, 2)");
            Assertions.assertThat(changelog.collect(2))
                    .containsExactlyInAnyOrder(Row.of(1, 1, null), Row.of(2, 2, 2));
        }
    }

    /**
     * Exercises a partial-update table with two sequence groups: {@code g_1} over columns
     * {@code a,b} and {@code g_2} over columns {@code c,d}.
     */
    @Test
    public void testSequenceGroup() {
        this.sql(
                "CREATE TABLE SG (k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d');");

        this.sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");
        // g_2 is NULL in this write; the assertions below expect c, d and g_2 to stay at 1.
        this.sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");

        List<Row> afterSecondWrite = this.sql("SELECT * FROM SG");
        Assertions.assertThat(afterSecondWrite)
                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));
        List<Row> projectedGroupTwo = this.sql("SELECT c, d FROM SG");
        Assertions.assertThat(projectedGroupTwo).containsExactlyInAnyOrder(Row.of(1, 1));

        // g_1 = 1 is lower than the stored 2; a, b and g_1 are expected to stay unchanged.
        this.sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");
        List<Row> afterThirdWrite = this.sql("SELECT * FROM SG");
        Assertions.assertThat(afterThirdWrite)
                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));

        this.sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
        this.sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
        // The last write carries d = NULL with g_2 = 4; d is expected to read back as null.
        this.sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");

        List<Row> finalGroupOne = this.sql("SELECT a, b FROM SG");
        Assertions.assertThat(finalGroupOne).containsExactlyInAnyOrder(Row.of(4, 4));
        List<Row> finalGroupTwo = this.sql("SELECT c, d FROM SG");
        Assertions.assertThat(finalGroupTwo).containsExactlyInAnyOrder(Row.of(5, null));
    }

    /**
     * Expects table creation to fail, with a specific root-cause message, for three invalid
     * sequence-group definitions: an unknown group field, an unknown member field, and a field
     * listed in two groups.
     */
    @Test
    public void testInvalidSequenceGroup() {
        // Shared start of all three statements; only the sequence-group options differ.
        final String createPrefix =
                "CREATE TABLE SG (k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', ";

        // Group field g_0 is not a column of the table.
        Assertions.assertThatThrownBy(
                        () ->
                                this.sql(
                                        createPrefix
                                                + "'fields.g_0.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d');"))
                .hasRootCauseMessage("Field g_0 can not be found in table schema.");

        // Member field a1 is not a column of the table.
        Assertions.assertThatThrownBy(
                        () ->
                                this.sql(
                                        createPrefix
                                                + "'fields.g_1.sequence-group'='a1,b', 'fields.g_2.sequence-group'='c,d');"))
                .hasRootCauseMessage("Field a1 can not be found in table schema.");

        // Column a appears in both groups.
        Assertions.assertThatThrownBy(
                        () ->
                                this.sql(
                                        createPrefix
                                                + "'fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='a,d');"))
                .hasRootCauseMessage(
                        "Field a is defined repeatedly by multiple groups: [g_1, g_2].");
    }

    /**
     * Creates a partial-update table with the {@code lookup} changelog producer and two sequence
     * groups, writes one row, and checks a projection on columns {@code k} and {@code c}.
     */
    @Test
    public void testProjectPushDownWithLookupChangelogProducer() {
        String createTable =
                "CREATE TABLE IF NOT EXISTS T_P (j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', 'changelog-producer' = 'lookup', 'fields.a.sequence-group'='j', 'fields.b.sequence-group'='c');";
        this.sql(createTable);
        this.batchSql("INSERT INTO T_P VALUES (1, 1, 1, 1, '1')");

        List<Row> projected = this.sql("SELECT k, c FROM T_P");
        Assertions.assertThat(projected).containsExactlyInAnyOrder(Row.of(1, "1"));
    }

    /**
     * Creates a partitioned partial-update table with {@code 'local-merge-buffer-size'='1m'},
     * writes three rows (two of them for the same key) in one INSERT, and expects one merged row
     * per key.
     */
    @Test
    public void testLocalMerge() {
        this.sql(
                "CREATE TABLE T1 (k INT,v INT,d INT,PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d) "
                        + " WITH ('merge-engine'='partial-update', 'local-merge-buffer-size'='1m');");
        this.sql("INSERT INTO T1 VALUES (1, CAST(NULL AS INT), 1), (2, 1, 1), (1, 2, 1)");

        List<Row> merged = this.batchSql("SELECT * FROM T1");
        Assertions.assertThat(merged).containsExactlyInAnyOrder(Row.of(1, 2, 1), Row.of(2, 1, 1));
    }

    /**
     * Exercises a partial-update table where column {@code a} uses the {@code sum} aggregate
     * function inside sequence group {@code g_1}, and column {@code c} belongs to sequence group
     * {@code g_2}.
     */
    @Test
    public void testPartialUpdateWithAggregation() {
        this.sql(
                "CREATE TABLE AGG (k INT, a INT, b INT, g_1 INT, c VARCHAR, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', 'fields.a.aggregate-function'='sum', 'fields.g_1.sequence-group'='a', 'fields.g_2.sequence-group'='c');");

        this.sql("INSERT INTO AGG VALUES (1, 1, 1, 1, '1', 1)");
        // g_2 is NULL in this write; c and g_2 are expected to keep their first values.
        this.sql("INSERT INTO AGG VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT))");

        List<Row> afterSecondWrite = this.sql("SELECT * FROM AGG");
        Assertions.assertThat(afterSecondWrite)
                .containsExactlyInAnyOrder(Row.of(1, 3, 2, 2, "1", 1));
        List<Row> projected = this.sql("SELECT a, c FROM AGG");
        Assertions.assertThat(projected).containsExactlyInAnyOrder(Row.of(3, "1"));

        // g_1 = 1 is lower than the stored 2; a is still expected to grow to 6, g_1 stays 2.
        this.sql("INSERT INTO AGG VALUES (1, 3, 3, 1, '3', 3)");
        List<Row> afterThirdWrite = this.sql("SELECT * FROM AGG");
        Assertions.assertThat(afterThirdWrite)
                .containsExactlyInAnyOrder(Row.of(1, 6, 3, 2, "3", 3));

        // NULL a and b leave 6 and 3 in place; NULL c with g_2 = 4 is expected to read as null.
        this.sql(
                "INSERT INTO AGG VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, CAST(NULL AS VARCHAR), 4)");
        List<Row> finalValues = this.sql("SELECT a, b, c FROM AGG");
        Assertions.assertThat(finalValues).containsExactlyInAnyOrder(Row.of(6, 3, null));
    }

    /**
     * Exercises a partial-update table where column {@code a} uses the {@code first_value}
     * aggregate function inside sequence group {@code g_1}.
     */
    @Test
    public void testFirstValuePartialUpdate() {
        this.sql(
                "CREATE TABLE AGG (k INT, a INT, g_1 INT, PRIMARY KEY (k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a', 'fields.a.aggregate-function'='first_value');");

        // Two rows for the same key: a keeps the first value, g_1 the highest sequence.
        this.sql("INSERT INTO AGG VALUES (1, 1, 1), (1, 2, 2)");
        List<Row> afterFirstWrite = this.sql("SELECT * FROM AGG");
        Assertions.assertThat(afterFirstWrite).containsExactlyInAnyOrder(Row.of(1, 1, 2));

        // A row with a lower sequence (0) is expected to replace a while g_1 stays at 2.
        this.sql("INSERT INTO AGG VALUES (1, 0, 0)");
        List<Row> afterSecondWrite = this.sql("SELECT * FROM AGG");
        Assertions.assertThat(afterSecondWrite).containsExactlyInAnyOrder(Row.of(1, 0, 2));
    }

    /**
     * Forces the upsert materializer on, disables restarts, and expects the INSERT into
     * {@code dwd_orders} to fail with a root-cause message that tells the user to set
     * {@code 'table.exec.sink.upsert-materialize'} to {@code 'NONE'}.
     */
    @Test
    public void testNoSinkMaterializer() {
        this.sEnv
                .getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.FORCE);
        this.sEnv.getConfig().set(RestartStrategyOptions.RESTART_STRATEGY, "none");

        String insertStatement =
                "INSERT INTO dwd_orders "
                        + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
                        + "UNION ALL "
                        + "SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;";

        try {
            this.sEnv.executeSql(insertStatement).await();
            // fail() raises an AssertionError, which the catch (Exception) below does not catch.
            Assertions.fail("Expecting exception");
        } catch (Exception thrown) {
            Assertions.assertThat(thrown)
                    .hasRootCauseMessage(
                            "Sink materializer must not be used with Paimon sink. Please set 'table.exec.sink.upsert-materialize' to 'NONE' in Flink's config.");
        }
    }

    /**
     * Streams an INSERT row and then a DELETE row from two temporary {@code values} sources into
     * a partial-update table with two sequence groups, checking the table contents after each
     * step.
     *
     * <p>Both streaming inserts are held in try-with-resources so they are closed even when an
     * assertion fails. They are closed in reverse order of creation.
     *
     * @throws Exception if closing one of the streaming iterators fails
     */
    @Test
    public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Exception {
        String insertDataId =
                TestValuesTableFactory.registerData(
                        Arrays.asList(Row.ofKind(RowKind.INSERT, 1, 1, 1)));
        this.sEnv.executeSql(
                String.format(
                        "CREATE TEMPORARY TABLE source (k INT, a INT, g_1 INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', 'changelog-mode' = 'I,D,UA,UB')",
                        insertDataId));

        this.sql(
                "CREATE TABLE TEST (k INT, a INT, b INT, g_1 INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a', 'fields.g_2.sequence-group'='b');");

        try (CloseableIterator<Row> insert1 =
                this.streamSqlIter(
                        "INSERT INTO TEST SELECT k, a, CAST(NULL AS INT) AS b, g_1, CAST(NULL AS INT) as g_2 FROM source")) {
            this.sqlAssertWithRetry(
                    "SELECT * FROM TEST",
                    rows -> rows.containsExactlyInAnyOrder(Row.of(1, 1, null, 1, null)));

            // Second source carries a single DELETE row with a higher g_1 value.
            String deleteDataId =
                    TestValuesTableFactory.registerData(
                            Arrays.asList(Row.ofKind(RowKind.DELETE, 1, 1, 2)));
            // NOTE(review): unlike 'source', this table declares neither a primary key nor a
            // 'changelog-mode' although its data is a DELETE row - confirm this is intended.
            this.sEnv.executeSql(
                    String.format(
                            "CREATE TEMPORARY TABLE source2 (k INT, a INT, g_1 INT) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                            deleteDataId));

            try (CloseableIterator<Row> insert2 =
                    this.streamSqlIter(
                            "INSERT INTO TEST SELECT k, a, CAST(NULL AS INT) AS b, g_1, CAST(NULL AS INT) as g_2 FROM source2")) {
                // Expected: a is cleared, g_1 advances to 2, and the row itself remains.
                this.sqlAssertWithRetry(
                        "SELECT * FROM TEST",
                        rows -> rows.containsExactlyInAnyOrder(Row.of(1, null, null, 2, null)));
                Assertions.assertThat(this.sql("SELECT COUNT(*) FROM TEST"))
                        .containsExactlyInAnyOrder(Row.of(1L));
            }
        }
    }

    /**
     * Writes an INSERT, a DELETE and another INSERT for the same key into a partial-update table
     * declared with {@code 'ignore-delete' = 'true'} and expects the two inserts to be merged as
     * if the delete had not been written.
     *
     * @param localMerge when true, {@code 'local-merge-buffer-size'} is set on the table first
     * @throws Exception if the temporary-table statement or the INSERT job fails
     */
    @ParameterizedTest(name="localMergeEnabled = {0}")
    @ValueSource(booleans={true, false})
    public void testIgnoreDelete(boolean localMerge) throws Exception {
        this.sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING)"
                        + " WITH ( 'merge-engine' = 'partial-update', 'ignore-delete' = 'true')");
        if (localMerge) {
            this.sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = '256 kb')");
        }

        // +I(1, null, apple), -D(1, null, apple), +I(1, A, null)
        List<Row> changes =
                Arrays.asList(
                        Row.ofKind(RowKind.INSERT, 1, null, "apple"),
                        Row.ofKind(RowKind.DELETE, 1, null, "apple"),
                        Row.ofKind(RowKind.INSERT, 1, "A", null));
        String dataId = TestValuesTableFactory.registerData(changes);

        // The data id is passed as a format argument for the '%s' placeholder.
        this.streamSqlIter(
                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', 'changelog-mode' = 'I,D')",
                        dataId)
                .close();
        this.sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await();

        List<Row> merged = this.sql("SELECT * FROM ignore_delete");
        Assertions.assertThat(merged).containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
    }

    /**
     * Writes an insert, a delete and another insert into a write-only {@code deduplicate} table,
     * then force-commits a schema that switches the table to {@code partial-update} with
     * {@code ignore-delete} and checks that the same data now reads back merged.
     *
     * @throws Exception if the forced schema commit fails
     */
    @Test
    public void testIgnoreDeleteInReader() throws Exception {
        this.sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING)"
                        + " WITH ( 'merge-engine' = 'deduplicate', 'write-only' = 'true', 'bucket' = '1')");
        this.sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");
        this.sql("DELETE FROM ignore_delete WHERE pk = 1");
        this.sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");

        // With the original options only the last insert is visible.
        List<Row> beforeSwitch = this.sql("SELECT * FROM ignore_delete");
        Assertions.assertThat(beforeSwitch).containsExactlyInAnyOrder(Row.of(1, "A", null));

        // Options of the replacement schema: partial-update, one bucket, deletes ignored.
        HashMap<String, String> switchedOptions = new HashMap<>();
        switchedOptions.put(
                CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
        switchedOptions.put(CoreOptions.BUCKET.key(), "1");
        switchedOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");

        SchemaManager schemaManager =
                new SchemaManager(
                        LocalFileIO.create(), new Path(this.path, "default.db/ignore_delete"));
        List<DataField> columns =
                Arrays.asList(
                        new DataField(0, "pk", DataTypes.INT().notNull()),
                        new DataField(1, "a", DataTypes.STRING()),
                        new DataField(2, "b", DataTypes.STRING()));
        Schema switchedSchema =
                new Schema(
                        columns,
                        Collections.emptyList(),
                        Collections.singletonList("pk"),
                        switchedOptions,
                        null);
        SchemaUtils.forceCommit(schemaManager, switchedSchema);

        // Same files, new options: the two inserts are now expected to be merged.
        List<Row> afterSwitch = this.sql("SELECT * FROM ignore_delete");
        Assertions.assertThat(afterSwitch).containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
    }
}

