/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.di.beam.components;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.runtime.di.JobStateAware;
import org.talend.sdk.component.runtime.di.beam.components.DIPipeline;

class PipelineInit {
    private static final Logger log = LoggerFactory.getLogger(PipelineInit.class);

    public static void lazyStart(final JobStateAware.State jobState, Supplier<DIPipeline> pipelineSupplier) {
        AtomicBoolean pipelineStarted = jobState.getPipelineStarted();
        if (!pipelineStarted.get() && pipelineStarted.compareAndSet(false, true)) {
            Pipeline pipeline = pipelineSupplier.get();
            TransformCounter counter = new TransformCounter();
            pipeline.traverseTopologically((Pipeline.PipelineVisitor)counter);
            if (counter.transforms.get() > 0) {
                final PipelineResult result = pipeline.run();
                new Thread("talend-component-kit-di-pipeline-awaiter"){

                    @Override
                    public void run() {
                        log.debug("Starting to watch beam pipeline");
                        try {
                            result.waitUntilFinish();
                        }
                        finally {
                            PipelineResult.State state = result.getState();
                            log.debug("Exited pipeline with state {}", (Object)state.name());
                            if (state.isTerminal()) {
                                log.info("Beam pipeline ended");
                            } else {
                                log.debug("Beam pipeline ended by interruption");
                            }
                            jobState.getPipelineDone().complete(true);
                        }
                    }
                }.start();
            } else {
                jobState.getPipelineDone().complete(true);
                log.warn("A pipeline was created but not transform were found, is your job correctly configured?");
            }
        }
    }

    static DIPipeline ensurePipeline(JobStateAware.State jobState) {
        DIPipeline pipeline = jobState.get(JobStateAware.IndirectInstances.Pipeline, DIPipeline.class);
        if (pipeline == null) {
            pipeline = PipelineInit.createPipeline(PipelineInit.readOptions());
            jobState.set(JobStateAware.IndirectInstances.Pipeline, (Object)pipeline);
        }
        return pipeline;
    }

    private static DIPipeline createPipeline(PipelineOptions options) {
        PipelineRunner.fromOptions((PipelineOptions)options);
        return new DIPipeline(options);
    }

    private static PipelineOptions readOptions() {
        String[] args = (String[])Stream.concat(System.getProperties().stringPropertyNames().stream().filter(s -> s.startsWith("talend.beam.")).map(s -> s.substring("talend.beam.".length()) + "=" + System.getProperty(s)), PipelineInit.enforcedArgs()).toArray(String[]::new);
        return PipelineOptionsFactory.fromArgs((String[])args).create();
    }

    private static Stream<String> enforcedArgs() {
        if (Boolean.getBoolean("talend.runner.skip-defaults")) {
            return Stream.empty();
        }
        return Stream.of("--blockOnRun=false", "--enforceImmutability=false", "--enforceEncodability=false", "--targetParallelism=" + Math.max(1, Runtime.getRuntime().availableProcessors()));
    }

    private PipelineInit() {
    }

    private static class TransformCounter
    extends Pipeline.PipelineVisitor.Defaults {
        private final AtomicInteger transforms = new AtomicInteger(0);

        private TransformCounter() {
        }

        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            if (node.isRootNode()) {
                return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
            }
            this.transforms.incrementAndGet();
            return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
        }

        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            this.transforms.incrementAndGet();
        }
    }
}

