/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link FileStoreLookupFunction}.
 *
 * <p>Each test creates a primary-key table (columns {@code pt}, {@code k}, {@code v}; primary key
 * {@code (pt, k)}) under a JUnit-managed temporary directory, opens a lookup function on it and
 * then checks which lookup table / query executor was chosen, or that lookups do not leave input
 * streams open (as tracked by {@link TraceableFileIO}).
 */
public class FileStoreLookupFunctionTest {
    /** Source of the random rows written by {@link #writeCommit(int)}. */
    private static final Random RANDOM = new Random();

    @TempDir
    private java.nio.file.Path tempDir;

    private final String commitUser = UUID.randomUUID().toString();
    private final TraceableFileIO fileIO = new TraceableFileIO();
    private Path tablePath;
    private FileStoreLookupFunction lookupFunction;
    private FileStoreTable table;

    /** Derives the table path from the per-test temporary directory. */
    @BeforeEach
    public void before() throws Exception {
        tablePath = new Path(tempDir.toString());
    }

    /** Creates a partitioned table and a lookup function joining on {@code k} only. */
    private void createLookupFunction() throws Exception {
        createLookupFunction(true, false);
    }

    /** Same as the three-argument overload with the dynamic-partition option left unset. */
    private void createLookupFunction(boolean isPartition, boolean joinEqualPk) throws Exception {
        createLookupFunction(isPartition, joinEqualPk, false);
    }

    /**
     * Creates the test table and opens {@link #lookupFunction} on it.
     *
     * @param isPartition whether the table is partitioned by {@code pt}
     * @param joinEqualPk whether the join key is the full primary key {@code (pt, k)}; otherwise
     *     the join key is {@code k} alone
     * @param dynamicPartition whether to set {@link FlinkConnectorOptions#LOOKUP_DYNAMIC_PARTITION}
     *     to {@code "max_pt()"}
     */
    private void createLookupFunction(
            boolean isPartition, boolean joinEqualPk, boolean dynamicPartition) throws Exception {
        Options conf = new Options();
        conf.set(CoreOptions.BUCKET, 2);
        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
        conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, Duration.ofSeconds(1L));
        if (dynamicPartition) {
            conf.set(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION, "max_pt()");
        }

        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
                        new String[] {"pt", "k", "v"});
        List<String> partitionKeys =
                isPartition ? Collections.singletonList("pt") : Collections.emptyList();
        Schema schema =
                new Schema(
                        rowType.getFields(),
                        partitionKeys,
                        Arrays.asList("pt", "k"),
                        conf.toMap(),
                        "");

        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
        TableSchema tableSchema = schemaManager.createTable(schema);
        table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);

        int[] projection = {0, 1};
        int[] joinKeyIndex = joinEqualPk ? new int[] {0, 1} : new int[] {1};
        lookupFunction = new FileStoreLookupFunction(table, projection, joinKeyIndex, null);
        lookupFunction.open(tempDir.toString());
    }

    /** Closes the lookup function if a test created one. */
    @AfterEach
    public void close() throws Exception {
        if (lookupFunction != null) {
            lookupFunction.close();
        }
    }

    /** With the join key equal to the primary key, a local partial lookup table is expected. */
    @Test
    public void testDefaultLocalPartial() throws Exception {
        createLookupFunction(false, true);
        Assertions.assertThat(lookupFunction.lookupTable())
                .isInstanceOf(PrimaryKeyPartialLookupTable.class);
        PrimaryKeyPartialLookupTable.QueryExecutor queryExecutor =
                ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
        Assertions.assertThat(queryExecutor)
                .isInstanceOf(PrimaryKeyPartialLookupTable.LocalQueryExecutor.class);
    }

    /** After a "primary-key-lookup" service is registered, a remote executor is expected. */
    @Test
    public void testDefaultRemotePartial() throws Exception {
        createLookupFunction(false, true);
        ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
        serviceManager.resetService(
                "primary-key-lookup", new InetSocketAddress[] {new InetSocketAddress(1)});
        // Open again so the lookup table is rebuilt now that the service address is registered.
        // NOTE(review): the function was already opened by createLookupFunction and is not closed
        // in between - confirm that re-opening does not leak the first lookup table.
        lookupFunction.open(tempDir.toString());
        Assertions.assertThat(lookupFunction.lookupTable())
                .isInstanceOf(PrimaryKeyPartialLookupTable.class);
        PrimaryKeyPartialLookupTable.QueryExecutor queryExecutor =
                ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
        Assertions.assertThat(queryExecutor)
                .isInstanceOf(PrimaryKeyPartialLookupTable.RemoteQueryExecutor.class);
    }

    /** Lookups after one commit, and after a further ten-row commit, must leave no stream open. */
    @Test
    public void testLookupScanLeak() throws Exception {
        createLookupFunction();
        commit(writeCommit(1));
        lookupSampleKey();
        assertNoOpenInputStreams();

        commit(writeCommit(10));
        lookupSampleKey();
        assertNoOpenInputStreams();
    }

    /**
     * Looks up, commits four more times, and looks up again. The table is configured to retain at
     * most 3 snapshots; the test passes as long as neither lookup throws.
     */
    @Test
    public void testLookupExpiredSnapshot() throws Exception {
        createLookupFunction();
        commit(writeCommit(1));
        lookupSampleKey();
        commit(writeCommit(2));
        commit(writeCommit(3));
        commit(writeCommit(4));
        commit(writeCommit(5));
        lookupSampleKey();
    }

    /** Same leak check as {@link #testLookupScanLeak()} with the dynamic-partition option set. */
    @Test
    public void testLookupDynamicPartition() throws Exception {
        createLookupFunction(true, false, true);
        commit(writeCommit(1));
        lookupSampleKey();
        assertNoOpenInputStreams();

        commit(writeCommit(10));
        lookupSampleKey();
        assertNoOpenInputStreams();
    }

    /** Performs a lookup with the fixed key row {@code (1, 1, 10L)}; the result is ignored. */
    private void lookupSampleKey() throws Exception {
        RowData keyRow = new FlinkRowData(GenericRow.of(1, 1, 10L));
        lookupFunction.lookup(keyRow);
    }

    /** Asserts that no input stream under the temporary directory is still open. */
    private void assertNoOpenInputStreams() {
        String root = tempDir.toString();
        Assertions.assertThat(
                        TraceableFileIO.openInputStreams(path -> path.toString().contains(root))
                                .size())
                .isEqualTo(0);
    }

    /**
     * Commits the given messages with this test's commit user.
     *
     * @param messages commit messages produced by {@link #writeCommit(int)}
     */
    private void commit(List<CommitMessage> messages) throws Exception {
        // try-with-resources so the committer is released even when commit(...) throws.
        try (TableCommitImpl commit = table.newCommit(commitUser)) {
            commit.commit(messages);
        }
    }

    /**
     * Writes {@code number} random rows, preparing a commit after each row.
     *
     * @param number how many rows to write
     * @return all commit messages collected from the per-row {@code prepareCommit} calls
     */
    private List<CommitMessage> writeCommit(int number) throws Exception {
        List<CommitMessage> messages = new ArrayList<>();
        // The writer is closed once all messages have been prepared so it is not leaked.
        try (StreamTableWrite writer = table.newStreamWriteBuilder().newWrite()) {
            for (int i = 0; i < number; i++) {
                writer.write(randomRow());
                messages.addAll(writer.prepareCommit(true, i));
            }
        }
        return messages;
    }

    /** Builds a row with {@code pt} and {@code k} in {@code [0, 100)} and a random {@code v}. */
    private InternalRow randomRow() {
        return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100), RANDOM.nextLong());
    }
}

