/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.utils.BlockingIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class CompositePkAndMultiPartitionedTableWithKafkaLogITCase
extends KafkaTableTestBase {
    /**
     * Runs before every test: hands this test's temp directory path to
     * {@code ReadWriteTableTestUtil.init}.
     */
    @BeforeEach
    public void setUp() {
        String tempDirPath = (String) getTempDirPath();
        ReadWriteTableTestUtil.init(tempDirPath);
    }

    /**
     * Streaming read/write round trip for a table partitioned by {@code (dt, hh)} with the
     * composite primary key {@code (from_currency, to_currency, dt, hh)}.
     *
     * <p>Flow:
     * <ol>
     *   <li>Load a changelog of inserts, updates and deletes from a temporary table into a table
     *       created through {@code KafkaLogTestUtils.createTableWithKafkaLog}.</li>
     *   <li>Check the partition directories and the rows returned by a streaming read.</li>
     *   <li>Insert one more row and check that the open streaming iterator delivers it.</li>
     *   <li>Run an {@code INSERT OVERWRITE}, check the batch view, and check that the streaming
     *       iterator sees no further records.</li>
     *   <li>On a freshly created table, check streaming reads with a filter and a projection.</li>
     * </ol>
     *
     * @throws Exception if a table operation, an awaited job, or closing an iterator fails
     */
    @Test
    public void testStreamingReadWriteMultiPartitionedRecordsWithMultiPk() throws Exception {
        List<String> fieldsSpec = Arrays.asList(
                "from_currency STRING",
                "to_currency STRING",
                "rate_by_to_currency DOUBLE",
                "dt STRING",
                "hh STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt", "hh");
        List<String> partitionKeys = Arrays.asList("dt", "hh");

        List<Row> initialRecords = Arrays.asList(
                // to_currency is US Dollar
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar", null, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.74, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Yen", "US Dollar", 0.0081, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", "US Dollar", null, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "US Dollar", 0.76, "2022-01-02", "20"}),
                // to_currency is Euro
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Euro", null, "2022-01-02", "20"}),
                // to_currency is Yen
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Chinese Yuan", "Yen", 25.6, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 90.1, "2022-01-02", "20"}));

        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(
                fieldsSpec,
                primaryKeys,
                partitionKeys,
                initialRecords,
                "dt:2022-01-02,hh:20;dt:2022-01-02,hh:21",
                false,
                "I,UA,D");
        String table = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, false);

        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-02,hh=20", "dt=2022-01-02,hh=21"));

        // Expected rows are the per-key end state of the changelog above: deleted keys are
        // absent and updated keys carry their last value.
        BlockingIterator streamingItr = ReadWriteTableTestUtil.testStreamingRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0081, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.76, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", null, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 25.6, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.1, "2022-01-02", "20"})));
        // Close the streaming iterator even when one of the checks below fails, so a failing
        // assertion does not leave the streaming read open for the rest of the test run.
        try {
            // A row written while the streaming read is open must show up on the iterator.
            ReadWriteTableTestUtil.insertInto(
                    table, new String[]{"('Chinese Yuan', 'HK Dollar', 1.231, '2022-01-03', '15')"});
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamingItr,
                    Collections.singletonList(
                            TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "HK Dollar", 1.231, "2022-01-03", "15"})));

            // Overwrite in batch mode, then verify both the batch view and that the streaming
            // iterator receives nothing more.
            ReadWriteTableTestUtil.bEnv
                    .executeSql(
                            String.format(
                                    "INSERT OVERWRITE `%s` SELECT 'US Dollar', 'US Dollar', 1, '2022-04-02', '10' FROM `%s`",
                                    table, table))
                    .await();
            ReadWriteTableTestUtil.checkFileStorePath(table, Collections.singletonList("dt=2022-04-02,hh=10"));
            ReadWriteTableTestUtil.testBatchRead(
                    ReadWriteTableTestUtil.buildSimpleQuery(table),
                    Collections.singletonList(
                            TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-04-02", "10"})));
            ReadWriteTableTestUtil.assertNoMoreRecords(streamingItr);
        } finally {
            streamingItr.close();
        }

        // Filter and projection checks run against a freshly created table.
        table = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);

        String usDollarFilter = "WHERE dt = '2022-01-02' AND from_currency = 'US Dollar'";
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "*", usDollarFilter),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})))
                .close();
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "from_currency, to_currency", usDollarFilter),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen"})))
                .close();
    }

    /**
     * Streaming read/write round trip for a table partitioned by {@code dt} with the composite
     * primary key {@code (from_currency, to_currency, dt)}.
     *
     * <p>Flow:
     * <ol>
     *   <li>Load a changelog of inserts, updates and deletes from a temporary table into a table
     *       created through {@code KafkaLogTestUtils.createTableWithKafkaLog}.</li>
     *   <li>Check the partition directories and the rows returned by a streaming read.</li>
     *   <li>Insert a row into a new partition and check that the open streaming iterator
     *       delivers it.</li>
     *   <li>Check streaming reads with a filter, and with a filter plus projection that is
     *       expected to return nothing.</li>
     * </ol>
     *
     * @throws Exception if a table operation or closing an iterator fails
     */
    @Test
    public void testStreamingReadWriteSinglePartitionedRecordsWithMultiPk() throws Exception {
        List<String> fieldsSpec = Arrays.asList(
                "from_currency STRING", "to_currency STRING", "rate_by_to_currency DOUBLE", "dt STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt");
        List<String> partitionKeys = Collections.singletonList("dt");

        List<Row> initialRecords = Arrays.asList(
                // dt = 2022-01-01
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.74, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0081, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", "US Dollar", 1.11, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-01"}),
                // dt = 2022-01-02
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 122.46, "2022-01-02"}));

        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(
                fieldsSpec,
                primaryKeys,
                partitionKeys,
                initialRecords,
                "dt:2022-01-01;dt:2022-01-02",
                false,
                "I,UA,D");
        String table = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, false);

        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        // Expected rows are the per-key end state of the changelog above.
        BlockingIterator streamingItr = ReadWriteTableTestUtil.testStreamingRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.74, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0081, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar", 1.11, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 122.46, "2022-01-02"})));
        // Close the streaming iterator even when the insert or the validation below fails, so a
        // failing assertion does not leave the streaming read open.
        try {
            ReadWriteTableTestUtil.insertIntoPartition(
                    table,
                    "PARTITION (dt = '2022-01-03')",
                    new String[]{"('Chinese Yuan', 'HK Dollar', 1.231)"});
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamingItr,
                    Collections.singletonList(
                            TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "HK Dollar", 1.231, "2022-01-03"})));
        } finally {
            streamingItr.close();
        }

        // Filter on the partition column and one primary-key column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "*", "WHERE dt = '2022-01-02' AND from_currency = 'US Dollar'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02"})))
                .close();

        // Both NULL rates in 2022-01-01 were later updated to non-null values, so nothing matches.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table,
                                "from_currency, to_currency",
                                "WHERE dt = '2022-01-01' AND rate_by_to_currency IS NULL"),
                        Collections.emptyList())
                .close();
    }

    /**
     * Reads the changelog with {@code ReadWriteTableTestUtil.SCAN_LATEST} table options from a
     * table partitioned by {@code (dt, hh)} with the composite primary key
     * {@code (from_currency, to_currency, dt, hh)}.
     *
     * <p>Each scenario creates a new table through
     * {@code KafkaLogTestUtils.createTableWithKafkaLog} and calls
     * {@code testStreamingReadWithReadFirst} with a different projection or filter: no filter,
     * partition filter, non-partition filter, mixed filter, projection only, and projection with
     * a filter.
     *
     * @throws Exception if a table operation or closing an iterator fails
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithMultiPk() throws Exception {
        List<String> fieldsSpec = Arrays.asList(
                "from_currency STRING",
                "to_currency STRING",
                "rate_by_to_currency DOUBLE",
                "dt STRING",
                "hh STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt", "hh");
        List<String> partitionKeys = Arrays.asList("dt", "hh");

        List<Row> initialRecords = Arrays.asList(
                // to_currency is US Dollar
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar", null, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.74, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Yen", "US Dollar", 0.0081, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", "US Dollar", null, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "US Dollar", 0.76, "2022-01-02", "20"}),
                // to_currency is Euro
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Euro", null, "2022-01-02", "20"}),
                // to_currency is Yen
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Chinese Yuan", "Yen", 25.6, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02", "20"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 90.1, "2022-01-02", "20"}));

        String sourceTable = ReadWriteTableTestUtil.createTemporaryTable(
                fieldsSpec,
                primaryKeys,
                partitionKeys,
                initialRecords,
                "dt:2022-01-02,hh:20;dt:2022-01-02,hh:21",
                false,
                "I,UA,D");

        // Scenario 1: no projection, no filter. The full changelog is expected, plus one -U row
        // for every +U row in the input.
        String sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        List<Row> expectedRecords = new ArrayList<>(initialRecords);
        expectedRecords.addAll(
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "US Dollar", 0.74, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02", "20"})));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        expectedRecords)
                .close();

        // Scenario 2: filter on partition columns only.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable,
                                "*",
                                "WHERE dt = '2022-01-02' AND hh = '21'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})))
                .close();

        // Scenario 3: filter on non-partition columns only.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable,
                                "*",
                                "WHERE rate_by_to_currency IS NOT NULL AND from_currency = 'US Dollar'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})))
                .close();

        // Scenario 4: filter mixing a partition column and a non-partition column.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable,
                                "*",
                                "WHERE hh = '21' AND from_currency = 'US Dollar'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})))
                .close();

        // Scenario 5: projection without a filter.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable, "from_currency, to_currency", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[]{"US Dollar", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen"})))
                .close();

        // Scenario 6: projection combined with a filter on the non-projected rate column.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable,
                                "from_currency, to_currency",
                                "WHERE rate_by_to_currency > 100",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen"})))
                .close();
    }

    /**
     * Reads the changelog with {@code ReadWriteTableTestUtil.SCAN_LATEST} table options from a
     * table partitioned by {@code dt} with the composite primary key
     * {@code (from_currency, to_currency, dt)}.
     *
     * <p>Each scenario creates a new table through
     * {@code KafkaLogTestUtils.createTableWithKafkaLog} and calls
     * {@code testStreamingReadWithReadFirst} with a different projection or filter: no filter,
     * partition filter, non-partition filter, mixed filter, and projection with a filter.
     *
     * @throws Exception if a table operation or closing an iterator fails
     */
    @Test
    public void testReadLatestChangelogOfSinglePartitionedRecordsWithMultiPk() throws Exception {
        List<String> fieldsSpec = Arrays.asList(
                "from_currency STRING", "to_currency STRING", "rate_by_to_currency DOUBLE", "dt STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt");
        List<String> partitionKeys = Collections.singletonList("dt");

        List<Row> initialRecords = Arrays.asList(
                // dt = 2022-01-01
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.74, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0081, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", "US Dollar", 1.11, "2022-01-01"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-01"}),
                // dt = 2022-01-02
                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("-D", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"}),
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 122.46, "2022-01-02"}));

        String sourceTable = ReadWriteTableTestUtil.createTemporaryTable(
                fieldsSpec,
                primaryKeys,
                partitionKeys,
                initialRecords,
                "dt:2022-01-01;dt:2022-01-02",
                false,
                "I,UA,D");

        // Scenario 1: no projection, no filter. The full changelog is expected, plus one -U row
        // for every +U row in the input.
        String sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        List<Row> expectedRecords = new ArrayList<>(initialRecords);
        expectedRecords.addAll(
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"})));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        expectedRecords)
                .close();

        // Scenario 2: filter on the partition column only.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable, "*", "WHERE dt = '2022-01-02'", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 122.46, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"})))
                .close();

        // Scenario 3: filter on a non-partition column only.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable,
                                "*",
                                "WHERE rate_by_to_currency IS NULL",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("-U", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"})))
                .close();

        // Scenario 4: filter mixing the partition column and a non-partition column.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable,
                                "*",
                                "WHERE dt = '2022-01-02' AND from_currency = 'Yen'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02"})))
                .close();

        // Scenario 5: projection combined with a filter on the non-projected rate column.
        sinkTable = KafkaLogTestUtils.createTableWithKafkaLog(fieldsSpec, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                sinkTable,
                                "from_currency, to_currency",
                                "WHERE rate_by_to_currency > 100",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen"})))
                .close();
    }

    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithMultiPk() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", "US Dollar", 1.0}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", "US Dollar", 1.11}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"HK Dollar", "US Dollar", 0.13}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Euro", "US Dollar", 1.12}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", "US Dollar", 0.0082}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Singapore Dollar", "US Dollar", 0.74}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Yen", "US Dollar", 0.0081}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"US Dollar", "US Dollar", 1.0}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", "US Dollar", 0.0081}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", "Euro", 0.9}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Singapore Dollar", "Euro", 0.67}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Euro", 0.69}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", "Yen", 1.0}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Chinese Yuan", "Yen", 19.25}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Singapore Dollar", "Yen", 90.32}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", "Yen", 1.0}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Yen", 122.46}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Yen", 122.0}));
        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("from_currency STRING", "to_currency STRING", "rate_by_to_currency DOUBLE"), Arrays.asList("from_currency", "to_currency"), Collections.emptyList(), initialRecords, null, (boolean)false, (String)"I,UA,D");
        String table = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("from_currency STRING", "to_currency STRING", "rate_by_to_currency DOUBLE"), Arrays.asList("from_currency", "to_currency"), Collections.emptyList(), true);
        ArrayList<Row> expectedRecords = new ArrayList<Row>(initialRecords);
        expectedRecords.add(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Yen", "US Dollar", 0.0082}));
        expectedRecords.add(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Euro", "US Dollar", 1.11}));
        expectedRecords.add(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Singapore Dollar", "Euro", 0.67}));
        expectedRecords.add(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Singapore Dollar", "Yen", 90.32}));
        expectedRecords.add(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Singapore Dollar", "Yen", 122.46}));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst((String)temporaryTable, (String)table, (String)ReadWriteTableTestUtil.buildQueryWithTableOptions((String)table, (String)"*", (String)"", (Map)ReadWriteTableTestUtil.SCAN_LATEST), expectedRecords).close();
        table = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("from_currency STRING", "to_currency STRING", "rate_by_to_currency DOUBLE"), Arrays.asList("from_currency", "to_currency"), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst((String)temporaryTable, (String)table, (String)ReadWriteTableTestUtil.buildQueryWithTableOptions((String)table, (String)"*", (String)"WHERE rate_by_to_currency < 1 OR rate_by_to_currency > 100", (Map)ReadWriteTableTestUtil.SCAN_LATEST), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"HK Dollar", "US Dollar", 0.13}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", "US Dollar", 0.0082}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Yen", "US Dollar", 0.0082}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Singapore Dollar", "US Dollar", 0.74}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Yen", "US Dollar", 0.0081}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", "US Dollar", 0.0081}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", "Euro", 0.9}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Singapore Dollar", "Euro", 0.67}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Singapore Dollar", "Euro", 0.67}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Euro", 0.69}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Yen", 122.46}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Singapore Dollar", "Yen", 122.46}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Yen", 122.0}))).close();
        table = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("from_currency STRING", "to_currency STRING", "rate_by_to_currency DOUBLE"), Arrays.asList("from_currency", "to_currency"), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst((String)temporaryTable, (String)table, (String)ReadWriteTableTestUtil.buildQueryWithTableOptions((String)table, (String)"from_currency, to_currency", (String)"WHERE rate_by_to_currency < 1 OR rate_by_to_currency > 100", (Map)ReadWriteTableTestUtil.SCAN_LATEST), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"HK Dollar", "US Dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", "US Dollar"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Yen", "US Dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Singapore Dollar", "US Dollar"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Yen", "US Dollar"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", "US Dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", "Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Singapore Dollar", "Euro"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Singapore Dollar", "Euro"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Euro"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Yen"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{"Singapore Dollar", "Yen"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Singapore Dollar", "Yen"}))).close();
    }

    /**
     * Checks streaming reads with the {@code SCAN_LATEST} options against a Kafka-log-backed table
     * whose primary key is {@code (currency, dt, hh)} and whose partition keys are {@code (dt, hh)}.
     *
     * <p>A temporary table is seeded with changelog rows for two partitions
     * ({@code dt=2022-01-01, hh=15} and {@code dt=2022-01-02, hh=23}). Four queries are then
     * verified, each against a freshly created Kafka-log table:
     *
     * <ol>
     *   <li>{@code *} with no filter,
     *   <li>{@code *} filtered by {@code dt >= '2022-01-02'},
     *   <li>{@code *} filtered by {@code rate = 1},
     *   <li>the {@code currency} column only, filtered by {@code rate = 1}.
     * </ol>
     *
     * @throws Exception if any table helper or closing a returned iterator fails
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithOnePk() throws Exception {
        // Schema pieces shared by the temporary table and every Kafka-log table created below.
        List<String> columns =
                Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING");
        List<String> primaryKeys = Arrays.asList("currency", "dt", "hh");
        List<String> partitionKeys = Arrays.asList("dt", "hh");

        List<Row> seedRows =
                Arrays.asList(
                        // dt = 2022-01-01, hh = 15
                        TestValuesTableFactory.changelogRow(
                                "+I", new Object[] {"US Dollar", 102L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow(
                                "+I", new Object[] {"Euro", 114L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow(
                                "+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow(
                                "+U", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow(
                                "-D", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow(
                                "-D", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                        // dt = 2022-01-02, hh = 23
                        TestValuesTableFactory.changelogRow(
                                "+I", new Object[] {"Euro", 119L, "2022-01-02", "23"}),
                        TestValuesTableFactory.changelogRow(
                                "+U", new Object[] {"Euro", 119L, "2022-01-02", "23"}));

        // NOTE(review): the trailing `false` and "I,UA,D" are presumably a boundedness flag and
        // the changelog mode of the temporary table -- confirm against createTemporaryTable.
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        columns,
                        primaryKeys,
                        partitionKeys,
                        seedRows,
                        "dt:2022-01-01,hh:15;dt:2022-01-02,hh:23",
                        false,
                        "I,UA,D");

        // Query 1: everything, unfiltered.
        String sinkTable =
                KafkaLogTestUtils.createTableWithKafkaLog(columns, primaryKeys, partitionKeys, true);

        // Expected output is the seed changelog without the final +U on (Euro, 2022-01-02, 23),
        // plus a -U retracting the original Euro rate of 114.
        // NOTE(review): presumably the +U is dropped because it repeats the preceding +I's
        // values -- confirm.
        List<Row> expectedChangelog = new ArrayList<>(seedRows);
        expectedChangelog.remove(
                TestValuesTableFactory.changelogRow(
                        "+U", new Object[] {"Euro", 119L, "2022-01-02", "23"}));
        expectedChangelog.add(
                TestValuesTableFactory.changelogRow(
                        "-U", new Object[] {"Euro", 114L, "2022-01-01", "15"}));

        String unfilteredQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable, sinkTable, unfilteredQuery, expectedChangelog)
                .close();

        // Query 2: filter on a partition column.
        sinkTable =
                KafkaLogTestUtils.createTableWithKafkaLog(columns, primaryKeys, partitionKeys, true);
        String partitionFilterQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "*",
                        "WHERE dt >= '2022-01-02'",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        List<Row> expectedForPartitionFilter =
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow(
                                "+I", new Object[] {"Euro", 119L, "2022-01-02", "23"}));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable, sinkTable, partitionFilterQuery, expectedForPartitionFilter)
                .close();

        // Query 3: filter on a non-key value column.
        sinkTable =
                KafkaLogTestUtils.createTableWithKafkaLog(columns, primaryKeys, partitionKeys, true);
        String rateFilterQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable, "*", "WHERE rate = 1", ReadWriteTableTestUtil.SCAN_LATEST);
        List<Row> expectedForRateFilter =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow(
                                "+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow(
                                "-D", new Object[] {"Yen", 1L, "2022-01-01", "15"}));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable, sinkTable, rateFilterQuery, expectedForRateFilter)
                .close();

        // Query 4: same filter as query 3, projecting only the currency column.
        sinkTable =
                KafkaLogTestUtils.createTableWithKafkaLog(columns, primaryKeys, partitionKeys, true);
        String projectedRateFilterQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "currency",
                        "WHERE rate = 1",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        List<Row> expectedForProjection =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen"}));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable, sinkTable, projectedRateFilterQuery, expectedForProjection)
                .close();
    }
}

