/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.SparkPortableStreamingPipelineOptions;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkStreamingPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.runners.spark.util.SparkCommon;
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.EventLoggingListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.joda.time.Instant;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkPipelineRunner
implements PortablePipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);
    private final SparkPipelineOptions pipelineOptions;

    public SparkPipelineRunner(SparkPipelineOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
        SparkPipelineResult result;
        boolean isStreaming = this.pipelineOptions.isStreaming() || PipelineTranslatorUtils.hasUnboundedPCollections((RunnerApi.Pipeline)pipeline);
        SparkPortablePipelineTranslator<SparkStreamingTranslationContext> translator = isStreaming ? new SparkStreamingPortablePipelineTranslator() : new SparkBatchPortablePipelineTranslator();
        RunnerApi.Pipeline pipelineWithSdfExpanded = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)pipeline, (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
        RunnerApi.Pipeline trimmedPipeline = TrivialNativeTransformExpander.forKnownUrns((RunnerApi.Pipeline)pipelineWithSdfExpanded, translator.knownUrns());
        RunnerApi.Pipeline fusedPipeline = trimmedPipeline.getComponents().getTransformsMap().values().stream().anyMatch(proto -> "beam:runner:executable_stage:v1".equals(proto.getSpec().getUrn())) ? trimmedPipeline : GreedyPipelineFuser.fuse((RunnerApi.Pipeline)trimmedPipeline).toPipeline();
        SparkCommonPipelineOptions.prepareFilesToStage(this.pipelineOptions);
        JavaSparkContext jsc = SparkContextFactory.getSparkContext(this.pipelineOptions);
        long startTime = Instant.now().getMillis();
        EventLoggingListener eventLoggingListener = SparkCommon.startEventLoggingListener(jsc, this.pipelineOptions, startTime);
        AggregatorsAccumulator.init(this.pipelineOptions, jsc);
        MetricsEnvironment.setMetricsSupported((boolean)true);
        MetricsAccumulator.init(this.pipelineOptions, jsc);
        SparkStreamingTranslationContext context = translator.createTranslationContext(jsc, this.pipelineOptions, jobInfo);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
        if (isStreaming) {
            JavaStreamingContext jssc = context.getStreamingContext();
            jssc.addStreamingListener((StreamingListener)new JavaStreamingListenerWrapper((JavaStreamingListener)new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
            jssc.addStreamingListener((StreamingListener)new JavaStreamingListenerWrapper((JavaStreamingListener)new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
            for (JavaStreamingListener listener : ((SparkContextOptions)this.pipelineOptions.as(SparkContextOptions.class)).getListeners()) {
                LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
                jssc.addStreamingListener((StreamingListener)new JavaStreamingListenerWrapper(listener));
            }
            jssc.addStreamingListener((StreamingListener)new JavaStreamingListenerWrapper((JavaStreamingListener)new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));
            jssc.checkpoint(this.pipelineOptions.getCheckpointDir());
            Long timeout = ((SparkPortableStreamingPipelineOptions)this.pipelineOptions.as(SparkPortableStreamingPipelineOptions.class)).getStreamingTimeoutMs();
            Future<?> submissionFuture = executorService.submit(() -> {
                translator.translate(fusedPipeline, context);
                LOG.info(String.format("Job %s: Pipeline translated successfully. Computing outputs", jobInfo.jobId()));
                context.computeOutputs();
                jssc.start();
                try {
                    jssc.awaitTerminationOrTimeout(timeout.longValue());
                }
                catch (InterruptedException e) {
                    LOG.warn("Streaming context interrupted, shutting down.", (Throwable)e);
                }
                jssc.stop();
                LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
            });
            result = new SparkPipelineResult.PortableStreamingMode(submissionFuture, jssc);
        } else {
            Future<?> submissionFuture = executorService.submit(() -> {
                translator.translate(fusedPipeline, context);
                LOG.info(String.format("Job %s: Pipeline translated successfully. Computing outputs", jobInfo.jobId()));
                context.computeOutputs();
                LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
            });
            result = new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
        }
        executorService.shutdown();
        result.waitUntilFinish();
        MetricsPusher metricsPusher = new MetricsPusher(MetricsAccumulator.getInstance().value(), (MetricsOptions)this.pipelineOptions.as(MetricsOptions.class), (PipelineResult)result);
        metricsPusher.start();
        if (eventLoggingListener != null) {
            eventLoggingListener.onApplicationStart(SparkCompat.buildSparkListenerApplicationStart(jsc, this.pipelineOptions, startTime, result));
            eventLoggingListener.onApplicationEnd(new SparkListenerApplicationEnd(Instant.now().getMillis()));
            eventLoggingListener.stop();
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        FileSystems.setDefaultPipelineOptions((PipelineOptions)PipelineOptionsFactory.create());
        SparkPipelineRunnerConfiguration configuration = SparkPipelineRunner.parseArgs(args);
        String baseJobName = configuration.baseJobName == null ? PortablePipelineJarUtils.getDefaultJobName() : configuration.baseJobName;
        Preconditions.checkArgument((baseJobName != null ? 1 : 0) != 0, (Object)"No default job name found. Job name must be set using --base-job-name.");
        RunnerApi.Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath((String)baseJobName);
        Struct originalOptions = PortablePipelineJarUtils.getPipelineOptionsFromClasspath((String)baseJobName);
        String retrievalToken = (String)ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant);
        SparkPipelineOptions sparkOptions = (SparkPipelineOptions)PipelineOptionsTranslation.fromProto((Struct)originalOptions).as(SparkPipelineOptions.class);
        String invocationId = String.format("%s_%s", sparkOptions.getJobName(), UUID.randomUUID().toString());
        if (sparkOptions.getAppName() == null) {
            LOG.debug("App name was null. Using invocationId {}", (Object)invocationId);
            sparkOptions.setAppName(invocationId);
        }
        SparkPipelineRunner runner = new SparkPipelineRunner(sparkOptions);
        JobInfo jobInfo = JobInfo.create((String)invocationId, (String)sparkOptions.getJobName(), (String)retrievalToken, (Struct)PipelineOptionsTranslation.toProto((PipelineOptions)sparkOptions));
        try {
            runner.run(pipeline, jobInfo);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Job %s failed.", invocationId), e);
        }
        LOG.info("Job {} finished successfully.", (Object)invocationId);
    }

    private static SparkPipelineRunnerConfiguration parseArgs(String[] args) {
        SparkPipelineRunnerConfiguration configuration = new SparkPipelineRunnerConfiguration();
        CmdLineParser parser = new CmdLineParser((Object)configuration);
        try {
            parser.parseArgument(args);
        }
        catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", (Throwable)e);
            parser.printUsage((OutputStream)System.err);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
        return configuration;
    }

    private static class SparkPipelineRunnerConfiguration {
        @Option(name="--base-job-name", usage="The job to run. This must correspond to a subdirectory of the jar's BEAM-PIPELINE directory. *Only needs to be specified if the jar contains multiple pipelines.*")
        @Nullable
        private String baseJobName = null;

        private SparkPipelineRunnerConfiguration() {
        }
    }
}

