/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.MultiTablesCompactorUtil;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.system.BucketsTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MultiTablesCompactorSourceFunction
extends RichSourceFunction<Tuple2<Split, String>> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MultiTablesCompactorSourceFunction.class);
    protected final Catalog.Loader catalogLoader;
    protected final Pattern includingPattern;
    protected final Pattern excludingPattern;
    protected final Pattern databasePattern;
    protected final boolean isStreaming;
    protected final long monitorInterval;
    protected transient Catalog catalog;
    protected transient Map<Identifier, BucketsTable> tablesMap;
    protected transient Map<Identifier, StreamTableScan> scansMap;
    protected volatile boolean isRunning = true;
    protected transient SourceFunction.SourceContext<Tuple2<Split, String>> ctx;

    public MultiTablesCompactorSourceFunction(Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, boolean isStreaming, long monitorInterval) {
        this.catalogLoader = catalogLoader;
        this.includingPattern = includingPattern;
        this.excludingPattern = excludingPattern;
        this.databasePattern = databasePattern;
        this.isStreaming = isStreaming;
        this.monitorInterval = monitorInterval;
    }

    public void open(Configuration parameters) throws Exception {
        this.tablesMap = new HashMap<Identifier, BucketsTable>();
        this.scansMap = new HashMap<Identifier, StreamTableScan>();
        this.catalog = this.catalogLoader.load();
        this.updateTableMap();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        if (this.ctx != null) {
            Object object = this.ctx.getCheckpointLock();
            synchronized (object) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
    }

    protected void updateTableMap() throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
        List<String> databases = this.catalog.listDatabases();
        for (String databaseName : databases) {
            if (!this.databasePattern.matcher(databaseName).matches()) continue;
            List<String> tables = this.catalog.listTables(databaseName);
            for (String tableName : tables) {
                Identifier identifier = Identifier.create(databaseName, tableName);
                if (!MultiTablesCompactorUtil.shouldCompactTable(identifier, this.includingPattern, this.excludingPattern) || this.tablesMap.containsKey(identifier)) continue;
                Table table = this.catalog.getTable(identifier);
                if (!(table instanceof FileStoreTable)) {
                    LOG.error(String.format("Only FileStoreTable supports compact action. The table type is '%s'.", table.getClass().getName()));
                    continue;
                }
                FileStoreTable fileStoreTable = (FileStoreTable)table;
                if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
                    LOG.info(String.format("the bucket mode of %s is unware. ", identifier.getFullName()) + "currently, the table with unware bucket mode is not support in combined mode.");
                    continue;
                }
                Table bucketsTable = new BucketsTable(fileStoreTable, this.isStreaming, identifier.getDatabaseName()).copy((Map)MultiTablesCompactorUtil.compactOptions(this.isStreaming));
                this.tablesMap.put(identifier, (BucketsTable)bucketsTable);
                this.scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
            }
        }
    }
}

