/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.Map;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.functions.AbstractFlinkCombineRunner;
import org.apache.beam.runners.flink.translation.functions.FlinkSideInputReader;
import org.apache.beam.runners.flink.translation.functions.HashingFlinkCombineRunner;
import org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.util.Collector;

/**
 * Flink {@link RichGroupReduceFunction} that runs a Beam
 * {@link CombineFnBase.GlobalCombineFn} over a group of windowed key/value elements and emits
 * windowed key/output pairs.
 *
 * <p>The actual combining is delegated to an {@link AbstractFlinkCombineRunner}; this class only
 * selects the runner implementation and wires in the side inputs and pipeline options.
 *
 * <p>NOTE(review): judging by the class name this is intended for merging windows when no shuffle
 * precedes the reduce; that contract is not visible in this file, so confirm against the caller.
 *
 * @param <K> key type of the elements
 * @param <InputT> value type consumed by the combine function
 * @param <AccumT> accumulator type of the combine function
 * @param <OutputT> value type produced by the combine function
 * @param <W> window type of the windowing strategy
 */
public class FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
    private final CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
    private final WindowingStrategy<Object, W> windowingStrategy;
    private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
    private final SerializablePipelineOptions serializedOptions;

    /**
     * Creates the reduce function.
     *
     * @param combineFn combine function applied to each group
     * @param windowingStrategy windowing strategy of the input; its window coder decides which
     *     combine runner is used in {@link #reduce}
     * @param sideInputs side-input views and their windowing strategies, handed to the
     *     {@link FlinkSideInputReader} at reduce time
     * @param pipelineOptions pipeline options; stored wrapped in a
     *     {@link SerializablePipelineOptions}
     */
    public FlinkMergingNonShuffleReduceFunction(CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn, WindowingStrategy<Object, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) {
        this.combineFn = combineFn;
        this.windowingStrategy = windowingStrategy;
        this.sideInputs = sideInputs;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
    }

    /**
     * Combines one group of elements and writes the results to {@code out}.
     *
     * <p>A {@link SortingFlinkCombineRunner} is used when the window coder of the windowing
     * strategy equals {@link IntervalWindow#getCoder()}; otherwise a
     * {@link HashingFlinkCombineRunner} is used. In both cases the combine function is wrapped in
     * an {@link AbstractFlinkCombineRunner.CompleteFlinkCombiner}.
     *
     * @param elements the windowed key/value elements of the group
     * @param out collector receiving the windowed key/output results
     * @throws Exception if the delegated combine runner fails
     */
    @Override
    public void reduce(Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
        PipelineOptions options = this.serializedOptions.get();
        FlinkSideInputReader sideInputReader = new FlinkSideInputReader(this.sideInputs, this.getRuntimeContext());

        // Typed declaration (instead of the raw type left behind by decompilation) so that the
        // combine(...) call below is checked by the compiler.
        AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
        if (this.windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
            reduceRunner = new SortingFlinkCombineRunner<>();
        } else {
            reduceRunner = new HashingFlinkCombineRunner<>();
        }

        reduceRunner.combine(
                new AbstractFlinkCombineRunner.CompleteFlinkCombiner<>(this.combineFn),
                this.windowingStrategy,
                sideInputReader,
                options,
                elements,
                out);
    }
}

