/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.beam.chain.impl;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.runtime.beam.TalendFn;
import org.talend.sdk.component.runtime.beam.TalendIO;
import org.talend.sdk.component.runtime.beam.coder.registry.SchemaRegistryCoder;
import org.talend.sdk.component.runtime.beam.transform.AutoKVWrapper;
import org.talend.sdk.component.runtime.beam.transform.CoGroupByKeyResultMappingTransform;
import org.talend.sdk.component.runtime.beam.transform.RecordBranchFilter;
import org.talend.sdk.component.runtime.beam.transform.RecordBranchMapper;
import org.talend.sdk.component.runtime.beam.transform.RecordBranchUnwrapper;
import org.talend.sdk.component.runtime.beam.transform.RecordKVUnwrapper;
import org.talend.sdk.component.runtime.beam.transform.RecordNormalizer;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.chain.GroupKeyProvider;
import org.talend.sdk.component.runtime.manager.chain.Job;
import org.talend.sdk.component.runtime.manager.chain.internal.JobImpl;
import org.talend.sdk.component.runtime.output.Processor;

public class BeamExecutor
implements Job.ExecutorBuilder {
    private final JobImpl.JobExecutor delegate;

    public Job.ExecutorBuilder property(String name, Object value) {
        this.delegate.property(name, value);
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void run() {
        try {
            Map<String, Mapper> mappers = this.delegate.getLevels().values().stream().flatMap(Collection::stream).filter(Job.Component::isSource).collect(Collectors.toMap(Job.Component::getId, e -> (Mapper)this.delegate.getManager().findMapper(e.getNode().getFamily(), e.getNode().getComponent(), e.getNode().getVersion(), e.getNode().getConfiguration()).orElseThrow(() -> new IllegalStateException("No mapper found for: " + e.getNode()))));
            Map<String, Processor> processors = this.delegate.getLevels().values().stream().flatMap(Collection::stream).filter(component -> !component.isSource()).collect(Collectors.toMap(Job.Component::getId, e -> (Processor)this.delegate.getManager().findProcessor(e.getNode().getFamily(), e.getNode().getComponent(), e.getNode().getVersion(), e.getNode().getConfiguration()).orElseThrow(() -> new IllegalStateException("No processor found for:" + e.getNode()))));
            Pipeline pipeline = Pipeline.create((PipelineOptions)this.createPipelineOptions());
            HashMap pCollections = new HashMap();
            this.delegate.getLevels().values().stream().flatMap(Collection::stream).forEach(component -> {
                if (component.isSource()) {
                    Mapper mapper = (Mapper)mappers.get(component.getId());
                    pCollections.put(component.getId(), ((PCollection)pipeline.apply(this.toName("TalendIO", (Job.Component)component), TalendIO.read(mapper))).apply(this.toName("RecordNormalizer", (Job.Component)component), RecordNormalizer.of(mapper.plugin())));
                } else {
                    PCollection preparedInput;
                    Processor processor = (Processor)processors.get(component.getId());
                    List<Job.Edge> joins = this.getEdges(this.delegate.getEdges(), (Job.Component)component, e -> e.getTo().getNode());
                    Map<String, PCollection> inputs = joins.stream().collect(Collectors.toMap(e -> e.getTo().getBranch(), e -> {
                        PCollection pc = (PCollection)pCollections.get(e.getFrom().getNode().getId());
                        PCollection filteredInput = (PCollection)pc.apply(this.toName("RecordBranchFilter", (Job.Component)component, (Job.Edge)e), RecordBranchFilter.of(processor.plugin(), e.getFrom().getBranch()));
                        PCollection mappedInput = e.getFrom().getBranch().equals(e.getTo().getBranch()) ? filteredInput : (PCollection)filteredInput.apply(this.toName("RecordBranchMapper", (Job.Component)component, (Job.Edge)e), RecordBranchMapper.of(processor.plugin(), e.getFrom().getBranch(), e.getTo().getBranch()));
                        return (PCollection)((PCollection)mappedInput.apply(this.toName("RecordBranchUnwrapper", (Job.Component)component, (Job.Edge)e), RecordBranchUnwrapper.of(processor.plugin(), e.getTo().getBranch()))).apply(this.toName("AutoKVWrapper", (Job.Component)component, (Job.Edge)e), AutoKVWrapper.of(processor.plugin(), (Function<GroupKeyProvider.GroupContext, String>)this.delegate.getKeyProvider(component.getId()), component.getId(), e.getFrom().getBranch()));
                    }));
                    if (inputs.size() == 1) {
                        Map.Entry<String, PCollection> input = inputs.entrySet().iterator().next();
                        preparedInput = (PCollection)((PCollection)input.getValue().apply(this.toName("RecordKVUnwrapper", (Job.Component)component), (PTransform)ParDo.of((DoFn)new RecordKVUnwrapper()))).setCoder((Coder)SchemaRegistryCoder.of()).apply(this.toName("RecordNormalizer", (Job.Component)component), RecordNormalizer.of(processor.plugin()));
                    } else {
                        KeyedPCollectionTuple join = null;
                        for (Map.Entry<String, PCollection> entry : inputs.entrySet()) {
                            TupleTag branch = new TupleTag(entry.getKey());
                            join = join == null ? KeyedPCollectionTuple.of((TupleTag)branch, (PCollection)entry.getValue()) : join.and(branch, entry.getValue());
                        }
                        preparedInput = (PCollection)((PCollection)join.apply(this.toName("CoGroupByKey", (Job.Component)component), (PTransform)CoGroupByKey.create())).apply(this.toName("CoGroupByKeyResultMappingTransform", (Job.Component)component), new CoGroupByKeyResultMappingTransform(processor.plugin(), true));
                    }
                    if (this.getEdges(this.delegate.getEdges(), (Job.Component)component, e -> e.getFrom().getNode()).isEmpty()) {
                        TalendIO.Write write = TalendIO.write(processor);
                        preparedInput.apply(this.toName("Output", (Job.Component)component), (PTransform)write);
                    } else {
                        PTransform<PCollection<Record>, PCollection<Record>> process = TalendFn.asFn(processor);
                        pCollections.put(component.getId(), preparedInput.apply(this.toName("Processor", (Job.Component)component), process));
                    }
                }
            });
            PipelineResult result = pipeline.run();
            result.waitUntilFinish();
            while (PipelineResult.State.RUNNING.equals((Object)result.getState())) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e2) {
                    throw new IllegalStateException("the job was aborted", e2);
                    return;
                }
            }
        }
        finally {
            this.delegate.getLevels().values().stream().flatMap(Collection::stream).map(Job.Component::getId).forEach(JobImpl.LocalSequenceHolder::clean);
        }
    }

    private String toName(String transform, Job.Component component, Job.Edge e) {
        return String.format(transform + "/step=%s,from=%s(%s)-to=%s(%s)", component.getId(), e.getFrom().getNode().getId(), e.getFrom().getBranch(), e.getTo().getNode().getId(), e.getTo().getBranch());
    }

    private String toName(String transform, Job.Component component) {
        return String.format(transform + "/%s", component.getId());
    }

    private List<Job.Edge> getEdges(List<Job.Edge> edges, Job.Component step, Function<Job.Edge, Job.Component> componentMapper) {
        return edges.stream().filter(edge -> ((Job.Component)componentMapper.apply((Job.Edge)edge)).equals((Object)step)).collect(Collectors.toList());
    }

    private PipelineOptions createPipelineOptions() {
        return PipelineOptionsFactory.fromArgs((String[])((String[])System.getProperties().stringPropertyNames().stream().filter(p -> p.startsWith("talend.beam.job.")).map(k -> "--" + k.substring("talend.beam.job.".length()) + "=" + System.getProperty(k)).toArray(String[]::new))).create();
    }

    public BeamExecutor(JobImpl.JobExecutor delegate) {
        this.delegate = delegate;
    }
}

