/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceUtil;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.concurrent.NeverCompleteFuture;

/**
 * {@link ProcessingTimeService} implementation that forwards timer registrations to a
 * {@link TimerService} and can be quiesced.
 *
 * <p>Every callback handed to the timer service is first passed through the configured
 * callback wrapper and then decorated so that:
 *
 * <ul>
 *   <li>once {@link #quiesce()} has been called, the callback is skipped instead of executed;
 *   <li>the number of callbacks currently executing is tracked, and the future returned by
 *       {@link #quiesce()} is completed when that number is zero while the service is quiesced.
 * </ul>
 */
@Internal
class ProcessingTimeServiceImpl
implements ProcessingTimeService {
    /** Service that provides the current time and performs the actual scheduling. */
    private final TimerService timerService;

    /** Applied to every user callback before the quiesce handling is added around it. */
    private final Function<ProcessingTimeCallback, ProcessingTimeCallback> processingTimeCallbackWrapper;

    /** Number of decorated callbacks that are currently between their increment and decrement. */
    private final AtomicInteger numRunningTimers = new AtomicInteger(0);

    /** Completed (with {@code null}) once the service is quiesced and no callback is running. */
    private final CompletableFuture<Void> quiesceCompletedFuture = new CompletableFuture<>();

    /** Set to {@code true} by {@link #quiesce()}; never reset in this class. */
    private volatile boolean quiesced;

    /**
     * Creates a service on top of the given timer service.
     *
     * @param timerService service used for the current time and for scheduling
     * @param processingTimeCallbackWrapper function applied to every callback before it is
     *     registered with {@code timerService}
     */
    ProcessingTimeServiceImpl(TimerService timerService, Function<ProcessingTimeCallback, ProcessingTimeCallback> processingTimeCallbackWrapper) {
        this.timerService = timerService;
        this.processingTimeCallbackWrapper = processingTimeCallbackWrapper;
    }

    /** Returns the current processing time as reported by the underlying timer service. */
    @Override
    public long getCurrentProcessingTime() {
        return timerService.getCurrentProcessingTime();
    }

    /**
     * Registers {@code target} to be invoked at {@code timestamp}.
     *
     * <p>If the service is already quiesced, nothing is registered and a
     * {@link NeverCompleteFuture} built from the delay computed by
     * {@link ProcessingTimeServiceUtil#getProcessingTimeDelay} is returned instead.
     *
     * @param timestamp processing time at which the callback should fire
     * @param target callback to invoke
     * @return the future returned by the timer service, or a {@link NeverCompleteFuture} when
     *     quiesced
     */
    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        if (isQuiesced()) {
            return new NeverCompleteFuture(ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime()));
        }
        return timerService.registerTimer(timestamp, addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(target)));
    }

    /**
     * Schedules {@code callback} at a fixed rate via the underlying timer service.
     *
     * <p>If the service is already quiesced, nothing is scheduled and a
     * {@link NeverCompleteFuture} built from {@code initialDelay} is returned.
     *
     * @param callback callback to invoke
     * @param initialDelay delay before the first invocation, passed through to the timer service
     * @param period period between invocations, passed through to the timer service
     * @return the future returned by the timer service, or a {@link NeverCompleteFuture} when
     *     quiesced
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        if (isQuiesced()) {
            return new NeverCompleteFuture(initialDelay);
        }
        return timerService.scheduleAtFixedRate(addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)), initialDelay, period);
    }

    /**
     * Schedules {@code callback} with a fixed delay via the underlying timer service.
     *
     * <p>If the service is already quiesced, nothing is scheduled and a
     * {@link NeverCompleteFuture} built from {@code initialDelay} is returned.
     *
     * @param callback callback to invoke
     * @param initialDelay delay before the first invocation, passed through to the timer service
     * @param period delay between invocations, passed through to the timer service
     * @return the future returned by the timer service, or a {@link NeverCompleteFuture} when
     *     quiesced
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
        if (isQuiesced()) {
            return new NeverCompleteFuture(initialDelay);
        }
        return timerService.scheduleWithFixedDelay(addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)), initialDelay, period);
    }

    /**
     * Marks the service as quiesced so that later registrations are ignored and later callback
     * invocations are skipped.
     *
     * <p>On the first call, the returned future is completed immediately if no callback is
     * running; otherwise it is completed by the callback whose decrement brings the running
     * count to zero. Subsequent calls only return the same future.
     *
     * @return future that completes once the service is quiesced and no callback is running
     */
    @Override
    public CompletableFuture<Void> quiesce() {
        if (!quiesced) {
            // The flag must be set BEFORE the counter is read: callbacks increment the counter
            // and then re-check the flag, so one side always observes the other.
            quiesced = true;
            if (numRunningTimers.get() == 0) {
                quiesceCompletedFuture.complete(null);
            }
        }
        return quiesceCompletedFuture;
    }

    private boolean isQuiesced() {
        return quiesced;
    }

    /**
     * Decorates {@code callback} with the quiesce handling: skip when quiesced, count running
     * invocations, and complete the quiesce future when the last running invocation finishes
     * after {@link #quiesce()} was called.
     *
     * @param callback callback to decorate
     * @return decorated callback that is handed to the timer service
     */
    private ProcessingTimeCallback addQuiesceProcessingToCallback(ProcessingTimeCallback callback) {
        return timestamp -> {
            // Fast path: do not touch the counter at all once quiesced.
            if (isQuiesced()) {
                return;
            }
            numRunningTimers.incrementAndGet();
            try {
                // Second check: quiesce() may have set the flag, seen a zero counter and
                // completed the future between the first check and the increment above. In
                // that case the callback must not run any more.
                if (!isQuiesced()) {
                    callback.onProcessingTime(timestamp);
                }
            }
            finally {
                // The invocation that brings the counter back to zero while quiesced signals
                // completion; complete() is a no-op if the future is already completed.
                if (numRunningTimers.decrementAndGet() == 0 && isQuiesced()) {
                    quiesceCompletedFuture.complete(null);
                }
            }
        };
    }
}

