/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.List;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration tests that read primary-key tables created with
 * {@code 'deletion-vectors.enabled' = 'true'} through Flink SQL, in both batch and streaming mode.
 *
 * <p>Every test is run once per {@code changelog-producer} value listed in its
 * {@link ValueSource} ({@code "none"} and {@code "lookup"}). Each test creates a table {@code T},
 * issues three {@code INSERT} statements and then checks what is visible at the latest snapshot
 * and/or at explicitly requested snapshot ids.
 *
 * <p>NOTE(review): the assertions expect snapshot 3 to contain only the rows of the first
 * {@code INSERT} and snapshot 4 to additionally contain the second {@code INSERT}; presumably
 * each {@code INSERT} commits more than one snapshot on these tables - confirm against the
 * writer/compaction behavior.
 */
public class DeletionVectorITCase extends CatalogITCaseBase {

    /**
     * Streams table {@code T} with {@code scan.mode = from-snapshot-full}, starting once from
     * snapshot 3 and once from snapshot 4, and checks the full-snapshot rows plus the subsequent
     * {@code UPDATE_BEFORE}/{@code UPDATE_AFTER} pairs.
     *
     * @param changelogProducer value substituted into the table's {@code changelog-producer} option
     * @throws Exception if reading or closing the streaming iterator fails
     */
    @ParameterizedTest
    @ValueSource(strings = {"none", "lookup"})
    public void testStreamingReadDVTable(String changelogProducer) throws Exception {
        sql(
                String.format(
                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')",
                        changelogProducer));

        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");
        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");

        // Starting from snapshot 3: the first insert as the full snapshot, then the changes
        // produced by the second and third inserts (4 + 4 + 4 = 12 records).
        try (BlockingIterator<Row, Row> iter =
                streamSqlBlockIter(
                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
            List<Row> records = iter.collect(12);
            AssertionsForInterfaceTypes.assertThat(records)
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
                            Row.ofKind(RowKind.INSERT, 2, "2"),
                            Row.ofKind(RowKind.INSERT, 3, "3"),
                            Row.ofKind(RowKind.INSERT, 4, "4"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
        }

        // Starting from snapshot 4: the state after the second insert as the full snapshot,
        // then only the changes of the third insert (4 + 4 = 8 records).
        // A real try-with-resources replaces the hand-unrolled close/addSuppressed sequence,
        // which referenced variables that were never declared in this scope.
        try (BlockingIterator<Row, Row> iter =
                streamSqlBlockIter(
                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
            List<Row> records = iter.collect(8);
            AssertionsForInterfaceTypes.assertThat(records)
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
                            Row.ofKind(RowKind.INSERT, 2, "2_1"),
                            Row.ofKind(RowKind.INSERT, 3, "3_1"),
                            Row.ofKind(RowKind.INSERT, 4, "4"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
        }
    }

    /**
     * Batch-reads table {@code T} at the latest snapshot and at snapshot ids 3 and 4, checking
     * that each read returns exactly one row per primary key with the value expected at that
     * point.
     *
     * @param changelogProducer value substituted into the table's {@code changelog-producer} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"none", "lookup"})
    public void testBatchReadDVTable(String changelogProducer) {
        sql(
                String.format(
                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')",
                        changelogProducer));

        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");
        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");

        // Latest snapshot: all three inserts merged.
        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(
                        Row.of(1, "111111111"),
                        Row.of(2, "2_2"),
                        Row.of(3, "3_1"),
                        Row.of(4, "4_1"));

        // Snapshot 3: only the first insert is expected to be visible.
        AssertionsForInterfaceTypes.assertThat(
                        batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */"))
                .containsExactlyInAnyOrder(
                        Row.of(1, "111111111"),
                        Row.of(2, "2"),
                        Row.of(3, "3"),
                        Row.of(4, "4"));

        // Snapshot 4: the first and second inserts are expected to be visible.
        AssertionsForInterfaceTypes.assertThat(
                        batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='4') */"))
                .containsExactlyInAnyOrder(
                        Row.of(1, "111111111"),
                        Row.of(2, "2_1"),
                        Row.of(3, "3_1"),
                        Row.of(4, "4"));
    }

    /**
     * Checks a table using {@code 'merge-engine'='aggregation'} with a {@code sum} aggregate on
     * column {@code v}: the batch read must return the summed values, and - for the
     * {@code "lookup"} producer only - the streaming read from snapshot 4 must emit the
     * intermediate sums followed by the update pairs of the last insert.
     *
     * <p>NOTE(review): the streaming assertion is skipped for {@code "none"}; presumably that
     * producer does not yield a complete changelog for this merge engine - confirm.
     *
     * @param changelogProducer value substituted into the table's {@code changelog-producer} option
     * @throws Exception if reading or closing the streaming iterator fails
     */
    @ParameterizedTest
    @ValueSource(strings = {"none", "lookup"})
    public void testDVTableWithAggregationMergeEngine(String changelogProducer) throws Exception {
        sql(
                String.format(
                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v INT) "
                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', "
                                + "'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')",
                        changelogProducer));

        sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)");
        sql("INSERT INTO T VALUES (2, 1), (3, 1)");
        sql("INSERT INTO T VALUES (2, 1), (4, 1)");

        // Expected sums: id 2 -> 2 + 1 + 1, id 3 -> 3 + 1, id 4 -> 4 + 1.
        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(
                        Row.of(1, 111111111), Row.of(2, 4), Row.of(3, 4), Row.of(4, 5));

        if ("lookup".equals(changelogProducer)) {
            try (BlockingIterator<Row, Row> iter =
                    streamSqlBlockIter(
                            "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
                List<Row> records = iter.collect(8);
                AssertionsForInterfaceTypes.assertThat(records)
                        .containsExactlyInAnyOrder(
                                Row.ofKind(RowKind.INSERT, 1, 111111111),
                                Row.ofKind(RowKind.INSERT, 2, 3),
                                Row.ofKind(RowKind.INSERT, 3, 4),
                                Row.ofKind(RowKind.INSERT, 4, 4),
                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3),
                                Row.ofKind(RowKind.UPDATE_AFTER, 2, 4),
                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, 4),
                                Row.ofKind(RowKind.UPDATE_AFTER, 4, 5));
            }
        }
    }

    /**
     * Checks a table using {@code 'merge-engine'='partial-update'}: the batch read must return
     * rows in which {@code NULL} columns of later inserts did not overwrite earlier non-null
     * values, and - for the {@code "lookup"} producer only - the streaming read from snapshot 4
     * must emit the intermediate merged rows followed by the update pairs of the last insert.
     *
     * <p>NOTE(review): the streaming assertion is skipped for {@code "none"}; presumably that
     * producer does not yield a complete changelog for this merge engine - confirm.
     *
     * @param changelogProducer value substituted into the table's {@code changelog-producer} option
     * @throws Exception if reading or closing the streaming iterator fails
     */
    @ParameterizedTest
    @ValueSource(strings = {"none", "lookup"})
    public void testDVTableWithPartialUpdateMergeEngine(String changelogProducer)
            throws Exception {
        sql(
                String.format(
                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1 STRING, v2 STRING) "
                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', "
                                + "'merge-engine'='partial-update')",
                        changelogProducer));

        sql(
                "INSERT INTO T VALUES (1, '111111111', '1'), (2, '2', CAST(NULL AS STRING)), "
                        + "(3, '3', '3'), (4, CAST(NULL AS STRING), '4')");
        sql("INSERT INTO T VALUES (2, CAST(NULL AS STRING), '2'), (3, '3_1', '3_1')");
        sql("INSERT INTO T VALUES (2, '2_1', CAST(NULL AS STRING)), (4, '4', CAST(NULL AS STRING))");

        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(
                        Row.of(1, "111111111", "1"),
                        Row.of(2, "2_1", "2"),
                        Row.of(3, "3_1", "3_1"),
                        Row.of(4, "4", "4"));

        if ("lookup".equals(changelogProducer)) {
            try (BlockingIterator<Row, Row> iter =
                    streamSqlBlockIter(
                            "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
                List<Row> records = iter.collect(8);
                AssertionsForInterfaceTypes.assertThat(records)
                        .containsExactlyInAnyOrder(
                                Row.ofKind(RowKind.INSERT, 1, "111111111", "1"),
                                Row.ofKind(RowKind.INSERT, 2, "2", "2"),
                                Row.ofKind(RowKind.INSERT, 3, "3_1", "3_1"),
                                Row.ofKind(RowKind.INSERT, 4, null, "4"),
                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2", "2"),
                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1", "2"),
                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, null, "4"),
                                Row.ofKind(RowKind.UPDATE_AFTER, 4, "4", "4"));
            }
        }
    }
}

