/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.flink.FlinkDetachedRunnerResult;
import org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunnerResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkRunner
extends PipelineRunner<PipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);
    private final FlinkPipelineOptions options;
    Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;

    public static FlinkRunner fromOptions(PipelineOptions options) {
        FlinkPipelineOptions flinkOptions = (FlinkPipelineOptions)PipelineOptionsValidator.validate(FlinkPipelineOptions.class, (PipelineOptions)options);
        ArrayList<String> missing = new ArrayList<String>();
        if (flinkOptions.getAppName() == null) {
            missing.add("appName");
        }
        if (missing.size() > 0) {
            throw new IllegalArgumentException("Missing required values: " + Joiner.on((char)',').join(missing));
        }
        if (flinkOptions.getFilesToStage() == null) {
            flinkOptions.setFilesToStage(PipelineResources.detectClassPathResourcesToStage((ClassLoader)FlinkRunner.class.getClassLoader()));
            LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage {} files. Enable logging at DEBUG level to see which files will be staged.", (Object)flinkOptions.getFilesToStage().size());
            LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
        }
        return new FlinkRunner(flinkOptions);
    }

    private FlinkRunner(FlinkPipelineOptions options) {
        this.options = options;
        this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet();
    }

    public PipelineResult run(Pipeline pipeline) {
        JobExecutionResult result;
        this.logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
        MetricsEnvironment.setMetricsSupported((boolean)true);
        LOG.info("Executing pipeline using FlinkRunner.");
        FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(this.options);
        LOG.info("Translating pipeline to Flink program.");
        env.translate(pipeline);
        try {
            LOG.info("Starting execution of Flink program.");
            result = env.executePipeline();
        }
        catch (Exception e) {
            LOG.error("Pipeline execution failed", (Throwable)e);
            throw new RuntimeException("Pipeline execution failed", e);
        }
        return FlinkRunner.createPipelineResult(result, this.options);
    }

    static PipelineResult createPipelineResult(JobExecutionResult result, PipelineOptions options) {
        if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
            LOG.info("Pipeline submitted in Detached mode");
            return new FlinkDetachedRunnerResult();
        }
        LOG.info("Execution finished in {} msecs", (Object)result.getNetRuntime());
        Map accumulators = result.getAllAccumulatorResults();
        if (accumulators != null && !accumulators.isEmpty()) {
            LOG.info("Final accumulator values:");
            for (Map.Entry entry : result.getAllAccumulatorResults().entrySet()) {
                LOG.info("{} : {}", entry.getKey(), entry.getValue());
            }
        }
        FlinkRunnerResult flinkRunnerResult = new FlinkRunnerResult(accumulators, result.getNetRuntime());
        MetricsPusher metricsPusher = new MetricsPusher(flinkRunnerResult.getMetricsContainerStepMap(), (MetricsOptions)options.as(MetricsOptions.class), (PipelineResult)flinkRunnerResult);
        metricsPusher.start();
        return flinkRunnerResult;
    }

    @VisibleForTesting
    public FlinkPipelineOptions getPipelineOptions() {
        return this.options;
    }

    public String toString() {
        return "FlinkRunner#" + ((Object)((Object)this)).hashCode();
    }

    void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
        this.ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
    }

    private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
        if (!this.ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
            final TreeSet ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet();
            pipeline.traverseTopologically((Pipeline.PipelineVisitor)new Pipeline.PipelineVisitor.Defaults(){

                public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                    if (FlinkRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                        ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
                    }
                }

                public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
                    if (FlinkRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                        ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
                    }
                    return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
                }
            });
            LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} because the key coder is not deterministic. Falling back to singleton implementation which may cause memory and/or performance problems. Future major versions of the Flink runner will require deterministic key coders.", ptransformViewNamesWithNonDeterministicKeyCoders);
        }
    }

    @VisibleForTesting
    JobGraph getJobGraph(Pipeline p) {
        FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(this.options);
        return env.getJobGraph(p);
    }
}

