/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.utils.MultiTablesCompactorUtil;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.BucketsTable;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.Preconditions;

public class MultiTablesReadOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<Tuple2<Split, String>, RowData> {
    private static final long serialVersionUID = 1L;
    private final Catalog.Loader catalogLoader;
    private final boolean isStreaming;
    private transient Catalog catalog;
    private transient IOManager ioManager;
    private transient Map<Identifier, BucketsTable> tablesMap;
    private transient Map<Identifier, TableRead> readsMap;
    private transient StreamRecord<RowData> reuseRecord;
    private transient FlinkRowData reuseRow;

    public MultiTablesReadOperator(Catalog.Loader catalogLoader, boolean isStreaming) {
        this.catalogLoader = catalogLoader;
        this.isStreaming = isStreaming;
    }

    public void open() throws Exception {
        super.open();
        this.ioManager = IOManager.create(this.getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.tablesMap = new HashMap<Identifier, BucketsTable>();
        this.readsMap = new HashMap<Identifier, TableRead>();
        this.catalog = this.catalogLoader.load();
        this.reuseRow = new FlinkRowData(null);
        this.reuseRecord = new StreamRecord((Object)this.reuseRow);
    }

    public void processElement(StreamRecord<Tuple2<Split, String>> record) throws Exception {
        Identifier identifier = Identifier.fromString((String)((Tuple2)record.getValue()).f1);
        TableRead read = this.getTableRead(identifier);
        try (CloseableIterator<InternalRow> iterator = read.createReader((Split)((Tuple2)record.getValue()).f0).toCloseableIterator();){
            while (iterator.hasNext()) {
                this.reuseRow.replace((InternalRow)iterator.next());
                this.output.collect(this.reuseRecord);
            }
        }
    }

    private TableRead getTableRead(Identifier tableId) {
        Table table = this.tablesMap.get(tableId);
        if (table == null) {
            try {
                Table newTable = this.catalog.getTable(tableId);
                Preconditions.checkArgument(newTable instanceof FileStoreTable, "Only FileStoreTable supports compact action. The table type is '%s'.", newTable.getClass().getName());
                table = new BucketsTable((FileStoreTable)newTable, this.isStreaming, tableId.getDatabaseName()).copy((Map)MultiTablesCompactorUtil.compactOptions(this.isStreaming));
                this.tablesMap.put(tableId, (BucketsTable)table);
                this.readsMap.put(tableId, table.newReadBuilder().newRead().withIOManager(this.ioManager));
            }
            catch (Catalog.TableNotExistException e) {
                LOG.error(String.format("table: %s not found.", tableId.getFullName()));
            }
        }
        return this.readsMap.get(tableId);
    }

    public void close() throws Exception {
        super.close();
        if (this.ioManager != null) {
            this.ioManager.close();
        }
    }
}

