/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.core;

import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.repackaged.beam_runners_core_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_core_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.DoFnRunner;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.LateDataUtils;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
    private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
    private final LateDataFilter lateDataFilter;
    public static final String DROPPED_DUE_TO_LATENESS = "droppedDueToLateness";

    public LateDataDroppingDoFnRunner(DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner, WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
        this.doFnRunner = doFnRunner;
        this.lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals);
    }

    @Override
    public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getFn() {
        return this.doFnRunner.getFn();
    }

    @Override
    public void startBundle() {
        this.doFnRunner.startBundle();
    }

    @Override
    public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
        Iterable nonLateElements = this.lateDataFilter.filter(((KeyedWorkItem)elem.getValue()).key(), ((KeyedWorkItem)elem.getValue()).elementsIterable());
        KeyedWorkItem keyedWorkItem = KeyedWorkItems.workItem(((KeyedWorkItem)elem.getValue()).key(), ((KeyedWorkItem)elem.getValue()).timersIterable(), nonLateElements);
        this.doFnRunner.processElement(elem.withValue(keyedWorkItem));
    }

    @Override
    public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        this.doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
    }

    @Override
    public void finishBundle() {
        this.doFnRunner.finishBundle();
    }

    @VisibleForTesting
    static class LateDataFilter {
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final TimerInternals timerInternals;
        private final Counter droppedDueToLateness;

        public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
            this.windowingStrategy = windowingStrategy;
            this.timerInternals = timerInternals;
            this.droppedDueToLateness = Metrics.counter(LateDataDroppingDoFnRunner.class, (String)LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS);
        }

        public <K, InputT> Iterable<WindowedValue<InputT>> filter(K key, Iterable<WindowedValue<InputT>> elements) {
            Iterable windowsExpandedElements = StreamSupport.stream(elements.spliterator(), false).map(input -> input.getWindows().stream().map(window -> WindowedValue.of((Object)input.getValue(), (Instant)input.getTimestamp(), (BoundedWindow)window, (PaneInfo)input.getPane())).collect(Collectors.toList())).collect(Collectors.toList());
            Iterable<WindowedValue> concatElements = Iterables.concat(windowsExpandedElements);
            for (WindowedValue input2 : concatElements) {
                BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement(input2.getWindows());
                if (!this.canDropDueToExpiredWindow(window)) continue;
                this.droppedDueToLateness.inc();
                WindowTracing.debug((String)"{}: Dropping element at {} for key:{}; window:{} since too far behind inputWatermark:{}; outputWatermark:{}", (Object[])new Object[]{LateDataFilter.class.getSimpleName(), input2.getTimestamp(), key, window, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
            }
            Iterable nonLateElements = StreamSupport.stream(concatElements.spliterator(), false).filter(input -> {
                BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement(input.getWindows());
                return !this.canDropDueToExpiredWindow(window);
            }).collect(Collectors.toList());
            return nonLateElements;
        }

        private boolean canDropDueToExpiredWindow(BoundedWindow window) {
            Instant inputWM = this.timerInternals.currentInputWatermarkTime();
            return LateDataUtils.garbageCollectionTime(window, this.windowingStrategy).isBefore((ReadableInstant)inputWM);
        }
    }
}

