/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.UserStateReference;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContextFactory;
import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<InputT, OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageDoFnOperator.class);
    private final RunnerApi.ExecutableStagePayload payload;
    private final JobInfo jobInfo;
    private final FlinkExecutableStageContextFactory contextFactory;
    private final Map<String, TupleTag<?>> outputMap;
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds;
    private final ReentrantLock stateBackendLock;
    private transient ExecutableStageContext stageContext;
    private transient StateRequestHandler stateRequestHandler;
    private transient BundleProgressHandler progressHandler;
    private transient StageBundleFactory stageBundleFactory;
    private transient ExecutableStage executableStage;
    private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
    private transient FlinkMetricContainer flinkMetricContainer;
    private transient long backupWatermarkHold = Long.MIN_VALUE;

    public ExecutableStageDoFnOperator(String stepName, Coder<WindowedValue<InputT>> windowedInputCoder, Coder<InputT> inputCoder, Map<TupleTag<?>, Coder<?>> outputCoders, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, DoFnOperator.OutputManagerFactory<OutputT> outputManagerFactory, Map<Integer, PCollectionView<?>> sideInputTagMapping, Collection<PCollectionView<?>> sideInputs, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds, PipelineOptions options, RunnerApi.ExecutableStagePayload payload, JobInfo jobInfo, FlinkExecutableStageContextFactory contextFactory, Map<String, TupleTag<?>> outputMap, WindowingStrategy windowingStrategy, Coder keyCoder, KeySelector<WindowedValue<InputT>, ?> keySelector) {
        super(new NoOpDoFn(), stepName, windowedInputCoder, inputCoder, outputCoders, mainOutputTag, additionalOutputTags, outputManagerFactory, windowingStrategy, sideInputTagMapping, sideInputs, options, keyCoder, keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        this.payload = payload;
        this.jobInfo = jobInfo;
        this.contextFactory = contextFactory;
        this.outputMap = outputMap;
        this.sideInputIds = sideInputIds;
        this.stateBackendLock = new ReentrantLock();
    }

    @Override
    protected Lock getLockToAcquireForStateAccessDuringBundles() {
        return this.stateBackendLock;
    }

    @Override
    public void open() throws Exception {
        this.executableStage = ExecutableStage.fromPayload((RunnerApi.ExecutableStagePayload)this.payload);
        ExecutableStageDoFnOperator.initializeUserState(this.executableStage, this.getKeyedStateBackend());
        this.stageContext = this.contextFactory.get(this.jobInfo);
        this.flinkMetricContainer = new FlinkMetricContainer((RuntimeContext)this.getRuntimeContext());
        this.stageBundleFactory = this.stageContext.getStageBundleFactory(this.executableStage);
        this.stateRequestHandler = this.getStateRequestHandler(this.executableStage);
        this.progressHandler = new BundleProgressHandler(){

            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                ExecutableStageDoFnOperator.this.flinkMetricContainer.updateMetrics(ExecutableStageDoFnOperator.this.stepName, progress.getMonitoringInfosList());
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                ExecutableStageDoFnOperator.this.flinkMetricContainer.updateMetrics(ExecutableStageDoFnOperator.this.stepName, response.getMonitoringInfosList());
            }
        };
        super.open();
    }

    private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
        StateRequestHandler userStateRequestHandler;
        StateRequestHandler sideInputStateHandler;
        if (executableStage.getSideInputs().size() > 0) {
            org.apache.flink.util.Preconditions.checkNotNull((Object)this.sideInputHandler);
            StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory = (StateRequestHandlers.SideInputHandlerFactory)Preconditions.checkNotNull((Object)FlinkStreamingSideInputHandlerFactory.forStage(executableStage, this.sideInputIds, this.sideInputHandler));
            try {
                sideInputStateHandler = StateRequestHandlers.forSideInputHandlerFactory((Map)ProcessBundleDescriptors.getSideInputs((ExecutableStage)executableStage), (StateRequestHandlers.SideInputHandlerFactory)sideInputHandlerFactory);
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to initialize SideInputHandler", e);
            }
        } else {
            sideInputStateHandler = StateRequestHandler.unsupported();
        }
        if (executableStage.getUserStates().size() > 0) {
            if (this.keyedStateInternals == null) {
                throw new IllegalStateException("Input must be keyed when user state is used");
            }
            userStateRequestHandler = StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)this.stageBundleFactory.getProcessBundleDescriptor(), new BagUserStateFactory(this.keyedStateInternals, this.getKeyedStateBackend(), this.stateBackendLock));
        } else {
            userStateRequestHandler = StateRequestHandler.unsupported();
        }
        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap = new EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler>(BeamFnApi.StateKey.TypeCase.class);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputStateHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateRequestHandler);
        return StateRequestHandlers.delegateBasedUponType(handlerMap);
    }

    public void setKeyContextElement1(StreamRecord record) {
    }

    public void setCurrentKey(Object key) {
    }

    public ByteBuffer getCurrentKey() {
        Preconditions.checkState((boolean)this.stateBackendLock.isLocked(), (Object)"State backend must be locked when retrieving the current key.");
        return (ByteBuffer)this.getKeyedStateBackend().getCurrentKey();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setTimer(WindowedValue<InputT> timerElement, TimerInternals.TimerData timerData) {
        try {
            LOG.debug("Setting timer: {} {}", timerElement, (Object)timerData);
            ByteBuffer encodedKey = (ByteBuffer)this.keySelector.getKey(timerElement);
            try {
                this.stateBackendLock.lock();
                this.getKeyedStateBackend().setCurrentKey((Object)encodedKey);
                if (timerData.getTimestamp().isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                    this.timerInternals.deleteTimer(timerData.getNamespace(), timerData.getTimerId(), timerData.getDomain());
                } else {
                    this.timerInternals.setTimer(timerData);
                }
            }
            finally {
                this.stateBackendLock.unlock();
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Couldn't set timer", e);
        }
    }

    @Override
    protected void fireTimer(InternalTimer<ByteBuffer, TimerInternals.TimerData> timer) {
        ByteBuffer encodedKey = (ByteBuffer)timer.getKey();
        try {
            this.stateBackendLock.lock();
            this.getKeyedStateBackend().setCurrentKey((Object)encodedKey);
            super.fireTimer(timer);
        }
        finally {
            this.stateBackendLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void dispose() throws Exception {
        if (this.stageContext != null) {
            try (StageBundleFactory bundleFactoryCloser = this.stageBundleFactory;
                 ExecutableStageContext closable = this.stageContext;){
                super.dispose();
            }
            finally {
                this.stageContext = null;
            }
        }
    }

    @Override
    protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
        WindowedValue value = (WindowedValue)((RawUnionValue)streamRecord.getValue()).getValue();
        PCollectionView sideInput = (PCollectionView)this.sideInputTagMapping.get(((RawUnionValue)streamRecord.getValue()).getUnionTag());
        this.sideInputHandler.addSideInputValue(sideInput, value.withValue((Object)((Iterable)((KV)value.getValue()).getValue())));
    }

    @Override
    protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(DoFnRunner<InputT, OutputT> wrappedRunner) {
        this.sdkHarnessRunner = new SdkHarnessDoFnRunner(this.executableStage.getInputPCollection().getId(), this.stageBundleFactory, this.stateRequestHandler, this.progressHandler, this.outputManager, this.outputMap, (Coder<BoundedWindow>)this.windowingStrategy.getWindowFn().windowCoder(), this::setTimer, () -> FlinkKeyUtils.decodeKey(this.getCurrentKey(), this.keyCoder));
        return this.ensureStateCleanup(this.sdkHarnessRunner);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (this.sdkHarnessRunner.isBundleInProgress()) {
            if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                this.invokeFinishBundle();
                this.setPushedBackWatermark(Long.MAX_VALUE);
            } else {
                this.backupWatermarkHold = Math.max(this.backupWatermarkHold, this.getPushbackWatermarkHold());
                this.setPushedBackWatermark(Math.min(this.currentOutputWatermark, this.backupWatermarkHold));
                super.setBundleFinishedCallback(() -> {
                    try {
                        LOG.debug("processing pushed back watermark: {}", (Object)mark);
                        this.setPushedBackWatermark(this.backupWatermarkHold);
                        super.processWatermark(mark);
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Failed to process pushed back watermark after finished bundle.", e);
                    }
                });
            }
        }
        super.processWatermark(mark);
    }

    private DoFnRunner<InputT, OutputT> ensureStateCleanup(SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner) {
        if (this.keyCoder == null) {
            return sdkHarnessRunner;
        }
        Coder windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
        CleanupTimer cleanupTimer = new CleanupTimer(this.timerInternals, this.stateBackendLock, this.windowingStrategy, this.keyCoder, windowCoder, (KeyedStateBackend<ByteBuffer>)this.getKeyedStateBackend());
        List<String> userStates = this.executableStage.getUserStates().stream().map(UserStateReference::localName).collect(Collectors.toList());
        final KeyedStateBackend stateBackend = this.getKeyedStateBackend();
        final StateCleaner stateCleaner = new StateCleaner(userStates, windowCoder, () -> (ByteBuffer)stateBackend.getCurrentKey());
        return new StatefulDoFnRunner<InputT, OutputT, BoundedWindow>(sdkHarnessRunner, this.windowingStrategy, cleanupTimer, stateCleaner){

            public void finishBundle() {
                super.finishBundle();
                if (!stateCleaner.cleanupQueue.isEmpty()) {
                    try {
                        ExecutableStageDoFnOperator.this.stateBackendLock.lock();
                        stateCleaner.cleanupState(ExecutableStageDoFnOperator.this.keyedStateInternals, key -> stateBackend.setCurrentKey(key));
                    }
                    finally {
                        ExecutableStageDoFnOperator.this.stateBackendLock.unlock();
                    }
                }
            }
        };
    }

    private static void initializeUserState(ExecutableStage executableStage, @Nullable KeyedStateBackend keyedStateBackend) {
        executableStage.getUserStates().forEach(ref -> {
            try {
                keyedStateBackend.getOrCreateKeyedState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ListStateDescriptor(ref.localName(), new CoderTypeSerializer(ByteStringCoder.of())));
            }
            catch (Exception e) {
                throw new RuntimeException("Couldn't initialize user states.", e);
            }
        });
    }

    private static class NoOpDoFn<InputT, OutputT>
    extends DoFn<InputT, OutputT> {
        private NoOpDoFn() {
        }

        @DoFn.ProcessElement
        public void doNothing(DoFn.ProcessContext context) {
        }
    }

    static class StateCleaner
    implements StatefulDoFnRunner.StateCleaner<BoundedWindow> {
        private final List<String> userStateNames;
        private final Coder windowCoder;
        private final ArrayDeque<KV<ByteBuffer, BoundedWindow>> cleanupQueue;
        private final Supplier<ByteBuffer> keyedStateBackend;

        StateCleaner(List<String> userStateNames, Coder windowCoder, Supplier<ByteBuffer> keyedStateBackend) {
            this.userStateNames = userStateNames;
            this.windowCoder = windowCoder;
            this.keyedStateBackend = keyedStateBackend;
            this.cleanupQueue = new ArrayDeque();
        }

        public void clearForWindow(BoundedWindow window) {
            this.cleanupQueue.add((KV<ByteBuffer, BoundedWindow>)KV.of((Object)this.keyedStateBackend.get(), (Object)window));
        }

        void cleanupState(StateInternals stateInternals, Consumer<ByteBuffer> keyContextConsumer) {
            while (!this.cleanupQueue.isEmpty()) {
                KV<ByteBuffer, BoundedWindow> kv = this.cleanupQueue.remove();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("State cleanup for {} {}", (Object)Arrays.toString(((ByteBuffer)kv.getKey()).array()), kv.getValue());
                }
                keyContextConsumer.accept((ByteBuffer)kv.getKey());
                for (String userState : this.userStateNames) {
                    StateNamespace namespace = StateNamespaces.window((Coder)this.windowCoder, (BoundedWindow)((BoundedWindow)kv.getValue()));
                    StateTag bagStateStateTag = StateTags.bag((String)userState, (Coder)VoidCoder.of());
                    BagState state = (BagState)stateInternals.state(namespace, bagStateStateTag);
                    state.clear();
                }
            }
        }
    }

    static class CleanupTimer<InputT>
    implements StatefulDoFnRunner.CleanupTimer<InputT> {
        private static final String GC_TIMER_ID = "__user-state-cleanup__";
        private final TimerInternals timerInternals;
        private final Lock stateBackendLock;
        private final WindowingStrategy windowingStrategy;
        private final Coder keyCoder;
        private final Coder windowCoder;
        private final KeyedStateBackend<ByteBuffer> keyedStateBackend;

        CleanupTimer(TimerInternals timerInternals, Lock stateBackendLock, WindowingStrategy windowingStrategy, Coder keyCoder, Coder windowCoder, KeyedStateBackend<ByteBuffer> keyedStateBackend) {
            this.timerInternals = timerInternals;
            this.stateBackendLock = stateBackendLock;
            this.windowingStrategy = windowingStrategy;
            this.keyCoder = keyCoder;
            this.windowCoder = windowCoder;
            this.keyedStateBackend = keyedStateBackend;
        }

        public Instant currentInputWatermarkTime() {
            return this.timerInternals.currentInputWatermarkTime();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void setForWindow(InputT input, BoundedWindow window) {
            Preconditions.checkNotNull(input, (Object)"Null input passed to CleanupTimer");
            Instant gcTime = LateDataUtils.garbageCollectionTime((BoundedWindow)window, (WindowingStrategy)this.windowingStrategy).plus(1L);
            ByteBuffer key = FlinkKeyUtils.encodeKey(((KV)input).getKey(), this.keyCoder);
            try {
                this.stateBackendLock.lock();
                this.keyedStateBackend.setCurrentKey((Object)key);
                this.timerInternals.setTimer(StateNamespaces.window((Coder)this.windowCoder, (BoundedWindow)window), GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
            }
            finally {
                this.stateBackendLock.unlock();
            }
        }

        public boolean isForWindow(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
            boolean isEventTimer = timeDomain.equals((Object)TimeDomain.EVENT_TIME);
            Instant gcTime = LateDataUtils.garbageCollectionTime((BoundedWindow)window, (WindowingStrategy)this.windowingStrategy).plus(1L);
            return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals((Object)timestamp);
        }
    }

    private static class SdkHarnessDoFnRunner<InputT, OutputT>
    implements DoFnRunner<InputT, OutputT> {
        private final String mainInput;
        private final LinkedBlockingQueue<KV<String, OutputT>> outputQueue;
        private final StageBundleFactory stageBundleFactory;
        private final StateRequestHandler stateRequestHandler;
        private final BundleProgressHandler progressHandler;
        private final DoFnOperator.BufferedOutputManager<OutputT> outputManager;
        private final Map<String, TupleTag<?>> outputMap;
        private final Map<String, ProcessBundleDescriptors.TimerSpec> timerOutputIdToSpecMap;
        private final Coder<BoundedWindow> windowCoder;
        private final BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration;
        private final Supplier<Object> keyForTimer;
        private RemoteBundle remoteBundle;
        private FnDataReceiver<WindowedValue<?>> mainInputReceiver;

        public SdkHarnessDoFnRunner(String mainInput, StageBundleFactory stageBundleFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, DoFnOperator.BufferedOutputManager<OutputT> outputManager, Map<String, TupleTag<?>> outputMap, Coder<BoundedWindow> windowCoder, BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration, Supplier<Object> keyForTimer) {
            this.mainInput = mainInput;
            this.stageBundleFactory = stageBundleFactory;
            this.stateRequestHandler = stateRequestHandler;
            this.progressHandler = progressHandler;
            this.outputManager = outputManager;
            this.outputMap = outputMap;
            this.timerRegistration = timerRegistration;
            this.timerOutputIdToSpecMap = new HashMap<String, ProcessBundleDescriptors.TimerSpec>();
            this.keyForTimer = keyForTimer;
            for (Map transformTimerMap : stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
                for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
                    this.timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
                }
            }
            this.windowCoder = windowCoder;
            this.outputQueue = new LinkedBlockingQueue();
        }

        public void startBundle() {
            OutputReceiverFactory receiverFactory = new OutputReceiverFactory(){

                public FnDataReceiver<OutputT> create(String pCollectionId) {
                    return receivedElement -> outputQueue.put(KV.of((Object)pCollectionId, (Object)receivedElement));
                }
            };
            try {
                this.remoteBundle = this.stageBundleFactory.getBundle(receiverFactory, this.stateRequestHandler, this.progressHandler);
                this.mainInputReceiver = (FnDataReceiver)Preconditions.checkNotNull((Object)((FnDataReceiver)this.remoteBundle.getInputReceivers().get(this.mainInput)), (Object)"Failed to retrieve main input receiver.");
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to start remote bundle", e);
            }
        }

        public void processElement(WindowedValue<InputT> element) {
            try {
                LOG.debug("Sending value: {}", element);
                this.mainInputReceiver.accept(element);
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to process element with SDK harness.", e);
            }
            this.emitResults();
        }

        public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
            Object timerKey = this.keyForTimer.get();
            Preconditions.checkNotNull((Object)timerKey, (Object)"Key for timer needs to be set before calling onTimer");
            Preconditions.checkNotNull((Object)this.remoteBundle, (Object)"Call to onTimer outside of a bundle");
            LOG.debug("timer callback: {} {} {} {}", new Object[]{timerId, window, timestamp, timeDomain});
            FnDataReceiver timerReceiver = (FnDataReceiver)Preconditions.checkNotNull((Object)((FnDataReceiver)this.remoteBundle.getInputReceivers().get(timerId)), (String)"No receiver found for timer %s", (Object)timerId);
            WindowedValue timerValue = WindowedValue.of((Object)KV.of((Object)timerKey, (Object)Timer.of((Instant)timestamp, (Object)new byte[0])), (Instant)timestamp, Collections.singleton(window), (PaneInfo)PaneInfo.NO_FIRING);
            try {
                timerReceiver.accept((Object)timerValue);
            }
            catch (Exception e) {
                throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer %s", timerReceiver), e);
            }
        }

        public void finishBundle() {
            try {
                this.remoteBundle.close();
                this.emitResults();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to finish remote bundle", e);
            }
            finally {
                this.remoteBundle = null;
            }
        }

        boolean isBundleInProgress() {
            return this.remoteBundle != null;
        }

        private void emitResults() {
            KV<String, OutputT> result;
            while ((result = this.outputQueue.poll()) != null) {
                String outputPCollectionId = (String)Preconditions.checkNotNull((Object)((String)result.getKey()));
                TupleTag<?> tag = this.outputMap.get(outputPCollectionId);
                WindowedValue windowedValue = (WindowedValue)Preconditions.checkNotNull((Object)((WindowedValue)result.getValue()), (String)"Received a null value from the SDK harness for %s", (Object)outputPCollectionId);
                if (tag != null) {
                    this.outputManager.output(tag, windowedValue);
                    continue;
                }
                ProcessBundleDescriptors.TimerSpec timerSpec = (ProcessBundleDescriptors.TimerSpec)Preconditions.checkNotNull((Object)this.timerOutputIdToSpecMap.get(outputPCollectionId), (String)"Unknown Pcollectionid %s", (Object)outputPCollectionId);
                Timer timer = (Timer)Preconditions.checkNotNull((Object)((Timer)((KV)windowedValue.getValue()).getValue()), (String)"Received null Timer from SDK harness: %s", (Object)windowedValue);
                LOG.debug("Timer received: {} {}", (Object)outputPCollectionId, (Object)timer);
                for (Object window : windowedValue.getWindows()) {
                    StateNamespace namespace = StateNamespaces.window(this.windowCoder, (BoundedWindow)((BoundedWindow)window));
                    TimerInternals.TimerData timerData = TimerInternals.TimerData.of((String)timerSpec.inputCollectionId(), (StateNamespace)namespace, (Instant)timer.getTimestamp(), (TimeDomain)timerSpec.getTimerSpec().getTimeDomain());
                    this.timerRegistration.accept(windowedValue, timerData);
                }
            }
        }

        public DoFn<InputT, OutputT> getFn() {
            throw new UnsupportedOperationException();
        }
    }

    private static class BagUserStateFactory<K extends ByteString, V, W extends BoundedWindow>
    implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
        private final StateInternals stateInternals;
        private final KeyedStateBackend<ByteBuffer> keyedStateBackend;
        private final Lock stateBackendLock;

        private BagUserStateFactory(StateInternals stateInternals, KeyedStateBackend<ByteBuffer> keyedStateBackend, Lock stateBackendLock) {
            this.stateInternals = stateInternals;
            this.keyedStateBackend = keyedStateBackend;
            this.stateBackendLock = stateBackendLock;
        }

        public StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(final String pTransformId, final String userStateId, Coder<K> keyCoder, final Coder<V> valueCoder, final Coder<W> windowCoder) {
            return new StateRequestHandlers.BagUserStateHandler<K, V, W>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public Iterable<V> get(K key, W window) {
                    try {
                        stateBackendLock.lock();
                        this.prepareStateBackend(key);
                        StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("State get for {} {} {} {}", new Object[]{pTransformId, userStateId, Arrays.toString(((ByteBuffer)keyedStateBackend.getCurrentKey()).array()), window});
                        }
                        BagState bagState = (BagState)stateInternals.state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                        Iterable iterable = bagState.read();
                        return iterable;
                    }
                    finally {
                        stateBackendLock.unlock();
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void append(K key, W window, Iterator<V> values) {
                    try {
                        stateBackendLock.lock();
                        this.prepareStateBackend(key);
                        StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("State append for {} {} {} {}", new Object[]{pTransformId, userStateId, Arrays.toString(((ByteBuffer)keyedStateBackend.getCurrentKey()).array()), window});
                        }
                        BagState bagState = (BagState)stateInternals.state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                        while (values.hasNext()) {
                            bagState.add(values.next());
                        }
                    }
                    finally {
                        stateBackendLock.unlock();
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void clear(K key, W window) {
                    try {
                        stateBackendLock.lock();
                        this.prepareStateBackend(key);
                        StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("State clear for {} {} {} {}", new Object[]{pTransformId, userStateId, Arrays.toString(((ByteBuffer)keyedStateBackend.getCurrentKey()).array()), window});
                        }
                        BagState bagState = (BagState)stateInternals.state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                        bagState.clear();
                    }
                    finally {
                        stateBackendLock.unlock();
                    }
                }

                private void prepareStateBackend(K key) {
                    ByteBuffer encodedKey = ByteBuffer.wrap(key.toByteArray());
                    keyedStateBackend.setCurrentKey((Object)encodedKey);
                }
            };
        }
    }
}

