/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.LinkedHashMap;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public abstract class CommonExecPythonCorrelate
extends ExecNodeBase<RowData>
implements SingleTransformationTranslator<RowData> {
    private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.table.RowDataPythonTableFunctionOperator";
    private final FlinkJoinType joinType;
    private final RexCall invocation;

    public CommonExecPythonCorrelate(FlinkJoinType joinType, RexCall invocation, RexNode condition, InputProperty inputProperty, RowType outputType, String description) {
        super(Collections.singletonList(inputProperty), (LogicalType)outputType, description);
        this.joinType = joinType;
        this.invocation = invocation;
        if (joinType == FlinkJoinType.LEFT && condition != null) {
            throw new TableException("Currently Python correlate does not support conditions in left join.");
        }
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        ExecEdge inputEdge = this.getInputEdges().get(0);
        Transformation<?> inputTransform = inputEdge.translateToPlan(planner);
        Configuration config = CommonPythonUtil.getMergedConfig(planner.getExecEnv(), planner.getTableConfig());
        OneInputTransformation<RowData, RowData> transform = this.createPythonOneInputTransformation(inputTransform, config);
        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
            transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }
        return transform;
    }

    private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(Transformation<RowData> inputTransform, Configuration config) {
        Tuple2<int[], PythonFunctionInfo> extractResult = this.extractPythonTableFunctionInfo();
        int[] pythonUdtfInputOffsets = (int[])extractResult.f0;
        PythonFunctionInfo pythonFunctionInfo = (PythonFunctionInfo)extractResult.f1;
        InternalTypeInfo pythonOperatorInputRowType = (InternalTypeInfo)inputTransform.getOutputType();
        InternalTypeInfo pythonOperatorOutputRowType = InternalTypeInfo.of((RowType)((RowType)this.getOutputType()));
        OneInputStreamOperator<RowData, RowData> pythonOperator = this.getPythonTableFunctionOperator(config, (InternalTypeInfo<RowData>)pythonOperatorInputRowType, (InternalTypeInfo<RowData>)pythonOperatorOutputRowType, pythonFunctionInfo, pythonUdtfInputOffsets);
        return new OneInputTransformation(inputTransform, this.getDescription(), pythonOperator, (TypeInformation)pythonOperatorOutputRowType, inputTransform.getParallelism());
    }

    private Tuple2<int[], PythonFunctionInfo> extractPythonTableFunctionInfo() {
        LinkedHashMap<RexNode, Integer> inputNodes = new LinkedHashMap<RexNode, Integer>();
        PythonFunctionInfo pythonTableFunctionInfo = CommonPythonUtil.createPythonFunctionInfo(this.invocation, inputNodes);
        int[] udtfInputOffsets = inputNodes.keySet().stream().filter(x -> x instanceof RexInputRef).map(x -> ((RexInputRef)x).getIndex()).mapToInt(i -> i).toArray();
        return Tuple2.of((Object)udtfInputOffsets, (Object)pythonTableFunctionInfo);
    }

    private OneInputStreamOperator<RowData, RowData> getPythonTableFunctionOperator(Configuration config, InternalTypeInfo<RowData> inputRowType, InternalTypeInfo<RowData> outputRowType, PythonFunctionInfo pythonFunctionInfo, int[] udtfInputOffsets) {
        Class clazz = CommonPythonUtil.loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME);
        try {
            Constructor ctor = clazz.getConstructor(Configuration.class, PythonFunctionInfo.class, RowType.class, RowType.class, int[].class, FlinkJoinType.class);
            return (OneInputStreamOperator)ctor.newInstance(config, pythonFunctionInfo, inputRowType.toRowType(), outputRowType.toRowType(), udtfInputOffsets, this.joinType);
        }
        catch (Exception e) {
            throw new TableException("Python Table Function Operator constructed failed.", (Throwable)e);
        }
    }
}

