/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.PactDriver;
import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

/**
 * Driver that feeds key-grouped records to a {@link FlatCombineFunction}.
 *
 * <p>In {@link #prepare()} the single input is wrapped in an
 * {@link AsynchronousPartialSorter}; in {@link #run()} the sorter's output is
 * walked group by group through a {@link KeyGroupedIterator}, and the user
 * function's {@code combine} method is called once per key group.
 *
 * @param <T> the type of the records that are consumed and emitted
 */
public class GroupReduceCombineDriver<T>
implements PactDriver<FlatCombineFunction<T>, T> {
    private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);

    /** Context supplying configuration, input, serializer, comparator, stub and collector. */
    private PactTaskContext<FlatCombineFunction<T>, T> taskContext;

    /** Sorted input created in {@link #prepare()}; {@code null} before that and after {@link #cleanup()}. */
    private CloseableInputProvider<T> input;

    private TypeSerializerFactory<T> serializerFactory;

    private TypeComparator<T> comparator;

    /**
     * Loop guard for {@link #run()}; cleared by {@link #cancel()}. Declared
     * volatile so that an update made by cancel() is visible to the running loop.
     */
    private volatile boolean running;

    /**
     * Stores the task context and marks the driver as running.
     *
     * @param context the context this driver obtains all of its resources from
     */
    @Override
    public void setup(PactTaskContext<FlatCombineFunction<T>, T> context) {
        this.taskContext = context;
        this.running = true;
    }

    /**
     * @return always {@code 1}; this driver reads a single input
     */
    @Override
    public int getNumberOfInputs() {
        return 1;
    }

    /**
     * @return the class object of {@link FlatCombineFunction}, typed for this driver's record type
     */
    @Override
    public Class<FlatCombineFunction<T>> getStubType() {
        // A class literal cannot carry type arguments, so the parameterized
        // Class type has to be obtained through an unchecked double cast.
        // Generics are erased at runtime, which makes the cast safe.
        @SuppressWarnings("unchecked")
        final Class<FlatCombineFunction<T>> clazz =
                (Class<FlatCombineFunction<T>>) (Class<?>) FlatCombineFunction.class;
        return clazz;
    }

    /**
     * @return always {@code true}; the input comparator is used for sorting and grouping
     */
    @Override
    public boolean requiresComparatorOnInput() {
        return true;
    }

    /**
     * Fetches input, serializer factory and comparator from the task context
     * and sets up the sorted input according to the configured driver strategy.
     *
     * @throws RuntimeException if the configured strategy is not {@code SORTED_GROUP_COMBINE}
     * @throws Exception if one of the invoked components fails
     */
    @Override
    public void prepare() throws Exception {
        final TaskConfig config = this.taskContext.getTaskConfig();
        final DriverStrategy ls = config.getDriverStrategy();
        final MemoryManager memoryManager = this.taskContext.getMemoryManager();
        final MutableObjectIterator<T> in = this.taskContext.getInput(0);
        this.serializerFactory = this.taskContext.getInputSerializer(0);
        this.comparator = this.taskContext.getInputComparator(0);
        switch (ls) {
            case SORTED_GROUP_COMBINE: {
                // The sorter receives its own duplicate of the comparator; the
                // original instance is handed to the KeyGroupedIterator in run().
                this.input = new AsynchronousPartialSorter<T>(memoryManager, in,
                        this.taskContext.getOwningNepheleTask(), this.serializerFactory,
                        this.comparator.duplicate(), config.getRelativeMemoryDriver());
                break;
            }
            default: {
                throw new RuntimeException("Invalid local strategy provided for CombineTask.");
            }
        }
    }

    /**
     * Iterates over the key groups of the prepared input and invokes the
     * combine function once per group until the input is exhausted or the
     * driver has been canceled.
     *
     * @throws Exception if obtaining the iterator, advancing it, or the user function fails
     */
    @Override
    public void run() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Preprocessing done, iterator obtained."));
        }
        final KeyGroupedIterator<T> iter = new KeyGroupedIterator<T>(
                this.input.getIterator(), this.serializerFactory.getSerializer(), this.comparator);
        // Keep stub and collector in locals so the loop does not go through the context each time.
        final FlatCombineFunction<T> stub = this.taskContext.getStub();
        final Collector<T> output = this.taskContext.getOutputCollector();
        // The running flag is evaluated first, so no further key is fetched after cancel().
        while (this.running && iter.nextKey()) {
            stub.combine(iter.getValues(), output);
        }
    }

    /**
     * Closes the input created in {@link #prepare()}, if there is one.
     *
     * @throws Exception if closing the input fails
     */
    @Override
    public void cleanup() throws Exception {
        if (this.input != null) {
            try {
                this.input.close();
            } finally {
                // Drop the reference even when close() throws, so that a
                // subsequent cleanup() does not close the same provider again.
                this.input = null;
            }
        }
    }

    /**
     * Requests the loop in {@link #run()} to stop before processing the next key group.
     */
    @Override
    public void cancel() {
        this.running = false;
    }
}

