/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;

public class BatchExecutionInternalTimeServiceManager<K>
implements InternalTimeServiceManager<K>,
KeyedStateBackend.KeySelectionListener<K> {
    private final ProcessingTimeService processingTimeService;
    private final Map<String, BatchExecutionInternalTimeService<K, ?>> timerServices = new HashMap();

    public BatchExecutionInternalTimeServiceManager(ProcessingTimeService processingTimeService) {
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
    }

    @Override
    public <N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        BatchExecutionInternalTimeService<K, Object> timerService = this.timerServices.get(name);
        if (timerService == null) {
            timerService = new BatchExecutionInternalTimeService<K, N>(this.processingTimeService, triggerable);
            this.timerServices.put(name, timerService);
        }
        return timerService;
    }

    @Override
    public void advanceWatermark(Watermark watermark) {
        if (watermark.getTimestamp() == Long.MAX_VALUE) {
            this.keySelected(null);
        }
    }

    @Override
    public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream context, String operatorName) throws Exception {
        throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution");
    }

    public static <K> InternalTimeServiceManager<K> create(CheckpointableKeyedStateBackend<K> keyedStatedBackend, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates, StreamTaskCancellationContext cancellationContext) {
        Preconditions.checkState((boolean)(keyedStatedBackend instanceof BatchExecutionKeyedStateBackend), (Object)"Batch execution specific time service can work only with BatchExecutionKeyedStateBackend");
        BatchExecutionInternalTimeServiceManager<K> timeServiceManager = new BatchExecutionInternalTimeServiceManager<K>(processingTimeService);
        keyedStatedBackend.registerKeySelectionListener(timeServiceManager);
        return timeServiceManager;
    }

    public void keySelected(K newKey) {
        try {
            for (BatchExecutionInternalTimeService<K, ?> value : this.timerServices.values()) {
                value.setCurrentKey(newKey);
            }
        }
        catch (Exception e) {
            throw new WrappingRuntimeException((Throwable)e);
        }
    }
}

