/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.beam.spi;

import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.source.ProducerFinder;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.input.Input;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.service.ProducerFinderImpl;
import org.talend.sdk.component.runtime.manager.service.api.ComponentInstantiator;
import org.talend.sdk.component.runtime.serialization.SerializableService;

/**
 * Producer finder that first tries to create a regular Component Kit {@link Input} from the
 * resolved {@link Mapper} and, when that fails, falls back to running the mapper's delegate as a
 * Beam {@link PTransform} on the {@link DirectRunner}.
 *
 * <p>In the fallback mode the records emitted by the pipeline are handed over to the returned
 * iterator through a bounded queue stored in a static map and identified by a random
 * {@link UUID}; {@link MyDoFn} is the producing side and {@link QueueInput} the consuming side.
 */
public class BeamProducerFinder extends ProducerFinderImpl {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BeamProducerFinder.class);

    /** Capacity of each hand-off queue between the Beam pipeline and the iterator. */
    private static final int QUEUE_SIZE = 200;

    /** Target parallelism configured on the direct runner options. */
    private static final int BEAM_PARALLELISM = 10;

    /** Hand-off queues, keyed by the id shared by a {@link QueueInput} and its {@link MyDoFn}. */
    private static final Map<UUID, Queue<Record>> QUEUE = new ConcurrentHashMap<>();

    /**
     * Finds the mapper for the given component and returns an iterator over its records.
     *
     * @param familyName component family name
     * @param inputName input component name
     * @param version configuration version passed to the mapper lookup
     * @param configuration component configuration
     * @return an iterator over the records produced by the component
     * @throws IllegalStateException wrapping the mapper creation failure when the mapper cannot be
     *         created and does not delegate to a Beam {@link PTransform}
     */
    public Iterator<Record> find(String familyName, String inputName, int version, Map<String, String> configuration) {
        final ComponentInstantiator instantiator = this.getInstantiator(familyName, inputName);
        final Mapper mapper = this.findMapper(instantiator, version, configuration);
        try {
            final Input input = mapper.create();
            return this.iterator(input);
        } catch (Exception e) {
            log.warn("Component Kit Mapper instantiation failed, trying to wrap native beam mapper...");
            log.debug("Mapper instantiation failure", e);
            // Guard the cast: a mapper that is not Delegated must report the original failure,
            // not a ClassCastException that hides it.
            final Object delegate = mapper instanceof Delegated ? ((Delegated) mapper).getDelegate() : null;
            if (!(delegate instanceof PTransform)) {
                throw new IllegalStateException(e);
            }
            @SuppressWarnings("unchecked") // the transform's type arguments are only known at runtime
            final PTransform<PBegin, PCollection<Record>> transform =
                    (PTransform<PBegin, PCollection<Record>>) delegate;
            final UUID uuid = UUID.randomUUID();
            QUEUE.put(uuid, new ArrayBlockingQueue<>(QUEUE_SIZE, true));
            try {
                return new QueueInput(delegate, familyName, inputName, familyName, transform, uuid);
            } catch (RuntimeException | Error startFailure) {
                // The iterator was never handed out, so nothing else would ever release the queue.
                QUEUE.remove(uuid);
                throw startFailure;
            }
        }
    }

    /**
     * Serialization hook: replaces this instance with a {@link SerializableService} built from the
     * plugin id and the {@link ProducerFinder} class name.
     */
    Object writeReplace() throws ObjectStreamException {
        return new SerializableService(this.plugin, ProducerFinder.class.getName());
    }

    /**
     * Iterator over the records that a Beam pipeline pushes into the queue registered under
     * {@code queueId}. The pipeline is started by the constructor.
     *
     * <p>Records are prefetched one ahead: {@link #hasNext()} and {@link #next()} may therefore
     * wait for the pipeline, and may throw if the pipeline failed.
     */
    static class QueueInput implements Iterator<Record>, Serializable {
        private final PTransform<PBegin, PCollection<Record>> transform;
        private final PipelineResult result;
        /** Whether the first record has been looked up yet. */
        private boolean started;
        /** Set once the pipeline has been observed in a state other than RUNNING. */
        private boolean end;
        /** Prefetched record, or {@code null} when none is available. */
        private Record next;
        private final UUID queueId;

        /**
         * Builds the iterator and starts the reading pipeline.
         *
         * @param delegate unused, kept for signature compatibility
         * @param rootName unused, kept for signature compatibility
         * @param name unused, kept for signature compatibility
         * @param plugin unused, kept for signature compatibility
         * @param transform Beam source to read from
         * @param queueId key of the hand-off queue; the queue must already be registered
         * @throws IllegalStateException if the pipeline could not be started
         */
        public QueueInput(Object delegate, String rootName, String name, String plugin, PTransform<PBegin, PCollection<Record>> transform, UUID queueId) {
            this.transform = transform;
            this.queueId = queueId;
            // Must come last: starting the pipeline reads both fields above.
            this.result = this.runDataReadingPipeline();
        }

        /**
         * Tells whether another record is available, fetching the first one on the first call.
         * When no record is left the hand-off queue is unregistered.
         */
        @Override
        public boolean hasNext() {
            if (this.next == null && !this.started) {
                this.next = this.findNext();
                this.started = true;
            }
            if (this.next == null) {
                QUEUE.remove(this.queueId);
            }
            return this.next != null;
        }

        /**
         * Returns the prefetched record and fetches the following one.
         *
         * @return the next record, or {@code null} when the iteration is exhausted; note that this
         *         differs from the usual {@link Iterator} contract of throwing
         *         {@link java.util.NoSuchElementException}
         */
        @Override
        public Record next() {
            if (!this.hasNext()) {
                return null;
            }
            final Record current = this.next;
            this.next = this.findNext();
            return current;
        }

        /**
         * Polls the hand-off queue until a record shows up or the pipeline is no longer running.
         *
         * <p>The queue is always polled with short sleeps and never with a blocking wait on the
         * pipeline: while this thread is blocked nobody drains the queue, so a pipeline emitting
         * more than {@link #QUEUE_SIZE} further records could never finish.
         *
         * @return the next record, or {@code null} when the pipeline ended and the queue is empty
         * @throws IllegalStateException if the queue is empty and the pipeline state is FAILED
         */
        private Record findNext() {
            final Queue<Record> recordQueue = QUEUE.get(this.queueId);
            if (recordQueue == null) {
                // Queue already unregistered: nothing more can be read.
                return null;
            }
            Record record = recordQueue.poll();
            while (record == null && !this.end) {
                this.end = this.result.getState() != PipelineResult.State.RUNNING;
                log.debug("findNext NULL, retry : end={}; size:{}", this.end, recordQueue.size());
                this.sleep();
                // Also executed once after the end was observed, to pick up the last records.
                record = recordQueue.poll();
            }
            if (record == null && this.result.getState() == PipelineResult.State.FAILED) {
                // Report the failure instead of making it look like a normal end of data.
                QUEUE.remove(this.queueId);
                throw new IllegalStateException("Beam pipeline failed while reading records");
            }
            return record;
        }

        /**
         * Builds a direct-runner pipeline applying the transform followed by {@link MyDoFn}, and
         * runs it from a helper thread while the context class loader is the one that loaded
         * {@link Pipeline}. The caller's context class loader is restored before returning.
         *
         * @return the result handle of the started pipeline (non-blocking run)
         * @throws IllegalStateException if {@code Pipeline.run()} failed
         */
        private PipelineResult runDataReadingPipeline() {
            final Thread caller = Thread.currentThread();
            final ClassLoader beamAwareClassLoader = Pipeline.class.getClassLoader();
            final ClassLoader callerClassLoader = caller.getContextClassLoader();
            caller.setContextClassLoader(beamAwareClassLoader);
            try {
                final DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
                options.setRunner(DirectRunner.class);
                options.setTargetParallelism(BEAM_PARALLELISM);
                options.setBlockOnRun(false);
                final Pipeline p = Pipeline.create(options);
                p.apply(this.transform).apply(ParDo.of(new MyDoFn(this.queueId)));

                final PipelineResult[] result = new PipelineResult[1];
                final Throwable[] failure = new Throwable[1];
                final Thread starter = new Thread(() -> {
                    try {
                        result[0] = p.run();
                    } catch (RuntimeException | Error e) {
                        failure[0] = e;
                    }
                });
                starter.start();

                // join() both waits for the start-up and makes the helper thread's writes visible
                // here; it also returns when run() threw, instead of waiting forever.
                boolean interrupted = false;
                while (true) {
                    try {
                        starter.join();
                        break;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                if (interrupted) {
                    // Keep waiting for the start-up to complete but preserve the interrupt status.
                    caller.interrupt();
                }
                if (result[0] == null) {
                    throw new IllegalStateException("Beam pipeline failed to start", failure[0]);
                }
                return result[0];
            } finally {
                caller.setContextClassLoader(callerClassLoader);
            }
        }

        /** Pauses for 30 ms between two polls; an interruption is recorded on the thread. */
        private void sleep() {
            try {
                Thread.sleep(30L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Terminal step of the reading pipeline: pushes every element into the hand-off queue
     * registered under {@code queueId}, retrying while the queue is full.
     */
    static class MyDoFn extends DoFn<Record, Void> {
        private final UUID queueId;

        /**
         * @param queueId key of the hand-off queue to push records into
         */
        public MyDoFn(UUID queueId) {
            this.queueId = queueId;
        }

        /**
         * Offers the current element to the queue and retries with short sleeps until accepted.
         *
         * @param context Beam process context giving access to the current element
         * @throws IllegalStateException if no queue is registered for this function's id
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<Record, Void>.ProcessContext context) {
            final Queue<Record> recordQueue = QUEUE.get(this.queueId);
            if (recordQueue == null) {
                throw new IllegalStateException("No record queue registered for id " + this.queueId);
            }
            final Record element = context.element();
            boolean ok = recordQueue.offer(element);
            log.debug("queue injected {}; ok={}; thread:{}", recordQueue.size(), ok, Thread.currentThread().getId());
            while (!ok) {
                // Queue full: give the consumer some time to drain it.
                this.sleep();
                ok = recordQueue.offer(element);
                log.debug("\tqueue injected retry {}; ok={}; thread:{}", recordQueue.size(), ok, Thread.currentThread().getId());
            }
        }

        /** Pauses for 20 ms between two offers; an interruption is recorded on the thread. */
        private void sleep() {
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

