/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared state used while translating a Beam pipeline into Spark structured streaming
 * {@link Dataset}s.
 *
 * <p>The context owns the {@link SparkSession}, remembers which {@link Dataset} was produced for
 * each {@link PValue}, tracks the datasets that have not yet been consumed by another transform
 * (the "leaves"), and finally triggers execution of those leaves in {@link #startPipeline()}.
 *
 * <p>Methods that read the current transform ({@link #getInput()}, {@link #getInputs()}, {@link
 * #getOutput()}, {@link #getOutputs()}, {@link #getOutputCoders()}) require {@link
 * #setCurrentTransform(AppliedPTransform)} to have been called first; otherwise they fail with a
 * {@link NullPointerException}.
 */
public abstract class AbstractTranslationContext {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTranslationContext.class);

    /** Prefix of a Spark master URL of the form {@code local[...]}. */
    private static final String LOCAL_MASTER_PREFIX = "local[";

    /** Name of the Spark SQL setting holding the number of shuffle partitions. */
    private static final String SHUFFLE_PARTITIONS_KEY = "spark.sql.shuffle.partitions";

    /** Dataset registered for each translated pipeline value. */
    private final Map<PValue, Dataset<?>> datasets;

    /** Registered datasets that have not been handed out through {@link #getDataset(PValue)}. */
    private final Set<Dataset<?>> leaves;

    private final SerializablePipelineOptions serializablePipelineOptions;

    @SuppressFBWarnings(value={"URF_UNREAD_FIELD"})
    private AppliedPTransform<?, ?, ?> currentTransform;

    @SuppressFBWarnings(value={"URF_UNREAD_FIELD"})
    private final SparkSession sparkSession;

    /** Dataset registered for each side-input view. */
    private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;

    /**
     * Builds the Spark configuration from the given options and creates (or reuses) the
     * {@link SparkSession}.
     *
     * <p>When the master is a {@code local[...]} URL and the
     * {@code spark.sql.shuffle.partitions} system property is not set, the number of shuffle
     * partitions is aligned with the number of local threads requested by the master URL.
     *
     * @param options pipeline options providing the Spark master, application name, files to stage
     */
    public AbstractTranslationContext(SparkStructuredStreamingPipelineOptions options) {
        String sparkMaster = options.getSparkMaster();

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster(sparkMaster);
        sparkConf.setAppName(options.getAppName());
        if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
            sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
        }

        // Never override a value the user supplied through the system property.
        if (sparkMaster != null
                && sparkMaster.startsWith(LOCAL_MASTER_PREFIX)
                && System.getProperty(SHUFFLE_PARTITIONS_KEY) == null) {
            int numPartitions = localThreadCount(sparkMaster);
            if (numPartitions > 0) {
                sparkConf.set(SHUFFLE_PARTITIONS_KEY, String.valueOf(numPartitions));
            }
        }

        this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        this.serializablePipelineOptions = new SerializablePipelineOptions(options);
        this.datasets = new HashMap<>();
        this.leaves = new HashSet<>();
        this.broadcastDataSets = new HashMap<>();
    }

    /**
     * Extracts the thread count from a {@code local[...]} master URL.
     *
     * <p>Handles {@code local[N]}, {@code local[N,F]} (only {@code N} is used) and
     * {@code local[*]} (resolved to the number of available processors). Anything else yields
     * {@code 0} instead of throwing, so that an unusual master URL does not abort construction
     * here; validating the URL is left to Spark.
     *
     * @param sparkMaster master URL starting with {@code local[}
     * @return the number of local threads, or {@code 0} if it cannot be determined
     */
    private static int localThreadCount(String sparkMaster) {
        int start = LOCAL_MASTER_PREFIX.length();
        int end = sparkMaster.indexOf(']', start);
        if (end < 0) {
            return 0;
        }
        String spec = sparkMaster.substring(start, end);
        int comma = spec.indexOf(',');
        String threads = comma < 0 ? spec : spec.substring(0, comma);
        if ("*".equals(threads)) {
            return Runtime.getRuntime().availableProcessors();
        }
        try {
            return Integer.parseInt(threads);
        } catch (NumberFormatException ignored) {
            // Not a plain thread count: keep Spark's default number of shuffle partitions.
            LOG.debug("Cannot derive the number of shuffle partitions from master {}", sparkMaster);
            return 0;
        }
    }

    /** Returns the Spark session created by this context. */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }

    /** Returns the pipeline options wrapped in their serializable holder. */
    public SerializablePipelineOptions getSerializableOptions() {
        return this.serializablePipelineOptions;
    }

    /** Sets the transform whose inputs and outputs the accessor methods expose. */
    public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
        this.currentTransform = currentTransform;
    }

    /** Returns the transform last passed to {@link #setCurrentTransform}, or {@code null}. */
    public AppliedPTransform<?, ?, ?> getCurrentTransform() {
        return this.currentTransform;
    }

    /**
     * Creates an empty dataset.
     *
     * <p>The dataset is built with an encoder derived from {@link VoidCoder} and then cast to the
     * requested element type; the cast is unchecked.
     */
    @SuppressWarnings("unchecked")
    public <T> Dataset<T> emptyDataset() {
        return (Dataset<T>) this.sparkSession.emptyDataset(EncoderHelpers.fromBeamCoder(VoidCoder.of()));
    }

    /**
     * Returns the dataset registered for {@code value}, or {@code null} if there is none.
     *
     * <p>Retrieving a dataset removes it from the set of leaves, i.e. it is treated as consumed.
     * The element type is not verified; the cast is unchecked.
     */
    @SuppressWarnings("unchecked")
    public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
        Dataset<?> dataset = this.datasets.get(value);
        this.leaves.remove(dataset);
        return (Dataset<WindowedValue<T>>) dataset;
    }

    /**
     * Registers {@code dataset} for {@code value} and marks it as a leaf. Does nothing if a
     * dataset is already registered for {@code value}.
     */
    public void putDatasetWildcard(PValue value, Dataset<WindowedValue<?>> dataset) {
        registerDataset(value, dataset);
    }

    /**
     * Registers {@code dataset} for {@code value} and marks it as a leaf. Does nothing if a
     * dataset is already registered for {@code value}.
     */
    public <T> void putDataset(PValue value, Dataset<WindowedValue<T>> dataset) {
        registerDataset(value, dataset);
    }

    /** Shared implementation of {@link #putDataset} and {@link #putDatasetWildcard}. */
    private void registerDataset(PValue value, Dataset<?> dataset) {
        if (!this.datasets.containsKey(value)) {
            this.datasets.put(value, dataset);
            this.leaves.add(dataset);
        }
    }

    /**
     * Registers the dataset backing a side input. Does nothing if one is already registered for
     * {@code value}.
     */
    public <ViewT, ElemT> void setSideInputDataset(PCollectionView<ViewT> value, Dataset<WindowedValue<ElemT>> set) {
        if (!this.broadcastDataSets.containsKey(value)) {
            this.broadcastDataSets.put(value, set);
        }
    }

    /**
     * Returns the dataset registered for the given side input, or {@code null} if there is none.
     * The element type is not verified; the cast is unchecked.
     */
    @SuppressWarnings("unchecked")
    public <T> Dataset<T> getSideInputDataSet(PCollectionView<?> value) {
        return (Dataset<T>) this.broadcastDataSets.get(value);
    }

    /** Returns the single non-additional input of the current transform. */
    public PValue getInput() {
        return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(this.currentTransform));
    }

    /** Returns all inputs of the current transform, keyed by tag. */
    public Map<TupleTag<?>, PCollection<?>> getInputs() {
        return this.currentTransform.getInputs();
    }

    /** Returns the single output of the current transform. */
    public PValue getOutput() {
        return Iterables.getOnlyElement(this.currentTransform.getOutputs().values());
    }

    /** Returns all outputs of the current transform, keyed by tag. */
    public Map<TupleTag<?>, PCollection<?>> getOutputs() {
        return this.currentTransform.getOutputs();
    }

    /** Returns the coder of every {@link PCollection} output of the current transform, keyed by tag. */
    @SuppressWarnings("rawtypes")
    public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
        return this.currentTransform.getOutputs().entrySet().stream()
                .filter(e -> e.getValue() instanceof PCollection)
                .collect(Collectors.toMap(Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
    }

    /**
     * Triggers execution of every leaf dataset.
     *
     * <p>In streaming mode each leaf is written through a no-op {@link ForeachWriter} (using the
     * configured checkpoint directory, if any) and handed to {@link #launchStreaming}. Otherwise a
     * no-op {@code foreach} action is applied to each leaf; in test mode the execution plans are
     * printed first.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void startPipeline() {
        SparkStructuredStreamingPipelineOptions options =
                this.serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class);
        int datasetIndex = 0;
        for (Dataset<?> dataset : this.leaves) {
            if (options.isStreaming()) {
                DataStreamWriter<?> dataStreamWriter = dataset.writeStream();
                if (options.getCheckpointDir() != null) {
                    dataStreamWriter = dataStreamWriter.option("checkpointLocation", options.getCheckpointDir());
                }
                this.launchStreaming(dataStreamWriter.foreach(new NoOpForeachWriter<>()));
            } else {
                if (options.getTestMode()) {
                    LOG.debug("**** dataset {} catalyst execution plans ****", ++datasetIndex);
                    dataset.explain(true);
                }
                // The cast selects the Java-friendly foreach overload; the empty action only
                // exists to make Spark run the dataset.
                dataset.foreach((ForeachFunction & Serializable) t -> {});
            }
        }
    }

    /**
     * Starts the streaming query described by the given writer.
     *
     * @param var1 writer already configured with the sink by {@link #startPipeline()}
     */
    public abstract void launchStreaming(DataStreamWriter<?> var1);

    /**
     * Collects the whole dataset and logs each element at debug level.
     *
     * <p>The dataset is collected regardless of whether debug logging is enabled.
     */
    @SuppressWarnings("rawtypes")
    public static void printDatasetContent(Dataset<WindowedValue> dataset) {
        List<WindowedValue> windowedValues = dataset.collectAsList();
        for (WindowedValue windowedValue : windowedValues) {
            LOG.debug("**** dataset content {} ****", windowedValue.toString());
        }
    }

    /** A {@link ForeachWriter} that discards every element; used only to trigger execution. */
    private static class NoOpForeachWriter<T>
    extends ForeachWriter<T> {
        private NoOpForeachWriter() {
        }

        /**
         * Always returns {@code false}, which under the Spark {@code ForeachWriter} contract
         * signals that the partition's rows need not be passed to {@link #process}.
         */
        @Override
        public boolean open(long partitionId, long epochId) {
            return false;
        }

        /** Does nothing. */
        @Override
        public void process(T value) {
        }

        /** Does nothing. */
        @Override
        public void close(Throwable errorOrNull) {
        }
    }
}

