/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that read from and write to append-only tables through Flink SQL.
 *
 * <p>The tables shared by most tests ({@code append_table}, {@code part_table} and
 * {@code complex_table}) are declared in {@link #ddl()}; a few tests create their own tables.
 */
public class AppendOnlyTableITCase extends CatalogITCaseBase {

    /** Name of the temporary bounded source table created by {@link #createBoundedSource(List)}. */
    private static final String SOURCE_TABLE_DDL =
            "CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')";

    /** Creating a table with {@code 'bucket' = '-1'} together with a {@code 'bucket-key'} must fail. */
    @Test
    public void testCreateUnawareBucketTableWithBucketKey() {
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "CREATE TABLE pk_table (id INT, data STRING) WITH ('bucket' = '-1', 'bucket-key' = 'id')"))
                .hasRootCauseInstanceOf(RuntimeException.class)
                .hasRootCauseMessage(
                        "Cannot define 'bucket-key' in unaware or dynamic bucket mode.");
    }

    /**
     * Creating a table with {@code 'bucket' = '-1'} together with
     * {@code 'full-compaction.delta-commits'} must fail.
     */
    @Test
    public void testCreateUnawareBucketTableWithFullCompaction() {
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "CREATE TABLE pk_table (id INT, data STRING) WITH ('bucket' = '-1','full-compaction.delta-commits'='10')"))
                .hasRootCauseInstanceOf(RuntimeException.class)
                // The misspelling "unware" is part of the expected message; keep it verbatim.
                .hasRootCauseMessage(
                        "AppendOnlyTable of unware or dynamic bucket does not support 'full-compaction.delta-commits'");
    }

    /** A batch read of a table that was never written to returns no rows. */
    @Test
    public void testReadEmpty() {
        Assertions.assertThat(batchSql("SELECT * FROM append_table")).isEmpty();
    }

    /** Rows written by a batch insert are readable in full and through single-column projections. */
    @Test
    public void testReadWrite() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')");

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(2);
        Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"));

        rows = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(2);
        Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1), Row.of(2));

        rows = batchSql("SELECT data from append_table");
        Assertions.assertThat(rows.size()).isEqualTo(2);
        Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
    }

    /**
     * Rows inserted into three different partitions, one insert per partition, are read back in
     * exactly the insertion order when running with parallelism 1.
     */
    @Test
    public void testReadPartitionOrder() {
        // The assertion below is order-sensitive, so the job is pinned to a single task.
        setParallelism(1);
        batchSql("INSERT INTO part_table VALUES (1, 'AAA', 'part-1')");
        batchSql("INSERT INTO part_table VALUES (2, 'BBB', 'part-2')");
        batchSql("INSERT INTO part_table VALUES (3, 'CCC', 'part-3')");

        Assertions.assertThat(batchSql("SELECT * FROM part_table"))
                .containsExactly(
                        Row.of(1, "AAA", "part-1"),
                        Row.of(2, "BBB", "part-2"),
                        Row.of(3, "CCC", "part-3"));
    }

    /** Identical rows are all kept: inserting {@code (1, 'AAA')} twice yields it twice on read. */
    @Test
    public void testSkipDedup() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2, 'BBB'), (3, 'BBB')");

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"), Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "BBB"));

        rows = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(Row.of(1), Row.of(1), Row.of(2), Row.of(3));

        rows = batchSql("SELECT data FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("BBB"));
    }

    /** INSERT-kind rows copied from a bounded {@code values} source are all readable afterwards. */
    @Test
    public void testIngestFromSource() {
        List<Row> input =
                Arrays.asList(
                        Row.ofKind(RowKind.INSERT, 1, "AAA"),
                        Row.ofKind(RowKind.INSERT, 1, "AAA"),
                        Row.ofKind(RowKind.INSERT, 1, "BBB"),
                        Row.ofKind(RowKind.INSERT, 2, "AAA"));
        createBoundedSource(input);

        batchSql("INSERT INTO append_table SELECT * FROM source");

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"), Row.of(1, "AAA"), Row.of(1, "BBB"), Row.of(2, "AAA"));

        rows = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(Row.of(1), Row.of(1), Row.of(1), Row.of(2));

        rows = batchSql("SELECT data FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("AAA"));
    }

    /**
     * Runs seven inserts with {@code compaction.min.file-num = 2} and
     * {@code compaction.early-max.file-num = 4} and checks the latest snapshot after each one.
     *
     * <p>The 4th and 7th inserts are expected to advance the snapshot id by two and leave a
     * {@code COMPACT} snapshot as the latest; every other insert is expected to leave a single
     * {@code APPEND} snapshot. All 16 inserted rows must still be readable at the end.
     */
    @Test
    public void testAutoCompaction() {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");

        assertAutoCompaction(
                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",
                1L,
                Snapshot.CommitKind.APPEND);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')",
                2L,
                Snapshot.CommitKind.APPEND);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')",
                3L,
                Snapshot.CommitKind.APPEND);
        // Snapshot id skips 4: the latest snapshot after this insert is a COMPACT one.
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')",
                5L,
                Snapshot.CommitKind.COMPACT);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')",
                6L,
                Snapshot.CommitKind.APPEND);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')",
                7L,
                Snapshot.CommitKind.APPEND);
        // Snapshot id skips 8: again the latest snapshot is a COMPACT one.
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')",
                9L,
                Snapshot.CommitKind.COMPACT);

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(16);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"),
                        Row.of(2, "BBB"),
                        Row.of(3, "CCC"),
                        Row.of(4, "DDD"),
                        Row.of(1, "AAA"),
                        Row.of(2, "BBB"),
                        Row.of(3, "CCC"),
                        Row.of(4, "DDD"),
                        Row.of(5, "EEE"),
                        Row.of(6, "FFF"),
                        Row.of(7, "HHH"),
                        Row.of(8, "III"),
                        Row.of(9, "JJJ"),
                        Row.of(10, "KKK"),
                        Row.of(11, "LLL"),
                        Row.of(12, "MMM"));
    }

    /** A {@code DELETE} row must be rejected by the writer. */
    @Test
    public void testRejectDelete() {
        testRejectChanges(RowKind.DELETE);
    }

    /** An {@code UPDATE_BEFORE} row must be rejected by the writer. */
    @Test
    public void testRejectUpdateBefore() {
        testRejectChanges(RowKind.UPDATE_BEFORE);
    }

    /** An {@code UPDATE_AFTER} row must be rejected by the writer. */
    @Test
    public void testRejectUpdateAfter() {
        // Must be UPDATE_AFTER; using UPDATE_BEFORE here would merely repeat
        // testRejectUpdateBefore and leave this row kind untested.
        testRejectChanges(RowKind.UPDATE_AFTER);
    }

    /** A {@code NULL} value in a {@code MAP<INT, INT>} column is written and read back as null. */
    @Test
    public void testComplexType() {
        batchSql("INSERT INTO complex_table VALUES (1, CAST(NULL AS MAP<INT, INT>))");
        Assertions.assertThat(batchSql("SELECT * FROM complex_table"))
                .containsExactly(Row.of(1, null));
    }

    /**
     * A {@code TIMESTAMP} literal stored in a {@code TIMESTAMP_LTZ(3)} column is read back as the
     * {@link java.time.Instant} of that local date-time in the JVM's default time zone.
     */
    @Test
    public void testTimestampLzType() {
        sql("CREATE TABLE t_table (id INT, data TIMESTAMP_LTZ(3))", new Object[0]);
        batchSql("INSERT INTO t_table VALUES (1, TIMESTAMP '2023-02-03 20:20:20')");

        Assertions.assertThat(batchSql("SELECT * FROM t_table"))
                .containsExactly(
                        Row.of(
                                1,
                                LocalDateTime.parse("2023-02-03T20:20:20")
                                        .atZone(ZoneId.systemDefault())
                                        .toInstant()));
    }

    /**
     * Sets the scan mode for table {@code T} to {@code LATEST} through the streaming
     * environment's configuration and checks that a streaming read started afterwards only
     * returns the row inserted after the read began.
     *
     * @throws Exception declared because the calls below may throw checked exceptions
     */
    @Test
    public void testDynamicOptions() throws Exception {
        sql("CREATE TABLE T (id INT)", new Object[0]);
        batchSql("INSERT INTO T VALUES (1)");

        // NOTE(review): the "*.*" segments presumably match any catalog and database - confirm.
        sEnv.getConfig()
                .getConfiguration()
                .setString(
                        "paimon.*.*.T." + CoreOptions.SCAN_MODE.key(),
                        CoreOptions.StartupMode.LATEST.toString());

        // NOTE(review): the iterator is never closed here; consider closing it if
        // BlockingIterator supports that, so the streaming job does not outlive the test.
        BlockingIterator<Row, Row> iterator =
                streamSqlBlockIter("SELECT * FROM T", new Object[0]);

        sql("INSERT INTO T VALUES (2)", new Object[0]);

        // Row 1 was written before the read started and must not show up.
        Assertions.assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2));
    }

    /**
     * Returns the DDL statements for the tables shared by the tests in this class.
     *
     * @return the {@code CREATE TABLE IF NOT EXISTS} statements for {@code append_table},
     *     {@code part_table} and {@code complex_table}, each declared with {@code 'bucket' = '1'}
     */
    @Override
    protected List<String> ddl() {
        return Arrays.asList(
                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('bucket' = '1')",
                "CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '1')",
                "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP<INT, INT>) WITH ('bucket' = '1')");
    }

    /**
     * Feeds a single row of the given kind into {@code append_table} and asserts that the insert
     * fails with an {@link IllegalStateException} root cause naming that row kind.
     *
     * @param kind the row kind that is expected to be rejected
     */
    private void testRejectChanges(RowKind kind) {
        createBoundedSource(Collections.singletonList(Row.ofKind(kind, 1, "AAA")));

        Assertions.assertThatThrownBy(
                        () -> batchSql("INSERT INTO append_table SELECT * FROM source"))
                .hasRootCauseInstanceOf(IllegalStateException.class)
                .hasRootCauseMessage(
                        "Append only writer can not accept row with RowKind %s", kind);
    }

    /**
     * Registers the given rows with {@link TestValuesTableFactory} and creates the temporary
     * bounded table {@code source (id INT, data STRING)} that is backed by them.
     *
     * @param input the rows the {@code source} table should emit
     */
    private void createBoundedSource(List<Row> input) {
        String dataId = TestValuesTableFactory.registerData(input);
        batchSql(SOURCE_TABLE_DDL, dataId);
    }

    /**
     * Executes the statement in batch mode and asserts the id and commit kind of the latest
     * snapshot of {@code append_table} afterwards.
     *
     * @param sql the statement to execute
     * @param expectedSnapshotId the id the latest snapshot must have after the statement
     * @param expectedCommitKind the commit kind the latest snapshot must have after the statement
     */
    private void assertAutoCompaction(
            String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
        batchSql(sql);
        Snapshot latest = findLatestSnapshot("append_table");
        Assertions.assertThat(latest.id()).isEqualTo(expectedSnapshotId);
        Assertions.assertThat(latest.commitKind()).isEqualTo(expectedCommitKind);
    }
}

