/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import java.io.Serializable;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ComparisonChain;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class WatermarkCallbackExecutor {
    private final ConcurrentMap<PipelineNode.PTransformNode, PriorityQueue<WatermarkCallback>> callbacks = new ConcurrentHashMap<PipelineNode.PTransformNode, PriorityQueue<WatermarkCallback>>();
    private final Executor executor;

    public static WatermarkCallbackExecutor create(Executor executor) {
        return new WatermarkCallbackExecutor(executor);
    }

    private WatermarkCallbackExecutor(Executor executor) {
        this.executor = executor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void callOnGuaranteedFiring(PipelineNode.PTransformNode step, BoundedWindow window, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        WatermarkCallback callback = WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null && this.callbacks.putIfAbsent(step, callbackQueue = new PriorityQueue(11, new CallbackOrdering())) != null) {
            callbackQueue = (PriorityQueue)this.callbacks.get(step);
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            callbackQueue.offer(callback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void callOnWindowExpiration(PipelineNode.PTransformNode step, BoundedWindow window, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        WatermarkCallback callback = WatermarkCallback.afterWindowExpiration(window, windowingStrategy, runnable);
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null && this.callbacks.putIfAbsent(step, callbackQueue = new PriorityQueue(11, new CallbackOrdering())) != null) {
            callbackQueue = (PriorityQueue)this.callbacks.get(step);
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            callbackQueue.offer(callback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void fireForWatermark(PipelineNode.PTransformNode step, Instant watermark) {
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null) {
            return;
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            while (!callbackQueue.isEmpty() && ((WatermarkCallback)callbackQueue.peek()).shouldFire(watermark)) {
                this.executor.execute(((WatermarkCallback)callbackQueue.poll()).getCallback());
            }
        }
    }

    private static class CallbackOrdering
    extends Ordering<WatermarkCallback>
    implements Serializable {
        private CallbackOrdering() {
        }

        public int compare(WatermarkCallback left, WatermarkCallback right) {
            return ComparisonChain.start().compare((Comparable)left.fireAfter, (Comparable)right.fireAfter).compare((Object)left.callback, (Object)right.callback, (Comparator)Ordering.arbitrary()).result();
        }
    }

    private static class WatermarkCallback {
        private final Instant fireAfter;
        private final Runnable callback;

        public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
            Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring(window);
            return new WatermarkCallback(firingAfter, callback);
        }

        public static <W extends BoundedWindow> WatermarkCallback afterWindowExpiration(BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
            Instant firingAfter = window.maxTimestamp().plus((ReadableDuration)strategy.getAllowedLateness()).plus(1L);
            return new WatermarkCallback(firingAfter, callback);
        }

        private WatermarkCallback(Instant fireAfter, Runnable callback) {
            this.fireAfter = fireAfter;
            this.callback = callback;
        }

        public boolean shouldFire(Instant currentWatermark) {
            return currentWatermark.isAfter((ReadableInstant)this.fireAfter) || currentWatermark.equals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE);
        }

        public Runnable getCallback() {
            return this.callback;
        }
    }
}

