/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.datastream.impl.extension.join.operators;

import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.RuntimeContext;
import org.apache.flink.datastream.api.extension.join.JoinType;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction;
import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

/**
 * Keyed two-input operator that joins the records of both inputs by buffering each side in
 * keyed list state.
 *
 * <p>Every record arriving on one input is handed, together with each record currently buffered
 * for the opposite input, to the join function obtained from the wrapped
 * {@link TwoInputNonBroadcastJoinProcessFunction}; afterwards the record is appended to the
 * buffer of its own side. Only {@link JoinType#INNER} is accepted.
 *
 * <p>NOTE(review): this class only ever appends to the two list states and never clears them;
 * whether state is cleaned up elsewhere cannot be determined from this file.
 *
 * @param <K> type of the key
 * @param <IN1> type of the records of the first (left) input
 * @param <IN2> type of the records of the second (right) input
 * @param <OUT> type of the records emitted by the join function
 */
public class TwoInputNonBroadcastJoinProcessOperator<K, IN1, IN2, OUT>
extends KeyedTwoInputNonBroadcastProcessOperator<K, IN1, IN2, OUT> {
    /** The user function, narrowed to the join-specific type to reach the join function. */
    private final TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT> joinProcessFunction;
    /** Descriptor used in {@link #open()} to obtain the state buffering left records. */
    private final ListStateDescriptor<IN1> leftStateDescriptor;
    /** Descriptor used in {@link #open()} to obtain the state buffering right records. */
    private final ListStateDescriptor<IN2> rightStateDescriptor;
    /** Buffered records of the first input; initialised in {@link #open()}. */
    private transient ListState<IN1> leftState;
    /** Buffered records of the second input; initialised in {@link #open()}. */
    private transient ListState<IN2> rightState;

    /**
     * Creates the operator.
     *
     * @param userFunction the process function; it must be a
     *     {@link TwoInputNonBroadcastJoinProcessFunction}, otherwise the cast below fails with a
     *     {@link ClassCastException}
     * @param leftStateDescriptor descriptor of the list state that buffers left records
     * @param rightStateDescriptor descriptor of the list state that buffers right records
     * @throws IllegalArgumentException if the join type of {@code userFunction} is not
     *     {@link JoinType#INNER}
     */
    public TwoInputNonBroadcastJoinProcessOperator(TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> userFunction, ListStateDescriptor<IN1> leftStateDescriptor, ListStateDescriptor<IN2> rightStateDescriptor) {
        super(userFunction);
        this.joinProcessFunction = (TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT>) userFunction;
        Preconditions.checkArgument(this.joinProcessFunction.getJoinType() == JoinType.INNER, "Currently only support INNER join.");
        this.leftStateDescriptor = leftStateDescriptor;
        this.rightStateDescriptor = rightStateDescriptor;
    }

    /**
     * Opens the parent operator and then obtains both list states under the void namespace.
     *
     * @throws Exception if the parent fails to open or a state cannot be obtained
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.leftState = this.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.leftStateDescriptor);
        this.rightState = this.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.rightStateDescriptor);
    }

    /**
     * Joins a left record with every buffered right record, then buffers the left record.
     *
     * @param element the record received on the first input
     * @throws Exception if state access or the join function fails
     */
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        this.collector.setTimestampFromStreamRecord(element);
        IN1 leftRecord = element.getValue();
        Iterable<IN2> rightRecords = this.rightState.get();
        // The state may yield null instead of an empty iterable, so guard before iterating.
        if (rightRecords != null) {
            for (IN2 rightRecord : rightRecords) {
                this.joinProcessFunction.getJoinFunction().processRecord(leftRecord, rightRecord, this.collector, this.partitionedContext);
            }
        }
        // Buffer only after joining so the record is visible to later right-side arrivals.
        this.leftState.add(leftRecord);
    }

    /**
     * Joins a right record with every buffered left record, then buffers the right record.
     *
     * @param element the record received on the second input
     * @throws Exception if state access or the join function fails
     */
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        this.collector.setTimestampFromStreamRecord(element);
        IN2 rightRecord = element.getValue();
        // Read the state once and iterate the very iterable that was null-checked,
        // instead of issuing a second state read for the loop.
        Iterable<IN1> leftRecords = this.leftState.get();
        if (leftRecords != null) {
            for (IN1 leftRecord : leftRecords) {
                this.joinProcessFunction.getJoinFunction().processRecord(leftRecord, rightRecord, this.collector, this.partitionedContext);
            }
        }
        // Buffer only after joining so the record is visible to later left-side arrivals.
        this.rightState.add(rightRecord);
    }
}

