/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.beam.impl;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.base.Serializer;
import org.talend.sdk.component.runtime.base.lang.exception.InvocationExceptionWrapper;
import org.talend.sdk.component.runtime.beam.error.ErrorFactory;
import org.talend.sdk.component.runtime.beam.impl.InMemoryArgumentProvider;
import org.talend.sdk.component.runtime.output.InputFactory;
import org.talend.sdk.component.runtime.output.OutputFactory;
import org.talend.sdk.component.runtime.output.Processor;
import org.talend.sdk.component.runtime.serialization.ContainerFinder;
import org.talend.sdk.component.runtime.serialization.EnhancedObjectInputStream;

public class BeamProcessorImpl
implements Processor,
Serializable,
Delegated {
    private static final Object[] EMPTY_ARGS = new Object[0];
    private static final Supplier<Object[]> EMPTY_ARGS_SUPPLIER = () -> EMPTY_ARGS;
    private final PipelineOptions options = PipelineOptionsFactory.create();
    private final Object original;
    private final String family;
    private final String name;
    private final String plugin;
    private final DoFn<?, ?> delegate;
    private final InMemoryArgumentProvider argumentProvider;
    private final Method processElement;
    private final Method setup;
    private final Method tearDown;
    private final Method startBundle;
    private final Method finishBundle;
    private final Supplier<Object[]> processArgumentFactory;
    private final Supplier<Object[]> startBundleArgumentFactory;
    private final Supplier<Object[]> finishBundleArgumentFactory;
    private final ClassLoader loader;

    protected BeamProcessorImpl(Object initialInstance, DoFn<?, ?> transform, String plugin, String family, String name) {
        this.loader = ContainerFinder.Instance.get().find(plugin).classloader();
        this.original = initialInstance;
        this.delegate = transform;
        if (this.delegate == null) {
            throw new IllegalArgumentException("Didn't find the do fn associated with " + transform);
        }
        this.argumentProvider = new InMemoryArgumentProvider(this.options);
        this.processElement = this.findMethod(this.delegate.getClass(), DoFn.ProcessElement.class).findFirst().orElseThrow(() -> new IllegalArgumentException("No @ProcessElement on " + this.delegate));
        this.setup = this.findMethod(this.delegate.getClass(), DoFn.Setup.class).findFirst().orElse(null);
        this.tearDown = this.findMethod(this.delegate.getClass(), DoFn.Teardown.class).findFirst().orElse(null);
        this.startBundle = this.findMethod(this.delegate.getClass(), DoFn.StartBundle.class).findFirst().orElse(null);
        this.finishBundle = this.findMethod(this.delegate.getClass(), DoFn.FinishBundle.class).findFirst().orElse(null);
        Stream.of(this.setup, this.tearDown, this.startBundle, this.finishBundle, this.processElement).filter(Objects::nonNull).forEach(m -> {
            if (!m.isAccessible()) {
                m.setAccessible(true);
            }
        });
        Collection argSupplier = Stream.of(this.processElement.getParameters()).map(p -> {
            Class<?> type = p.getType();
            if (DoFn.ProcessContext.class == type) {
                return () -> this.argumentProvider.processContext(this.delegate);
            }
            if (DoFn.OnTimerContext.class == type) {
                return () -> this.argumentProvider.onTimerContext(this.delegate);
            }
            if (BoundedWindow.class.isAssignableFrom(type)) {
                return this.argumentProvider::window;
            }
            if (PipelineOptions.class == type) {
                return () -> this.options;
            }
            if (RestrictionTracker.class.isAssignableFrom(type)) {
                return this.argumentProvider::restrictionTracker;
            }
            if (Timer.class == type) {
                String id = Optional.ofNullable(p.getAnnotation(DoFn.TimerId.class)).map(DoFn.TimerId::value).orElseThrow(() -> new IllegalArgumentException("Missing @TimerId on " + p.getName()));
                return () -> this.argumentProvider.timer(id);
            }
            if (State.class == type) {
                String id = Optional.ofNullable(p.getAnnotation(DoFn.StateId.class)).map(DoFn.StateId::value).orElseThrow(() -> new IllegalArgumentException("Missing @StateId on " + p.getName()));
                return () -> this.argumentProvider.state(id);
            }
            if (p.isAnnotationPresent(DoFn.Element.class)) {
                return () -> this.argumentProvider.element(null);
            }
            if (p.isAnnotationPresent(DoFn.Timestamp.class)) {
                return Instant::now;
            }
            if (p.getType() == DoFn.OutputReceiver.class) {
                return () -> this.argumentProvider.outputReceiver(null);
            }
            if (p.getType() == DoFn.MultiOutputReceiver.class) {
                return () -> this.argumentProvider.taggedOutputReceiver(null);
            }
            throw new IllegalArgumentException("unsupported parameter of type " + type);
        }).collect(Collectors.toList());
        this.processArgumentFactory = () -> argSupplier.stream().map(Supplier::get).toArray(Object[]::new);
        this.startBundleArgumentFactory = this.startBundle != null ? ((argSupplier = (Collection)Stream.of(this.startBundle.getParameters()).map(p -> {
            Class<?> type = p.getType();
            if (DoFn.StartBundleContext.class == type) {
                return () -> this.argumentProvider.startBundleContext(this.delegate);
            }
            throw new IllegalArgumentException("unsupported parameter of type " + type + " for " + this.startBundle);
        }).collect(Collectors.toList())).isEmpty() ? EMPTY_ARGS_SUPPLIER : () -> argSupplier.stream().map(Supplier::get).toArray(Object[]::new)) : null;
        this.finishBundleArgumentFactory = this.finishBundle != null ? ((argSupplier = (Collection)Stream.of(this.finishBundle.getParameters()).map(p -> {
            Class<?> type = p.getType();
            if (DoFn.FinishBundleContext.class == type) {
                return () -> this.argumentProvider.finishBundleContext(this.delegate);
            }
            throw new IllegalArgumentException("unsupported parameter of type " + type + " for " + this.finishBundle);
        }).collect(Collectors.toList())).isEmpty() ? EMPTY_ARGS_SUPPLIER : () -> argSupplier.stream().map(Supplier::get).toArray(Object[]::new)) : null;
        this.plugin = plugin;
        this.family = family;
        this.name = name;
    }

    public void start() {
        this.execute(() -> {
            if (this.setup != null) {
                try {
                    this.setup.invoke(this.delegate, new Object[0]);
                }
                catch (IllegalAccessException e) {
                    throw ErrorFactory.toIllegalState(e);
                }
                catch (InvocationTargetException e) {
                    throw InvocationExceptionWrapper.toRuntimeException((InvocationTargetException)e);
                }
            }
        });
    }

    public void beforeGroup() {
        this.execute(() -> {
            if (this.startBundle != null) {
                try {
                    this.startBundle.invoke(this.delegate, this.startBundleArgumentFactory.get());
                }
                catch (IllegalAccessException e) {
                    throw ErrorFactory.toIllegalState(e);
                }
                catch (InvocationTargetException e) {
                    throw InvocationExceptionWrapper.toRuntimeException((InvocationTargetException)e);
                }
            }
        });
    }

    public void onNext(InputFactory input, OutputFactory output) {
        this.execute(() -> {
            this.argumentProvider.setInputs(input);
            this.argumentProvider.setOutputs(output);
            try {
                this.processElement.invoke(this.delegate, this.processArgumentFactory.get());
            }
            catch (IllegalAccessException e) {
                throw ErrorFactory.toIllegalState(e);
            }
            catch (InvocationTargetException e) {
                throw InvocationExceptionWrapper.toRuntimeException((InvocationTargetException)e);
            }
            finally {
                this.argumentProvider.setInputs(null);
                this.argumentProvider.setOutputs(null);
            }
        });
    }

    public void afterGroup(OutputFactory outputs) {
        this.execute(() -> {
            if (this.finishBundle != null) {
                this.argumentProvider.setOutputs(outputs);
                try {
                    this.finishBundle.invoke(this.delegate, this.finishBundleArgumentFactory.get());
                }
                catch (IllegalAccessException e) {
                    throw ErrorFactory.toIllegalState(e);
                }
                catch (InvocationTargetException e) {
                    throw InvocationExceptionWrapper.toRuntimeException((InvocationTargetException)e);
                }
                finally {
                    this.argumentProvider.setOutputs(null);
                }
            }
        });
    }

    public void stop() {
        this.execute(() -> {
            if (this.tearDown != null) {
                try {
                    this.tearDown.invoke(this.delegate, new Object[0]);
                }
                catch (IllegalAccessException e) {
                    throw ErrorFactory.toIllegalState(e);
                }
                catch (InvocationTargetException e) {
                    throw InvocationExceptionWrapper.toRuntimeException((InvocationTargetException)e);
                }
            }
        });
    }

    private Stream<Method> findMethod(Class<?> aClass, Class<? extends Annotation> marker) {
        return Stream.concat(Stream.of(aClass.getDeclaredMethods()).filter(m -> m.isAnnotationPresent(marker)), DoFn.class == aClass.getSuperclass() ? Stream.empty() : this.findMethod(aClass.getSuperclass(), marker));
    }

    private void execute(Runnable task) {
        Thread thread = Thread.currentThread();
        ClassLoader tccl = thread.getContextClassLoader();
        thread.setContextClassLoader(this.loader);
        try {
            task.run();
        }
        catch (IllegalArgumentException | IllegalStateException ex) {
            throw ex;
        }
        catch (RuntimeException ex) {
            throw ErrorFactory.toIllegalState(ex);
        }
        finally {
            thread.setContextClassLoader(tccl);
        }
    }

    public String plugin() {
        return this.plugin;
    }

    public String rootName() {
        return this.family;
    }

    public String name() {
        return this.name;
    }

    public Object getDelegate() {
        return this.original;
    }

    Object writeReplace() throws ObjectStreamException {
        return new SerializationReplacer(this.plugin(), this.rootName(), this.name(), Serializer.toBytes(this.delegate));
    }

    private static class SerializationReplacer
    implements Serializable {
        private final String plugin;
        private final String component;
        private final String name;
        private final byte[] value;

        Object readResolve() throws ObjectStreamException {
            try {
                DoFn doFn = (DoFn)DoFn.class.cast(this.loadDelegate());
                return new BeamProcessorImpl(doFn, doFn, this.plugin, this.component, this.name);
            }
            catch (IOException | ClassNotFoundException e) {
                throw new InvalidObjectException(e.getMessage());
            }
        }

        private Serializable loadDelegate() throws IOException, ClassNotFoundException {
            try (EnhancedObjectInputStream ois = new EnhancedObjectInputStream((InputStream)new ByteArrayInputStream(this.value), ContainerFinder.Instance.get().find(this.plugin).classloader());){
                Serializable serializable = (Serializable)Serializable.class.cast(ois.readObject());
                return serializable;
            }
        }

        public SerializationReplacer(String plugin, String component, String name, byte[] value) {
            this.plugin = plugin;
            this.component = component;
            this.name = name;
            this.value = value;
        }
    }
}

