/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator;
import org.apache.beam.runners.flink.FlinkPortableRunnerResult;
import org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.DetachedEnvironment;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PortablePipelineRunner} that translates a portable pipeline proto into a Flink program
 * and executes it.
 *
 * <p>The class can also be used as a program entry point (see {@link #main(String[])}), in which
 * case the pipeline, its options and its artifact manifest are loaded through
 * {@link PortablePipelineJarUtils}.
 */
public class FlinkPipelineRunner implements PortablePipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);

    /** URN identifying a transform that is already an executable (fused) stage. */
    private static final String EXECUTABLE_STAGE_URN = "beam:runner:executable_stage:v1";

    private final FlinkPipelineOptions pipelineOptions;
    private final String confDir;
    private final List<String> filesToStage;

    /**
     * Creates a runner.
     *
     * @param pipelineOptions options handed to the translation context and used for the job name
     * @param confDir Flink configuration directory handed to the translation context; may be
     *     {@code null}
     * @param filesToStage files handed to the translation context for staging
     */
    public FlinkPipelineRunner(FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) {
        this.pipelineOptions = pipelineOptions;
        this.confDir = confDir;
        this.filesToStage = filesToStage;
    }

    /**
     * Translates and executes the given pipeline.
     *
     * <p>The batch translator is used only when the options do not request streaming and the
     * pipeline has no unbounded PCollections; otherwise the streaming translator is used.
     *
     * @param pipeline the pipeline proto to run
     * @param jobInfo job information handed to the translation context
     * @return the result of the execution
     * @throws Exception if translation or execution fails
     */
    public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
        MetricsEnvironment.setMetricsSupported(false);

        // The two translators work on different translation contexts, so the variable must not
        // be pinned to one of them; the wildcard is captured by runPipelineWithTranslator.
        FlinkPortablePipelineTranslator<?> translator;
        if (!this.pipelineOptions.isStreaming() && !PipelineTranslatorUtils.hasUnboundedPCollections(pipeline)) {
            translator = FlinkBatchPortablePipelineTranslator.createTranslator();
        } else {
            translator = new FlinkStreamingPortablePipelineTranslator();
        }
        return this.runPipelineWithTranslator(pipeline, jobInfo, translator);
    }

    /**
     * Trims the pipeline to the URNs known by the translator, fuses it unless it already
     * contains executable stages, translates it and executes the resulting Flink program.
     *
     * @param pipeline the pipeline proto to run
     * @param jobInfo job information handed to the translation context
     * @param translator translator used to build the Flink program
     * @return the result of the execution
     * @throws Exception if translation or execution fails
     */
    private <T extends FlinkPortablePipelineTranslator.TranslationContext> PortablePipelineResult runPipelineWithTranslator(RunnerApi.Pipeline pipeline, JobInfo jobInfo, FlinkPortablePipelineTranslator<T> translator) throws Exception {
        LOG.info("Translating pipeline to Flink program.");

        RunnerApi.Pipeline trimmedPipeline = PipelineTrimmer.trim(pipeline, translator.knownUrns());

        // A pipeline that already carries executable stages is used as is; fusing is skipped.
        boolean alreadyFused = trimmedPipeline.getComponents().getTransformsMap().values().stream()
                .anyMatch(proto -> EXECUTABLE_STAGE_URN.equals(proto.getSpec().getUrn()));
        RunnerApi.Pipeline fusedPipeline;
        if (alreadyFused) {
            fusedPipeline = trimmedPipeline;
        } else {
            fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
        }

        T context = translator.createTranslationContext(jobInfo, this.pipelineOptions, this.confDir, this.filesToStage);
        FlinkPortablePipelineTranslator.Executor executor = translator.translate(context, fusedPipeline);
        JobExecutionResult result = executor.execute(this.pipelineOptions.getJobName());
        return this.createPortablePipelineResult(result, this.pipelineOptions);
    }

    /**
     * Converts a Flink execution result into a {@link PortablePipelineResult}.
     *
     * <p>For a detached submission a detached result is returned immediately. Otherwise the
     * runtime and accumulator values are logged, and a {@link MetricsPusher} is started for the
     * returned result.
     *
     * @param result the Flink execution result
     * @param options options from which the {@link MetricsOptions} view is taken
     * @return the portable pipeline result
     */
    private PortablePipelineResult createPortablePipelineResult(JobExecutionResult result, PipelineOptions options) {
        if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
            LOG.info("Pipeline submitted in Detached mode");
            return new FlinkPortableRunnerResult.Detached();
        }

        LOG.info("Execution finished in {} msecs", result.getNetRuntime());
        Map<String, Object> accumulators = result.getAllAccumulatorResults();
        if (accumulators != null && !accumulators.isEmpty()) {
            LOG.info("Final accumulator values:");
            for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
                LOG.info("{} : {}", entry.getKey(), entry.getValue());
            }
        }

        FlinkPortableRunnerResult flinkRunnerResult = new FlinkPortableRunnerResult(accumulators, result.getNetRuntime());
        MetricsPusher metricsPusher = new MetricsPusher(flinkRunnerResult.getMetricsContainerStepMap(), options.as(MetricsOptions.class), flinkRunnerResult);
        metricsPusher.start();
        return flinkRunnerResult;
    }

    /**
     * Program entry point: loads the pipeline, its options and its artifact manifest through
     * {@link PortablePipelineJarUtils}, stages the artifacts and runs the pipeline.
     *
     * @param args command line arguments; see {@code FlinkPipelineRunnerConfiguration} for the
     *     supported options
     * @throws Exception if loading or staging fails
     * @throws IllegalArgumentException if the command line arguments cannot be parsed
     * @throws RuntimeException wrapping any exception thrown while running the job
     */
    public static void main(String[] args) throws Exception {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

        FlinkPipelineRunnerConfiguration configuration = parseArgs(args);
        RunnerApi.Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath();
        Struct options = PortablePipelineJarUtils.getPipelineOptionsFromClasspath();
        FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options).as(FlinkPipelineOptions.class);

        // The invocation id is the job name followed by a random UUID.
        String invocationId = String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID());

        ArtifactApi.ProxyManifest proxyManifest = PortablePipelineJarUtils.getArtifactManifestFromClassPath();
        String retrievalToken = PortablePipelineJarUtils.stageArtifacts(proxyManifest, flinkOptions, invocationId, configuration.artifactStagingPath);

        List<String> filesToStage = PipelineResources.detectClassPathResourcesToStage(FlinkPipelineRunner.class.getClassLoader());
        FlinkPipelineRunner runner = new FlinkPipelineRunner(flinkOptions, configuration.flinkConfDir, filesToStage);
        JobInfo jobInfo = JobInfo.create(invocationId, flinkOptions.getJobName(), retrievalToken, options);
        try {
            runner.run(pipeline, jobInfo);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Job %s failed.", invocationId), e);
        }
        LOG.info("Job {} finished successfully.", invocationId);
    }

    /**
     * Parses the command line into a configuration object.
     *
     * <p>On a parse failure the error is logged and usage is printed to standard error before
     * the exception is thrown.
     *
     * @param args command line arguments
     * @return the populated configuration
     * @throws IllegalArgumentException if the arguments cannot be parsed
     */
    private static FlinkPipelineRunnerConfiguration parseArgs(String[] args) {
        FlinkPipelineRunnerConfiguration configuration = new FlinkPipelineRunnerConfiguration();
        CmdLineParser parser = new CmdLineParser(configuration);
        try {
            parser.parseArgument(args);
        } catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", e);
            parser.printUsage(System.err);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
        return configuration;
    }

    /** Command line options of {@link #main(String[])}, populated by the args4j parser. */
    private static class FlinkPipelineRunnerConfiguration {
        /** Artifact staging location; defaults to {@code beam-artifact-staging} under the JVM temp directory. */
        @Option(name="--artifacts-dir", usage="The location to store staged artifact files")
        private String artifactStagingPath = Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();

        /** Flink configuration directory; {@code null} when the option is not given. */
        @Option(name="--flink-conf-dir", usage="Directory containing Flink YAML configuration files. These properties will be set to all jobs submitted to Flink and take precedence over configurations in FLINK_CONF_DIR.")
        private String flinkConfDir = null;

        private FlinkPipelineRunnerConfiguration() {
        }
    }
}

