/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.RecordReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;

/**
 * Task that pulls records from its single logical input and hands each one to a
 * user-supplied {@link OutputFormat}.
 *
 * <p>Life cycle as implemented here: {@link #registerInputOutput()} instantiates and configures
 * the output format and creates the input reader(s); {@link #invoke()} optionally sorts the
 * input (local strategy {@code SORT}), opens the format, writes all records and closes the
 * format; {@link #cancel()} sets a cancel flag and closes the format on a best-effort basis.
 *
 * @param <IT> the type of the records consumed by this sink
 */
public class DataSinkTask<IT> extends AbstractInvokable {
    /** Configuration key string {@code "sink.dop"}; it is not read anywhere inside this class. */
    public static final String DEGREE_OF_PARALLELISM_KEY = "sink.dop";

    private static final Log LOG = LogFactory.getLog(DataSinkTask.class);

    /**
     * The output format. Volatile because it is touched by both {@link #invoke()} and
     * {@link #cancel()} (presumably from different threads - confirm against the caller).
     * Set to {@code null} by {@link #invoke()} after a regular close.
     */
    private volatile OutputFormat<IT> format;

    /** Iterator over the raw input, created in {@link #initInputReaders()}. */
    private MutableObjectIterator<IT> reader;

    /** Iterator actually consumed by {@link #invoke()}: either {@link #reader} or the sorter's output. */
    private MutableObjectIterator<IT> input;

    /** Serializer factory for the input type, obtained from the task configuration. */
    private TypeSerializerFactory<IT> inputTypeSerializerFactory;

    /** The sorter when the local strategy is {@code SORT}; {@code null} for {@code NONE}. */
    private CloseableInputProvider<IT> localStrategy;

    /** Task configuration wrapper, created in {@link #initOutputFormat()}. */
    private TaskConfig config;

    /** Class loader used for user code; looked up lazily unless set via {@link #setUserCodeClassLoader}. */
    private ClassLoader userCodeClassLoader;

    /** Set by {@link #cancel()}; polled by {@link #invoke()} to stop early and to suppress errors. */
    private volatile boolean taskCanceled;

    /**
     * Initializes the output format and the input readers.
     *
     * @throws RuntimeException if the output format or the input readers cannot be set up
     */
    @Override
    public void registerInputOutput() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Start registering input and output"));
        }
        this.initOutputFormat();
        try {
            this.initInputReaders();
        }
        catch (Exception e) {
            throw new RuntimeException(DataSinkTask.failureMessage("Initializing the input streams failed", e), e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Finished registering input and output"));
        }
    }

    /**
     * Runs the sink: sets up the local strategy, opens the output format, writes every input
     * record to it and closes it.
     *
     * <p>If the task was canceled, exceptions other than {@link CancelTaskException} are dropped.
     * The output format (if still set) and the local strategy are always closed in the
     * {@code finally} block; failures there are only logged.
     *
     * @throws Exception any (unwrapped) exception raised while the task was not canceled, or a
     *         {@link CancelTaskException} in any case
     */
    @Override
    public void invoke() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Starting data sink operator"));
        }
        try {
            switch (this.config.getInputLocalStrategy(0)) {
                case NONE: {
                    // No local processing: consume the reader directly.
                    this.localStrategy = null;
                    this.input = this.reader;
                    break;
                }
                case SORT: {
                    try {
                        TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0, this.userCodeClassLoader);
                        if (compFact == null) {
                            throw new Exception("Missing comparator factory for local strategy on input 0");
                        }
                        UnilateralSortMerger<IT> sorter = new UnilateralSortMerger<IT>(
                                this.getEnvironment().getMemoryManager(),
                                this.getEnvironment().getIOManager(),
                                this.reader,
                                this,
                                this.inputTypeSerializerFactory,
                                compFact.createComparator(),
                                this.config.getRelativeMemoryInput(0),
                                this.config.getFilehandlesInput(0),
                                this.config.getSpillingThresholdInput(0));
                        // Register the sorter before asking for its iterator so that the finally
                        // block below closes it even if getIterator() fails.
                        this.localStrategy = sorter;
                        this.input = sorter.getIterator();
                    }
                    catch (Exception e) {
                        throw new RuntimeException(DataSinkTask.failureMessage("Initializing the input processing failed", e), e);
                    }
                    break;
                }
                default: {
                    throw new RuntimeException("Invalid local strategy for DataSinkTask");
                }
            }

            TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer();
            MutableObjectIterator<IT> input = this.input;
            OutputFormat<IT> format = this.format;
            IT record = serializer.createInstance();

            // Canceled before any output was produced: leave (the finally block still runs).
            if (this.taskCanceled) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.getLogString("Starting to produce output"));
            }
            format.open(this.getEnvironment().getIndexInSubtaskGroup(), this.getEnvironment().getCurrentNumberOfSubtasks());
            while (!this.taskCanceled && (record = input.next(record)) != null) {
                format.writeRecord(record);
            }
            // Regular close happens inside the try block so that an exception thrown by close()
            // propagates like any other failure. The field is cleared afterwards so the
            // finally block does not close the format a second time.
            if (!this.taskCanceled) {
                this.format.close();
                this.format = null;
            }
        }
        catch (Exception ex) {
            ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
            if (ex instanceof CancelTaskException) {
                throw ex;
            }
            // Errors that surface after cancellation are dropped.
            if (!this.taskCanceled) {
                if (LOG.isErrorEnabled()) {
                    LOG.error(this.getLogString("Error in user code: " + ex.getMessage()), ex);
                }
                throw ex;
            }
        }
        finally {
            // Read the volatile field once; it is only still set here if the regular close
            // above did not complete.
            OutputFormat<IT> unclosedFormat = this.format;
            if (unclosedFormat != null) {
                try {
                    unclosedFormat.close();
                }
                catch (Throwable t) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn(this.getLogString("Error closing the ouput format."), t);
                    }
                }
            }
            if (this.localStrategy != null) {
                try {
                    this.localStrategy.close();
                }
                catch (Throwable t) {
                    LOG.error("Error closing local strategy", t);
                }
            }
        }
        if (!this.taskCanceled) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.getLogString("Finished data sink operator"));
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Data sink operator cancelled"));
        }
    }

    /**
     * Marks the task as canceled and closes the output format on a best-effort basis.
     *
     * @throws Exception declared by the overridden method; this implementation swallows any
     *         failure raised while closing the format
     */
    @Override
    public void cancel() throws Exception {
        this.taskCanceled = true;
        // Snapshot the volatile field: invoke() may set it to null concurrently, so the close
        // must go through the snapshot rather than re-reading the field.
        OutputFormat<IT> format = this.format;
        if (format != null) {
            try {
                format.close();
            }
            catch (Throwable ignored) {
                // Best effort only: the task is being torn down, and invoke() closes the
                // format again (and logs failures) in its finally block.
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Cancelling data sink operator"));
        }
    }

    /**
     * Sets the class loader used to load user code. If none is set, one is obtained from the
     * {@link LibraryCacheManager} during {@link #registerInputOutput()}.
     *
     * @param cl the user code class loader
     */
    public void setUserCodeClassLoader(ClassLoader cl) {
        this.userCodeClassLoader = cl;
    }

    /**
     * Resolves the user code class loader (if not set), builds the {@link TaskConfig},
     * instantiates the {@link OutputFormat} from the configured stub wrapper and calls its
     * {@code configure()} method with the stub parameters.
     *
     * @throws RuntimeException if the class loader cannot be obtained, the stub is not an
     *         {@link OutputFormat}, or {@code configure()} fails
     */
    private void initOutputFormat() {
        if (this.userCodeClassLoader == null) {
            try {
                this.userCodeClassLoader = LibraryCacheManager.getClassLoader(this.getEnvironment().getJobID());
            }
            catch (IOException ioe) {
                throw new RuntimeException("Library cache manager could not be instantiated.", ioe);
            }
        }
        Configuration taskConf = this.getTaskConfiguration();
        taskConf.setClassLoader(this.userCodeClassLoader);
        this.config = new TaskConfig(taskConf);
        try {
            this.format = (OutputFormat)this.config.getStubWrapper(this.userCodeClassLoader).getUserCodeObject(OutputFormat.class, this.userCodeClassLoader);
            // NOTE(review): after the cast above this check can only fail via an NPE when the
            // wrapper returns null; kept as a defensive sanity check.
            if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + OutputFormat.class.getName() + "' as is required.");
            }
        }
        catch (ClassCastException ccex) {
            throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
        }
        try {
            this.format.configure(this.config.getStubParameters());
        }
        catch (Throwable t) {
            // User code: anything may be thrown, so wrap everything with context.
            throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: " + t.getMessage(), t);
        }
    }

    /**
     * Creates the reader(s) for input 0 and wraps them in an iterator matching the input type.
     *
     * <p>A group size of 1 yields a single record reader; a larger group size yields one reader
     * per group member combined in a union reader. {@link Record} inputs get a
     * {@link RecordReaderIterator}; all other types get a {@link ReaderIterator} using the
     * configured serializer.
     *
     * @throws Exception if the group size is smaller than 1 or does not match the configured
     *         number of inputs
     */
    private void initInputReaders() throws Exception {
        MutableRecordReader<Record> reader;
        MutableRecordReader<Record> inputReader;
        int numGates = 0;
        int groupSize = this.config.getGroupSize(0);
        numGates += groupSize;
        if (groupSize == 1) {
            // Non-union case: a single physical input.
            inputReader = new MutableRecordReader(this);
        } else if (groupSize > 1) {
            // Union case: one reader per physical input, merged by a union reader.
            MutableRecordReader[] readers = new MutableRecordReader[groupSize];
            for (int j = 0; j < groupSize; ++j) {
                readers[j] = new MutableRecordReader(this);
            }
            inputReader = new MutableUnionRecordReader(readers);
        } else {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }
        this.inputTypeSerializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
        if (this.inputTypeSerializerFactory.getDataType() == Record.class) {
            reader = inputReader;
            this.reader = new RecordReaderIterator(reader);
        } else {
            reader = inputReader;
            ReaderIterator<IT> iter = new ReaderIterator<IT>(reader, this.inputTypeSerializerFactory.getSerializer());
            this.reader = iter;
        }
        // Final sanity check: the gates created above must match the configured input count.
        if (numGates != this.config.getNumInputs()) {
            throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
        }
    }

    /**
     * Builds a log line for this task via {@link RegularPactTask#constructLogString}.
     *
     * @param message the message to embed
     * @return the formatted log string
     */
    private String getLogString(String message) {
        return RegularPactTask.constructLogString(message, this.getEnvironment().getTaskName(), this);
    }

    /**
     * Builds an exception message of the form {@code "<prefix>: <cause message>"}, or
     * {@code "<prefix>."} when the cause carries no message.
     *
     * <p>The conditional is parenthesized on purpose: without the parentheses string
     * concatenation binds tighter than {@code ==}, which would drop the prefix.
     *
     * @param prefix description of the step that failed
     * @param cause the exception that made the step fail
     * @return the combined message
     */
    private static String failureMessage(String prefix, Throwable cause) {
        String detail = cause.getMessage();
        return prefix + (detail == null ? "." : ": " + detail);
    }
}

