/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.collect.Maps;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformReplacements;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ReplacementOutputs;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

@Experimental(value=Experimental.Kind.SPLITTABLE_DO_FN)
public class SplittableParDo<InputT, OutputT, RestrictionT>
extends PTransform<PCollection<InputT>, PCollectionTuple> {
    /** The splittable {@link DoFn} this transform wraps; validated in the constructor. */
    private final DoFn<InputT, OutputT> doFn;
    /** Side-input views handed on to {@link ProcessKeyedElements} and exposed as additional inputs. */
    private final List<PCollectionView<?>> sideInputs;
    /** Tag of the main output of the wrapped {@link DoFn}. */
    private final TupleTag<OutputT> mainOutputTag;
    /** Tags of every output other than the main one. */
    private final TupleTagList additionalOutputTags;
    /** Coder for each output, keyed by the output's tag. */
    private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
    /** URN string constant; not referenced by the code in this class. */
    public static final String SPLITTABLE_PROCESS_URN = "beam:runners_core:transforms:splittable_process:v1";
    /** URN string constant; not referenced by the code in this class. */
    public static final String SPLITTABLE_GBKIKWI_URN = "beam:runners_core:transforms:splittable_gbkikwi:v1";

    /**
     * Creates the transform after verifying that {@code doFn} is splittable.
     *
     * @param doFn the function to run; its {@code processElement} signature must report splittable
     * @param sideInputs side-input views used by {@code doFn}
     * @param mainOutputTag tag of the main output
     * @param additionalOutputTags tags of the remaining outputs
     * @param outputTagsToCoders coder for each output tag
     * @throws IllegalArgumentException if {@code doFn} is not a splittable DoFn
     */
    private SplittableParDo(DoFn<InputT, OutputT> doFn, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList additionalOutputTags, Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
        DoFnSignature fnSignature = DoFnSignatures.getSignature(doFn.getClass());
        boolean splittable = fnSignature.processElement().isSplittable();
        Preconditions.checkArgument(splittable, "fn must be a splittable DoFn");

        this.doFn = doFn;
        this.sideInputs = sideInputs;
        this.mainOutputTag = mainOutputTag;
        this.additionalOutputTags = additionalOutputTags;
        this.outputTagsToCoders = outputTagsToCoders;
    }

    /**
     * Builds a {@code SplittableParDo} from an already-applied ParDo.
     *
     * <p>The coder of every output of {@code parDo} is recorded by tag, and the DoFn, side inputs
     * and output tags are read back through {@link ParDoTranslation}.
     *
     * @param parDo the applied transform to convert; must not be {@code null}
     * @return a new {@code SplittableParDo} wrapping the DoFn of {@code parDo}
     * @throws IllegalArgumentException if {@code parDo} is {@code null} or its DoFn is not splittable
     * @throws RuntimeException wrapping any {@link IOException} raised by {@link ParDoTranslation}
     */
    public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forAppliedParDo(AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> parDo) {
        Preconditions.checkArgument(parDo != null, "parDo must not be null");

        // Collecting the coders cannot raise IOException, so it lives outside the try block.
        Map<TupleTag<?>, Coder<?>> codersByTag = new HashMap<TupleTag<?>, Coder<?>>();
        for (Map.Entry<TupleTag<?>, PValue> output : parDo.getOutputs().entrySet()) {
            // Every output is treated as a PCollection; any other PValue fails with ClassCastException.
            PCollection<?> collection = (PCollection<?>)output.getValue();
            codersByTag.put(output.getKey(), collection.getCoder());
        }

        try {
            return new SplittableParDo(ParDoTranslation.getDoFn(parDo), ParDoTranslation.getSideInputs(parDo), ParDoTranslation.getMainOutputTag(parDo), ParDoTranslation.getAdditionalOutputTags(parDo), codersByTag);
        }
        catch (IOException exc) {
            throw new RuntimeException(exc);
        }
    }

    /**
     * Expands this transform into a chain of five named steps applied to {@code input}:
     * "Pair with initial restriction", "Split restriction", "Explode windows",
     * "Assign unique key" and finally "ProcessKeyedElements".
     *
     * @param input the elements to process
     * @return the outputs produced by the {@link ProcessKeyedElements} step
     */
    public PCollectionTuple expand(PCollection<InputT> input) {
        Coder restrictionCoder = DoFnInvokers.invokerFor(this.doFn).invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
        // Both the pairing and the splitting step emit KV<element, restriction>, so they share a coder.
        KvCoder elementAndRestrictionCoder = KvCoder.of((Coder)input.getCoder(), (Coder)restrictionCoder);

        PCollection withInitialRestriction = ((PCollection)input.apply("Pair with initial restriction", (PTransform)ParDo.of(new PairWithRestrictionFn(this.doFn)))).setCoder((Coder)elementAndRestrictionCoder);

        PCollection withSplitRestrictions = ((PCollection)withInitialRestriction.apply("Split restriction", (PTransform)ParDo.of(new SplitRestrictionFn(this.doFn)))).setCoder((Coder)elementAndRestrictionCoder);

        // NOTE(review): windows are exploded before keys are assigned, presumably so that every
        // random key ends up with exactly one element in exactly one window; confirm with the
        // runner's ProcessKeyedElements implementation.
        PCollection perWindow = (PCollection)withSplitRestrictions.apply("Explode windows", (PTransform)ParDo.of(new ExplodeWindowsFn()));

        PCollection keyedRestrictions = (PCollection)perWindow.apply("Assign unique key", (PTransform)WithKeys.of(new RandomUniqueKeyFn()));

        ProcessKeyedElements processKeyed = new ProcessKeyedElements(this.doFn, input.getCoder(), restrictionCoder, input.getWindowingStrategy(), this.sideInputs, this.mainOutputTag, this.additionalOutputTags, this.outputTagsToCoders);
        return (PCollectionTuple)keyedRestrictions.apply("ProcessKeyedElements", processKeyed);
    }

    /**
     * Returns this transform's side-input views converted by
     * {@link PCollectionViews#toAdditionalInputs}.
     */
    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
        List<PCollectionView<?>> views = this.sideInputs;
        return PCollectionViews.toAdditionalInputs(views);
    }

    /**
     * Takes {@code KV<element, restriction>} pairs and emits one {@code KV<element, part>} for
     * every part that the wrapped splittable DoFn's split-restriction method produces.
     *
     * <p>The wrapped DoFn is driven through a {@link DoFnInvoker} that is created in
     * {@link #setup()} and released in {@link #tearDown()}.
     */
    private static class SplitRestrictionFn<InputT, RestrictionT>
    extends DoFn<KV<InputT, RestrictionT>, KV<InputT, RestrictionT>> {
        private final DoFn<InputT, ?> splittableFn;
        /** Invoker for {@link #splittableFn}; {@code null} before setup and after teardown. */
        @Nullable
        private transient DoFnInvoker<InputT, ?> invoker;

        SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
            this.splittableFn = splittableFn;
        }

        /** Creates the invoker for the wrapped DoFn and runs that DoFn's setup. */
        @DoFn.Setup
        public void setup() {
            this.invoker = DoFnInvokers.invokerFor(this.splittableFn);
            this.invoker.invokeSetup();
        }

        /**
         * Splits the restriction of the current pair and outputs each part paired with the
         * original element.
         *
         * <p>The parameter uses the inherited, parameterized {@code ProcessContext} rather than
         * raw {@code DoFn.ProcessContext}, so the element and its parts are handled without
         * unchecked casts. NOTE(review): Beam's DoFn signature analysis is also believed to
         * require the parameterized context type; confirm against the DoFn documentation.
         */
        @DoFn.ProcessElement
        public void processElement(final ProcessContext c) {
            final InputT element = c.element().getKey();
            this.invoker.invokeSplitRestriction(element, c.element().getValue(), new DoFn.OutputReceiver<RestrictionT>(){

                public void output(RestrictionT part) {
                    c.output(KV.of(element, part));
                }

                public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
                    throw new UnsupportedOperationException("outputWithTimestamp is not supported when splitting a restriction");
                }
            });
        }

        /**
         * Runs the wrapped DoFn's teardown and drops the invoker.
         *
         * <p>Does nothing when no invoker exists (for example when {@link #setup()} failed before
         * assigning it), so teardown cannot fail with a {@link NullPointerException}. The invoker
         * is cleared even if the wrapped teardown throws.
         */
        @DoFn.Teardown
        public void tearDown() {
            if (this.invoker == null) {
                return;
            }
            try {
                this.invoker.invokeTeardown();
            }
            finally {
                this.invoker = null;
            }
        }
    }

    /**
     * Pairs every input element with the initial restriction that the wrapped splittable DoFn
     * reports for it, emitting {@code KV<element, restriction>}.
     *
     * <p>The wrapped DoFn is driven through a {@link DoFnInvoker} that is created in
     * {@link #setup()} and released in {@link #tearDown()}.
     */
    private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
    extends DoFn<InputT, KV<InputT, RestrictionT>> {
        // Assigned only in the constructor. Left non-final on purpose: this class is serialized
        // without an explicit serialVersionUID, and field modifiers feed the default UID.
        private DoFn<InputT, OutputT> fn;
        /** Invoker for {@link #fn}; {@code null} before setup and after teardown. */
        @Nullable
        private transient DoFnInvoker<InputT, OutputT> invoker;

        PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
            this.fn = fn;
        }

        /** Creates the invoker for the wrapped DoFn and runs that DoFn's setup. */
        @DoFn.Setup
        public void setup() {
            this.invoker = DoFnInvokers.invokerFor(this.fn);
            this.invoker.invokeSetup();
        }

        /**
         * Outputs the current element together with its initial restriction.
         *
         * <p>The parameter uses the inherited, parameterized {@code ProcessContext} rather than
         * raw {@code DoFn.ProcessContext}, which removes the unchecked {@code Object} casts.
         * NOTE(review): Beam's DoFn signature analysis is also believed to require the
         * parameterized context type; confirm against the DoFn documentation.
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) {
            InputT element = context.element();
            RestrictionT initialRestriction = this.invoker.<RestrictionT>invokeGetInitialRestriction(element);
            context.output(KV.of(element, initialRestriction));
        }

        /**
         * Runs the wrapped DoFn's teardown and drops the invoker.
         *
         * <p>Does nothing when no invoker exists (for example when {@link #setup()} failed before
         * assigning it), so teardown cannot fail with a {@link NullPointerException}. The invoker
         * is cleared even if the wrapped teardown throws.
         */
        @DoFn.Teardown
        public void tearDown() {
            if (this.invoker == null) {
                return;
            }
            try {
                this.invoker.invokeTeardown();
            }
            finally {
                this.invoker = null;
            }
        }
    }

    /**
     * Key function that ignores its input and returns a freshly generated 128-byte random array
     * on every call.
     */
    private static class RandomUniqueKeyFn<T>
    implements SerializableFunction<T, byte[]> {
        private RandomUniqueKeyFn() {
        }

        /**
         * @param input ignored
         * @return a new array of 128 random bytes drawn from {@link ThreadLocalRandom}
         */
        public byte[] apply(T input) {
            final int keyLengthBytes = 128;
            byte[] randomKey = new byte[keyLengthBytes];
            ThreadLocalRandom.current().nextBytes(randomKey);
            return randomKey;
        }
    }

    /**
     * Translates a {@link ProcessKeyedElements} transform into a {@link RunnerApi.FunctionSpec}
     * whose payload is a {@link RunnerApi.ParDoPayload} marked as splittable.
     */
    public static class ProcessKeyedElementsTranslator
    implements PTransformTranslation.TransformPayloadTranslator<ProcessKeyedElements<?, ?, ?>> {
        private ProcessKeyedElementsTranslator() {
        }

        /** Returns a new translator instance. */
        public static PTransformTranslation.TransformPayloadTranslator create() {
            return new ProcessKeyedElementsTranslator();
        }

        /** Always returns {@link PTransformTranslation#SPLITTABLE_PROCESS_KEYED_URN}. */
        @Override
        public String getUrn(ProcessKeyedElements<?, ?, ?> transform) {
            return PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN;
        }

        /**
         * Builds the function spec for {@code transform}.
         *
         * @param transform the applied {@link ProcessKeyedElements} to translate
         * @param components registry in which the restriction coder is registered and which is
         *     passed on to {@link ParDoTranslation#payloadForParDoLike}
         * @return a spec carrying this translator's URN and the serialized ParDo payload
         * @throws IOException declared by the interface; presumably raised by coder registration
         *     or payload construction
         */
        @Override
        public RunnerApi.FunctionSpec translate(AppliedPTransform<?, ?, ProcessKeyedElements<?, ?, ?>> transform, SdkComponents components) throws IOException {
            final ProcessKeyedElements keyedElements = transform.getTransform();
            final DoFn splittableFn = keyedElements.getFn();
            final DoFnSignature fnSignature = DoFnSignatures.getSignature(splittableFn.getClass());
            // Registered once, up front; the callback below only hands the id back.
            final String registeredRestrictionCoderId = components.registerCoder(keyedElements.getRestrictionCoder());

            ParDoTranslation.ParDoLike parDoLike = new ParDoTranslation.ParDoLike(){

                @Override
                public RunnerApi.SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
                    return ParDoTranslation.translateDoFn(splittableFn, keyedElements.getMainOutputTag(), newComponents);
                }

                @Override
                public List<RunnerApi.Parameter> translateParameters() {
                    return ParDoTranslation.translateParameters(fnSignature.processElement().extraParameters());
                }

                @Override
                public Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents newComponents) {
                    return ParDoTranslation.translateSideInputs(keyedElements.getSideInputs(), newComponents);
                }

                // No state specs are reported for this transform.
                @Override
                public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents newComponents) {
                    return ImmutableMap.of();
                }

                // No timer specs are reported for this transform.
                @Override
                public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
                    return ImmutableMap.of();
                }

                @Override
                public boolean isSplittable() {
                    return true;
                }

                @Override
                public String translateRestrictionCoderId(SdkComponents newComponents) {
                    return registeredRestrictionCoderId;
                }
            };

            RunnerApi.ParDoPayload parDoPayload = ParDoTranslation.payloadForParDoLike(parDoLike, components);
            RunnerApi.FunctionSpec.Builder specBuilder = RunnerApi.FunctionSpec.newBuilder();
            specBuilder.setUrn(this.getUrn(keyedElements));
            specBuilder.setPayload(parDoPayload.toByteString());
            return specBuilder.build();
        }
    }

    /**
     * Registers {@link ProcessKeyedElementsTranslator} as the payload translator for
     * {@link ProcessKeyedElements}.
     */
    @AutoService(value=TransformPayloadTranslatorRegistrar.class)
    public static class Registrar
    implements TransformPayloadTranslatorRegistrar {
        /**
         * @return an immutable single-entry map from {@code ProcessKeyedElements.class} to a new
         *     {@link ProcessKeyedElementsTranslator}
         */
        @Override
        public Map<? extends Class<? extends PTransform>, ? extends PTransformTranslation.TransformPayloadTranslator> getTransformPayloadTranslators() {
            // Explicit type arguments: an untyped builder chain infers ImmutableMap<Object, Object>,
            // which does not conform to the declared wildcard return type.
            return ImmutableMap.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>of(ProcessKeyedElements.class, new ProcessKeyedElementsTranslator());
        }
    }

    /**
     * Transform applied to {@code KV<key, KV<element, restriction>>} input that carries everything
     * needed to run a splittable {@link DoFn}: the function, its coders, the input windowing
     * strategy, side inputs and output tags.
     *
     * <p>{@link #expand} only creates primitive outputs; no processing logic lives in this class.
     */
    public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
    extends PTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
        private final DoFn<InputT, OutputT> fn;
        private final Coder<InputT> elementCoder;
        private final Coder<RestrictionT> restrictionCoder;
        private final WindowingStrategy<InputT, ?> windowingStrategy;
        private final List<PCollectionView<?>> sideInputs;
        private final TupleTag<OutputT> mainOutputTag;
        private final TupleTagList additionalOutputTags;
        private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;

        /**
         * Stores every argument as given; collections are kept by reference, not copied.
         *
         * @param fn the DoFn to run
         * @param elementCoder coder for the elements
         * @param restrictionCoder coder for the restrictions
         * @param windowingStrategy windowing strategy, later used for the created outputs
         * @param sideInputs side-input views of {@code fn}
         * @param mainOutputTag tag of the main output
         * @param additionalOutputTags tags of the remaining outputs
         * @param outputTagsToCoders coder for each output tag
         */
        public ProcessKeyedElements(DoFn<InputT, OutputT> fn, Coder<InputT> elementCoder, Coder<RestrictionT> restrictionCoder, WindowingStrategy<InputT, ?> windowingStrategy, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList additionalOutputTags, Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
            this.fn = fn;
            this.elementCoder = elementCoder;
            this.restrictionCoder = restrictionCoder;
            this.windowingStrategy = windowingStrategy;
            this.sideInputs = sideInputs;
            this.mainOutputTag = mainOutputTag;
            this.additionalOutputTags = additionalOutputTags;
            this.outputTagsToCoders = outputTagsToCoders;
        }

        /** Returns the DoFn given at construction. */
        public DoFn<InputT, OutputT> getFn() {
            return fn;
        }

        /** Returns the element coder given at construction. */
        public Coder<InputT> getElementCoder() {
            return elementCoder;
        }

        /** Returns the restriction coder given at construction. */
        public Coder<RestrictionT> getRestrictionCoder() {
            return restrictionCoder;
        }

        /** Returns the windowing strategy given at construction. */
        public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
            return windowingStrategy;
        }

        /** Returns the side-input views given at construction (same list instance). */
        public List<PCollectionView<?>> getSideInputs() {
            return sideInputs;
        }

        /** Returns the main output tag given at construction. */
        public TupleTag<OutputT> getMainOutputTag() {
            return mainOutputTag;
        }

        /** Returns the additional output tags given at construction. */
        public TupleTagList getAdditionalOutputTags() {
            return additionalOutputTags;
        }

        /** Returns the tag-to-coder map given at construction (same map instance). */
        public Map<TupleTag<?>, Coder<?>> getOutputTagsToCoders() {
            return outputTagsToCoders;
        }

        /** Creates the primitive outputs for {@code input} via {@link #createPrimitiveOutputFor}. */
        public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>> input) {
            return createPrimitiveOutputFor(input, fn, mainOutputTag, additionalOutputTags, outputTagsToCoders, windowingStrategy);
        }

        /**
         * Creates a tuple of primitive outputs, one per tag (main tag first, then the additional
         * tags).
         *
         * <p>The outputs' boundedness is {@code input.isBounded()} combined, through
         * {@code IsBounded.and}, with the boundedness per element reported by the signature of
         * {@code fn}. The main output also receives the output type descriptor of {@code fn}.
         *
         * @param input collection whose pipeline and boundedness are used
         * @param fn the DoFn whose signature and output type descriptor are consulted
         * @param mainOutputTag tag of the main output
         * @param additionalOutputTags tags of the remaining outputs
         * @param outputTagsToCoders coder for each output tag
         * @param windowingStrategy windowing strategy for the created outputs
         * @return the newly created output tuple
         */
        public static <OutputT> PCollectionTuple createPrimitiveOutputFor(PCollection<?> input, DoFn<?, OutputT> fn, TupleTag<OutputT> mainOutputTag, TupleTagList additionalOutputTags, Map<TupleTag<?>, Coder<?>> outputTagsToCoders, WindowingStrategy<?, ?> windowingStrategy) {
            DoFnSignature fnSignature = DoFnSignatures.getSignature(fn.getClass());
            Pipeline pipeline = input.getPipeline();
            TupleTagList allOutputTags = TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll());
            PCollection.IsBounded outputBoundedness = input.isBounded().and(fnSignature.isBoundedPerElement());

            PCollectionTuple result = PCollectionTuple.ofPrimitiveOutputsInternal(pipeline, allOutputTags, outputTagsToCoders, windowingStrategy, outputBoundedness);
            result.get(mainOutputTag).setTypeDescriptor(fn.getOutputTypeDescriptor());
            return result;
        }

        /**
         * Returns the side-input views converted by {@link PCollectionViews#toAdditionalInputs}.
         */
        public Map<TupleTag<?>, PValue> getAdditionalInputs() {
            return PCollectionViews.toAdditionalInputs(sideInputs);
        }
    }

    /**
     * Identity DoFn used for the "Explode windows" step: every element is output unchanged.
     *
     * <p>NOTE(review): the otherwise unused {@link BoundedWindow} parameter presumably makes the
     * runner invoke this function once per window an element belongs to; confirm against the
     * DoFn documentation.
     */
    private static class ExplodeWindowsFn<InputT>
    extends DoFn<InputT, InputT> {
        private ExplodeWindowsFn() {
        }

        /**
         * Re-emits the current element.
         *
         * <p>The context parameter uses the inherited, parameterized {@code ProcessContext}
         * instead of raw {@code DoFn.ProcessContext}, so the element stays typed as
         * {@code InputT}.
         *
         * @param c context providing the element and receiving the output
         * @param window not read by the body; see the class comment
         */
        @DoFn.ProcessElement
        public void process(ProcessContext c, BoundedWindow window) {
            c.output(c.element());
        }
    }

    /**
     * Override factory that replaces an applied {@link ParDo.MultiOutput} with a
     * {@link SplittableParDo} built by {@link SplittableParDo#forAppliedParDo}.
     */
    public static class OverrideFactory<InputT, OutputT>
    implements PTransformOverrideFactory<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
        /**
         * @param transform the applied ParDo to replace
         * @return a replacement pairing the transform's singleton main input with the new
         *     {@link SplittableParDo}
         */
        public PTransformOverrideFactory.PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> transform) {
            PCollection<InputT> mainInput = PTransformReplacements.getSingletonMainInput(transform);
            SplittableParDo<InputT, OutputT, ?> replacement = SplittableParDo.forAppliedParDo(transform);
            return PTransformOverrideFactory.PTransformReplacement.of(mainInput, replacement);
        }

        /**
         * Maps the original outputs onto the replacement outputs via
         * {@link ReplacementOutputs#tagged}.
         */
        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
            POutput replacementOutput = newOutput;
            return ReplacementOutputs.tagged(outputs, replacementOutput);
        }
    }
}

