/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.service;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.service.network.NetworkUtils;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.service.server.KvQueryServer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializationUtils;

public class QueryExecutorOperator
extends AbstractStreamOperator<InternalRow>
implements OneInputStreamOperator<InternalRow, InternalRow> {
    private static final long serialVersionUID = 1L;
    private final Table table;
    private transient LocalTableQuery query;
    private transient IOManager ioManager;

    public QueryExecutorOperator(Table table) {
        this.table = table;
    }

    public static RowType outputType() {
        return RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.ioManager = IOManager.create(this.getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.query = ((FileStoreTable)this.table).newLocalTableQuery().withIOManager(this.ioManager);
        KvQueryServer server = new KvQueryServer(this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getNumberOfParallelSubtasks(), NetworkUtils.findHostAddress(), Collections.singletonList(0).iterator(), 1, 1, this.query, new DisabledServiceRequestStats());
        try {
            server.start();
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
        InetSocketAddress address = server.getServerAddress();
        this.output.collect((Object)new StreamRecord((Object)GenericRow.of(this.getRuntimeContext().getNumberOfParallelSubtasks(), this.getRuntimeContext().getIndexOfThisSubtask(), BinaryString.fromString(address.getHostName()), address.getPort())));
    }

    public void processElement(StreamRecord<InternalRow> streamRecord) throws Exception {
        InternalRow row = (InternalRow)streamRecord.getValue();
        BinaryRow partition = SerializationUtils.deserializeBinaryRow(row.getBinary(1));
        int bucket = row.getInt(2);
        DataFileMetaSerializer fileMetaSerializer = new DataFileMetaSerializer();
        List<DataFileMeta> beforeFiles = fileMetaSerializer.deserializeList(row.getBinary(3));
        List<DataFileMeta> dataFiles = fileMetaSerializer.deserializeList(row.getBinary(4));
        this.query.refreshFiles(partition, bucket, beforeFiles, dataFiles);
    }

    public void close() throws Exception {
        super.close();
        if (this.query != null) {
            this.query.close();
        }
        if (this.ioManager != null) {
            this.ioManager.close();
        }
    }
}

