/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

public class FlinkStreamingSideInputHandlerFactory
implements StateRequestHandlers.SideInputHandlerFactory {
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputToCollection;
    private final SideInputHandler runnerHandler;

    public static FlinkStreamingSideInputHandlerFactory forStage(ExecutableStage stage, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> viewMapping, SideInputHandler runnerHandler) {
        ImmutableMap.Builder sideInputBuilder = ImmutableMap.builder();
        for (SideInputReference sideInput : stage.getSideInputs()) {
            RunnerApi.ExecutableStagePayload.SideInputId sideInputId = RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(sideInput.transform().getId()).setLocalName(sideInput.localName()).build();
            sideInputBuilder.put((Object)sideInputId, (Object)((PCollectionView)Preconditions.checkNotNull(viewMapping.get(sideInputId), (String)"No side input for %s/%s", (Object)sideInputId.getTransformId(), (Object)sideInputId.getLocalName())));
        }
        FlinkStreamingSideInputHandlerFactory factory = new FlinkStreamingSideInputHandlerFactory((Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>)sideInputBuilder.build(), runnerHandler);
        return factory;
    }

    private FlinkStreamingSideInputHandlerFactory(Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputToCollection, SideInputHandler runnerHandler) {
        this.sideInputToCollection = sideInputToCollection;
        this.runnerHandler = runnerHandler;
    }

    public <T, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forSideInput(String transformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, Coder<T> elementCoder, Coder<W> windowCoder) {
        PCollectionView<?> collectionNode = this.sideInputToCollection.get(RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
        Preconditions.checkArgument((collectionNode != null ? 1 : 0) != 0, (String)"No side input for %s/%s", (Object)transformId, (Object)sideInputId);
        if (PTransformTranslation.ITERABLE_SIDE_INPUT.equals(accessPattern.getUrn())) {
            Coder<T> outputCoder = elementCoder;
            return this.forIterableSideInput(collectionNode, outputCoder);
        }
        if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) {
            KvCoder kvCoder = (KvCoder)elementCoder;
            return this.forMultimapSideInput(collectionNode, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
        }
        throw new IllegalArgumentException(String.format("Unknown side input access pattern: %s", accessPattern));
    }

    private <T, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<T, W> forIterableSideInput(final PCollectionView<?> collection, final Coder<T> elementCoder) {
        return new StateRequestHandlers.SideInputHandler<T, W>(){

            public Iterable<T> get(byte[] key, W window) {
                return (Iterable)Preconditions.checkNotNull((Object)FlinkStreamingSideInputHandlerFactory.this.runnerHandler.getIterable(collection, window), (Object)"Element processed by SDK before side input is ready");
            }

            public Coder<T> resultCoder() {
                return elementCoder;
            }
        };
    }

    private <K, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forMultimapSideInput(final PCollectionView<?> collection, final Coder<K> keyCoder, final Coder<V> valueCoder) {
        return new StateRequestHandlers.SideInputHandler<V, W>(){

            public Iterable<V> get(byte[] key, W window) {
                Iterable values = FlinkStreamingSideInputHandlerFactory.this.runnerHandler.getIterable(collection, window);
                ArrayList<Object> result = new ArrayList<Object>();
                for (KV kv : values) {
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    try {
                        keyCoder.encode(kv.getKey(), (OutputStream)bos);
                        if (!Arrays.equals(key, bos.toByteArray())) continue;
                        result.add(kv.getValue());
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                return result;
            }

            public Coder<V> resultCoder() {
                return valueCoder;
            }
        };
    }
}

