/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class LookupChangelogWithAggITCase
extends CatalogITCaseBase {
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testMultipleCompaction(boolean changelogRowDeduplicate) throws Exception {
        int i;
        this.sql("CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket'='3', 'changelog-producer'='lookup', 'changelog-producer.row-deduplicate'='%s', 'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')", changelogRowDeduplicate);
        BlockingIterator<Row, Row> iterator = this.streamSqlBlockIter("SELECT * FROM T", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 1), (2, 2)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1}), Row.of((Object[])new Object[]{2, 2})});
        for (i = 1; i < 5; ++i) {
            this.sql("INSERT INTO T VALUES (1, 1), (2, 2)", new Object[0]);
            Assertions.assertThat((List)iterator.collect(4)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{1, i}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, 2 * i}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, i + 1}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, 2 * (i + 1)})});
        }
        for (i = 5; i < 10; ++i) {
            this.sql("INSERT INTO T VALUES (1, 0), (2, 2)", new Object[0]);
            if (changelogRowDeduplicate) {
                Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, 2 * i}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, 2 * (i + 1)})});
                continue;
            }
            Assertions.assertThat((List)iterator.collect(4)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{1, 5}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, 2 * i}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, 5}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, 2 * (i + 1)})});
        }
        iterator.close();
    }

    @Test
    public void testLookupChangelogProducerWithValueSwitch() throws Exception {
        this.sql("CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket'='3', 'changelog-producer'='lookup', 'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')", new Object[0]);
        BlockingIterator<Row, Row> iterator = this.streamSqlBlockIter("SELECT * FROM T", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 1), (2, 2), (1, 3), (1, 4), (1, 5)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 13}), Row.of((Object[])new Object[]{2, 2})});
        iterator.close();
    }

    @Test
    public void testLookupChangelogProducerWithProjection() {
        this.sql("CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v1 INT, v2 INT) WITH ('bucket'='3', 'changelog-producer'='lookup', 'merge-engine'='aggregation', 'fields.v1.aggregate-function'='sum', 'fields.v2.aggregate-function'='sum')", new Object[0]);
        int times = 3 + ThreadLocalRandom.current().nextInt(3);
        for (int i = 0; i < times; ++i) {
            this.sql("INSERT INTO T VALUES (1, 1, 1), (2, 2, 2)", new Object[0]);
        }
        Assertions.assertThat(this.sql("SELECT v2 FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{times}), Row.of((Object[])new Object[]{times * 2})});
    }
}

