/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.ReaderConsumeProgressEvent;
import org.apache.paimon.flink.utils.TableScanUtils;

public class ConsumerProgressCalculator {
    private final TreeMap<Long, Long> minNextSnapshotPerCheckpoint = new TreeMap();
    private final Map<Integer, Long> assignedSnapshotPerReader;
    private final Map<Integer, Long> consumingSnapshotPerReader;

    public ConsumerProgressCalculator(int parallelism) {
        this.assignedSnapshotPerReader = new HashMap<Integer, Long>(parallelism);
        this.consumingSnapshotPerReader = new HashMap<Integer, Long>(parallelism);
    }

    public void updateConsumeProgress(int subtaskId, ReaderConsumeProgressEvent event) {
        this.consumingSnapshotPerReader.put(subtaskId, event.lastConsumeSnapshotId());
    }

    public void updateAssignInformation(int subtaskId, FileStoreSourceSplit split) {
        TableScanUtils.getSnapshotId(split).ifPresent(snapshotId -> this.assignedSnapshotPerReader.put(subtaskId, (Long)snapshotId));
    }

    public void notifySnapshotState(long checkpointId, Set<Integer> readersAwaitingSplit, Function<Integer, Long> unassignedCalculationFunction, int parallelism) {
        this.computeMinNextSnapshotId(readersAwaitingSplit, unassignedCalculationFunction, parallelism).ifPresent(minNextSnapshotId -> this.minNextSnapshotPerCheckpoint.put(checkpointId, (Long)minNextSnapshotId));
    }

    public OptionalLong notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, Long> nextSnapshots = this.minNextSnapshotPerCheckpoint.headMap(checkpointId, true);
        OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
        nextSnapshots.clear();
        return max;
    }

    private Optional<Long> computeMinNextSnapshotId(Set<Integer> readersAwaitingSplit, Function<Integer, Long> unassignedCalculationFunction, int parallelism) {
        long globalMinSnapshotId = Long.MAX_VALUE;
        for (int subtask = 0; subtask < parallelism; ++subtask) {
            Long snapshotIdForSubtask;
            if (readersAwaitingSplit.contains(subtask)) {
                snapshotIdForSubtask = unassignedCalculationFunction.apply(subtask);
            } else {
                Long consumingSnapshotId = this.consumingSnapshotPerReader.get(subtask);
                Long assignedSnapshotId = this.assignedSnapshotPerReader.get(subtask);
                if (consumingSnapshotId != null && assignedSnapshotId != null) {
                    snapshotIdForSubtask = Math.max(consumingSnapshotId, assignedSnapshotId);
                } else {
                    Long l = snapshotIdForSubtask = consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId;
                }
            }
            if (snapshotIdForSubtask == null) {
                return Optional.empty();
            }
            globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForSubtask);
        }
        return Optional.of(globalMinSnapshotId);
    }
}

