/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
import org.apache.flink.table.runtime.operators.join.interval.IntervalJoinFunction;
import org.apache.flink.table.runtime.operators.join.interval.ProcTimeIntervalJoin;
import org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonIgnoreProperties(ignoreUnknown=true)
public class StreamExecIntervalJoin
extends ExecNodeBase<RowData>
implements StreamExecNode<RowData>,
MultipleTransformationTranslator<RowData> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamExecIntervalJoin.class);
    public static final String FIELD_NAME_INTERVAL_JOIN_SPEC = "intervalJoinSpec";
    @JsonProperty(value="intervalJoinSpec")
    private final IntervalJoinSpec intervalJoinSpec;

    public StreamExecIntervalJoin(IntervalJoinSpec intervalJoinSpec, InputProperty leftInputProperty, InputProperty rightInputProperty, RowType outputType, String description) {
        this(intervalJoinSpec, StreamExecIntervalJoin.getNewNodeId(), Lists.newArrayList((Object[])new InputProperty[]{leftInputProperty, rightInputProperty}), outputType, description);
    }

    @JsonCreator
    public StreamExecIntervalJoin(@JsonProperty(value="intervalJoinSpec") IntervalJoinSpec intervalJoinSpec, @JsonProperty(value="id") int id, @JsonProperty(value="inputProperties") List<InputProperty> inputProperties, @JsonProperty(value="outputType") RowType outputType, @JsonProperty(value="description") String description) {
        super(id, inputProperties, (LogicalType)outputType, description);
        Preconditions.checkArgument((inputProperties.size() == 2 ? 1 : 0) != 0);
        this.intervalJoinSpec = (IntervalJoinSpec)Preconditions.checkNotNull((Object)intervalJoinSpec);
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        ExecEdge leftInputEdge = this.getInputEdges().get(0);
        ExecEdge rightInputEdge = this.getInputEdges().get(1);
        RowType leftRowType = (RowType)leftInputEdge.getOutputType();
        RowType rightRowType = (RowType)rightInputEdge.getOutputType();
        Transformation<?> leftInputTransform = leftInputEdge.translateToPlan(planner);
        Transformation<?> rightInputTransform = rightInputEdge.translateToPlan(planner);
        RowType returnType = (RowType)this.getOutputType();
        InternalTypeInfo returnTypeInfo = InternalTypeInfo.of((RowType)returnType);
        JoinSpec joinSpec = this.intervalJoinSpec.getJoinSpec();
        IntervalJoinSpec.WindowBounds windowBounds = this.intervalJoinSpec.getWindowBounds();
        switch (joinSpec.getJoinType()) {
            case INNER: 
            case LEFT: 
            case RIGHT: 
            case FULL: {
                long relativeWindowSize = windowBounds.getLeftUpperBound() - windowBounds.getLeftLowerBound();
                if (relativeWindowSize < 0L) {
                    LOGGER.warn("The relative time interval size " + relativeWindowSize + "is negative, please check the join conditions.");
                    return this.createNegativeWindowSizeJoin(joinSpec, leftInputTransform, rightInputTransform, leftRowType.getFieldCount(), rightRowType.getFieldCount(), (InternalTypeInfo<RowData>)returnTypeInfo);
                }
                GeneratedJoinCondition joinCondition = JoinUtil.generateConditionFunction(planner.getTableConfig(), joinSpec, (LogicalType)leftRowType, (LogicalType)rightRowType);
                IntervalJoinFunction joinFunction = new IntervalJoinFunction(joinCondition, returnTypeInfo, joinSpec.getFilterNulls());
                TwoInputTransformation<RowData, RowData, RowData> transform = windowBounds.isEventTime() ? this.createRowTimeJoin(leftInputTransform, rightInputTransform, (InternalTypeInfo<RowData>)returnTypeInfo, joinFunction, joinSpec, windowBounds) : this.createProcTimeJoin(leftInputTransform, rightInputTransform, (InternalTypeInfo<RowData>)returnTypeInfo, joinFunction, joinSpec, windowBounds);
                if (this.inputsContainSingleton()) {
                    transform.setParallelism(1);
                    transform.setMaxParallelism(1);
                }
                RowDataKeySelector leftSelect = KeySelectorUtil.getRowDataSelector(joinSpec.getLeftKeys(), (InternalTypeInfo<RowData>)InternalTypeInfo.of((RowType)leftRowType));
                RowDataKeySelector rightSelect = KeySelectorUtil.getRowDataSelector(joinSpec.getRightKeys(), (InternalTypeInfo<RowData>)InternalTypeInfo.of((RowType)rightRowType));
                transform.setStateKeySelectors((KeySelector)leftSelect, (KeySelector)rightSelect);
                transform.setStateKeyType((TypeInformation)leftSelect.getProducedType());
                return transform;
            }
        }
        throw new TableException("Interval Join: " + joinSpec.getJoinType() + " Join between stream and stream is not supported yet.\nplease re-check interval join statement according to description above.");
    }

    private Transformation<RowData> createNegativeWindowSizeJoin(JoinSpec joinSpec, Transformation<RowData> leftInputTransform, Transformation<RowData> rightInputTransform, int leftArity, int rightArity, InternalTypeInfo<RowData> returnTypeInfo) {
        FilterAllFlatMapFunction allFilter = new FilterAllFlatMapFunction(returnTypeInfo);
        OuterJoinPaddingUtil paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity);
        PaddingLeftMapFunction leftPadder = new PaddingLeftMapFunction(paddingUtil, returnTypeInfo);
        PaddingRightMapFunction rightPadder = new PaddingRightMapFunction(paddingUtil, returnTypeInfo);
        int leftParallelism = leftInputTransform.getParallelism();
        int rightParallelism = rightInputTransform.getParallelism();
        OneInputTransformation filterAllLeftStream = new OneInputTransformation(leftInputTransform, "filter all left input transformation", (OneInputStreamOperator)new StreamFlatMap((FlatMapFunction)allFilter), returnTypeInfo, leftParallelism);
        OneInputTransformation filterAllRightStream = new OneInputTransformation(rightInputTransform, "filter all right input transformation", (OneInputStreamOperator)new StreamFlatMap((FlatMapFunction)allFilter), returnTypeInfo, rightParallelism);
        OneInputTransformation padLeftStream = new OneInputTransformation(leftInputTransform, "pad left input transformation", (OneInputStreamOperator)new StreamMap((MapFunction)leftPadder), returnTypeInfo, leftParallelism);
        OneInputTransformation padRightStream = new OneInputTransformation(rightInputTransform, "pad right input transformation", (OneInputStreamOperator)new StreamMap((MapFunction)rightPadder), returnTypeInfo, rightParallelism);
        switch (joinSpec.getJoinType()) {
            case INNER: {
                return new UnionTransformation((List)Lists.newArrayList((Object[])new Transformation[]{filterAllLeftStream, filterAllRightStream}));
            }
            case LEFT: {
                return new UnionTransformation((List)Lists.newArrayList((Object[])new Transformation[]{padLeftStream, filterAllRightStream}));
            }
            case RIGHT: {
                return new UnionTransformation((List)Lists.newArrayList((Object[])new Transformation[]{filterAllLeftStream, padRightStream}));
            }
            case FULL: {
                return new UnionTransformation((List)Lists.newArrayList((Object[])new Transformation[]{padLeftStream, padRightStream}));
            }
        }
        throw new TableException("should no reach here");
    }

    private TwoInputTransformation<RowData, RowData, RowData> createProcTimeJoin(Transformation<RowData> leftInputTransform, Transformation<RowData> rightInputTransform, InternalTypeInfo<RowData> returnTypeInfo, IntervalJoinFunction joinFunction, JoinSpec joinSpec, IntervalJoinSpec.WindowBounds windowBounds) {
        InternalTypeInfo leftTypeInfo = (InternalTypeInfo)leftInputTransform.getOutputType();
        InternalTypeInfo rightTypeInfo = (InternalTypeInfo)rightInputTransform.getOutputType();
        ProcTimeIntervalJoin procJoinFunc = new ProcTimeIntervalJoin(joinSpec.getJoinType(), windowBounds.getLeftLowerBound(), windowBounds.getLeftUpperBound(), leftTypeInfo, rightTypeInfo, joinFunction);
        return new TwoInputTransformation(leftInputTransform, rightInputTransform, this.getDescription(), (TwoInputStreamOperator)new KeyedCoProcessOperator((KeyedCoProcessFunction)procJoinFunc), returnTypeInfo, leftInputTransform.getParallelism());
    }

    private TwoInputTransformation<RowData, RowData, RowData> createRowTimeJoin(Transformation<RowData> leftInputTransform, Transformation<RowData> rightInputTransform, InternalTypeInfo<RowData> returnTypeInfo, IntervalJoinFunction joinFunction, JoinSpec joinSpec, IntervalJoinSpec.WindowBounds windowBounds) {
        InternalTypeInfo leftTypeInfo = (InternalTypeInfo)leftInputTransform.getOutputType();
        InternalTypeInfo rightTypeInfo = (InternalTypeInfo)rightInputTransform.getOutputType();
        RowTimeIntervalJoin rowJoinFunc = new RowTimeIntervalJoin(joinSpec.getJoinType(), windowBounds.getLeftLowerBound(), windowBounds.getLeftUpperBound(), 0L, leftTypeInfo, rightTypeInfo, joinFunction, windowBounds.getLeftTimeIdx(), windowBounds.getRightTimeIdx());
        return new TwoInputTransformation(leftInputTransform, rightInputTransform, this.getDescription(), (TwoInputStreamOperator)new KeyedCoProcessOperatorWithWatermarkDelay((KeyedCoProcessFunction)rowJoinFunc, rowJoinFunc.getMaxOutputDelay()), returnTypeInfo, leftInputTransform.getParallelism());
    }

    private static class PaddingRightMapFunction
    implements MapFunction<RowData, RowData>,
    ResultTypeQueryable<RowData> {
        private static final long serialVersionUID = 1L;
        private final OuterJoinPaddingUtil paddingUtil;
        private final InternalTypeInfo<RowData> outputTypeInfo;

        public PaddingRightMapFunction(OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> returnType) {
            this.paddingUtil = paddingUtil;
            this.outputTypeInfo = returnType;
        }

        public RowData map(RowData value) {
            return this.paddingUtil.padRight(value);
        }

        public TypeInformation<RowData> getProducedType() {
            return this.outputTypeInfo;
        }
    }

    private static class PaddingLeftMapFunction
    implements MapFunction<RowData, RowData>,
    ResultTypeQueryable<RowData> {
        private static final long serialVersionUID = 1L;
        private final OuterJoinPaddingUtil paddingUtil;
        private final InternalTypeInfo<RowData> outputTypeInfo;

        public PaddingLeftMapFunction(OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> returnType) {
            this.paddingUtil = paddingUtil;
            this.outputTypeInfo = returnType;
        }

        public RowData map(RowData value) {
            return this.paddingUtil.padLeft(value);
        }

        public TypeInformation<RowData> getProducedType() {
            return this.outputTypeInfo;
        }
    }

    private static class FilterAllFlatMapFunction
    implements FlatMapFunction<RowData, RowData>,
    ResultTypeQueryable<RowData> {
        private static final long serialVersionUID = 1L;
        private final InternalTypeInfo<RowData> outputTypeInfo;

        public FilterAllFlatMapFunction(InternalTypeInfo<RowData> inputTypeInfo) {
            this.outputTypeInfo = inputTypeInfo;
        }

        public void flatMap(RowData value, Collector<RowData> out) {
        }

        public TypeInformation<RowData> getProducedType() {
            return this.outputTypeInfo;
        }
    }
}

