/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.ElectionState;
import org.apache.kafka.raft.EpochState;
import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.slf4j.Logger;

public class LeaderState<T>
implements EpochState {
    static final long OBSERVER_SESSION_TIMEOUT_MS = 300000L;
    private final int localId;
    private final int epoch;
    private final long epochStartOffset;
    private final Set<Integer> grantingVoters;
    private Optional<LogOffsetMetadata> highWatermark = Optional.empty();
    private final Map<Integer, ReplicaState> voterStates = new HashMap<Integer, ReplicaState>();
    private final Map<Integer, ReplicaState> observerStates = new HashMap<Integer, ReplicaState>();
    private final Logger log;
    private final BatchAccumulator<T> accumulator;
    private volatile boolean resignRequested = false;

    protected LeaderState(int localId, int epoch, long epochStartOffset, Set<Integer> voters, Set<Integer> grantingVoters, BatchAccumulator<T> accumulator, LogContext logContext) {
        this.localId = localId;
        this.epoch = epoch;
        this.epochStartOffset = epochStartOffset;
        for (int voterId : voters) {
            boolean hasAcknowledgedLeader = voterId == localId;
            this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
        }
        this.grantingVoters = Collections.unmodifiableSet(new HashSet<Integer>(grantingVoters));
        this.log = logContext.logger(LeaderState.class);
        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
    }

    public BatchAccumulator<T> accumulator() {
        return this.accumulator;
    }

    private static List<LeaderChangeMessage.Voter> convertToVoters(Set<Integer> voterIds) {
        return voterIds.stream().map(follower -> new LeaderChangeMessage.Voter().setVoterId(follower.intValue())).collect(Collectors.toList());
    }

    public void appendLeaderChangeMessage(long currentTimeMs) {
        List<LeaderChangeMessage.Voter> voters = LeaderState.convertToVoters(this.voterStates.keySet());
        List<LeaderChangeMessage.Voter> grantingVoters = LeaderState.convertToVoters(this.grantingVoters());
        LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage().setVersion((short)0).setLeaderId(this.election().leaderId()).setVoters(voters).setGrantingVoters(grantingVoters);
        this.accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs);
        this.accumulator.forceDrain();
    }

    public boolean isResignRequested() {
        return this.resignRequested;
    }

    public void requestResign() {
        this.resignRequested = true;
    }

    @Override
    public Optional<LogOffsetMetadata> highWatermark() {
        return this.highWatermark;
    }

    @Override
    public ElectionState election() {
        return ElectionState.withElectedLeader(this.epoch, this.localId, this.voterStates.keySet());
    }

    @Override
    public int epoch() {
        return this.epoch;
    }

    public Set<Integer> grantingVoters() {
        return this.grantingVoters;
    }

    public int localId() {
        return this.localId;
    }

    public Set<Integer> nonAcknowledgingVoters() {
        HashSet<Integer> nonAcknowledging = new HashSet<Integer>();
        for (ReplicaState state : this.voterStates.values()) {
            if (state.hasAcknowledgedLeader) continue;
            nonAcknowledging.add(state.nodeId);
        }
        return nonAcknowledging;
    }

    private boolean maybeUpdateHighWatermark() {
        List<ReplicaState> followersByDescendingFetchOffset = this.followersByDescendingFetchOffset();
        int indexOfHw = this.voterStates.size() / 2;
        Optional<LogOffsetMetadata> highWatermarkUpdateOpt = followersByDescendingFetchOffset.get((int)indexOfHw).endOffset;
        if (highWatermarkUpdateOpt.isPresent()) {
            LogOffsetMetadata highWatermarkUpdateMetadata = highWatermarkUpdateOpt.get();
            long highWatermarkUpdateOffset = highWatermarkUpdateMetadata.offset;
            if (highWatermarkUpdateOffset > this.epochStartOffset) {
                if (this.highWatermark.isPresent()) {
                    LogOffsetMetadata currentHighWatermarkMetadata = this.highWatermark.get();
                    if (highWatermarkUpdateOffset > currentHighWatermarkMetadata.offset || highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset && !highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata)) {
                        Optional<LogOffsetMetadata> oldHighWatermark = this.highWatermark;
                        this.highWatermark = highWatermarkUpdateOpt;
                        this.logHighWatermarkUpdate(oldHighWatermark, highWatermarkUpdateMetadata, indexOfHw, followersByDescendingFetchOffset);
                        return true;
                    }
                    if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) {
                        this.log.error("The latest computed high watermark {} is smaller than the current value {}, which suggests that one of the voters has lost committed data. Full voter replication state: {}", new Object[]{highWatermarkUpdateOffset, currentHighWatermarkMetadata.offset, this.voterStates.values()});
                        return false;
                    }
                    return false;
                }
                Optional<LogOffsetMetadata> oldHighWatermark = this.highWatermark;
                this.highWatermark = highWatermarkUpdateOpt;
                this.logHighWatermarkUpdate(oldHighWatermark, highWatermarkUpdateMetadata, indexOfHw, followersByDescendingFetchOffset);
                return true;
            }
        }
        return false;
    }

    private void logHighWatermarkUpdate(Optional<LogOffsetMetadata> oldHighWatermark, LogOffsetMetadata newHighWatermark, int indexOfHw, List<ReplicaState> followersByDescendingFetchOffset) {
        if (oldHighWatermark.isPresent()) {
            this.log.debug("High watermark set to {} from {} based on indexOfHw {} and voters {}", new Object[]{newHighWatermark, oldHighWatermark.get(), indexOfHw, followersByDescendingFetchOffset});
        } else {
            this.log.info("High watermark set to {} for the first time for epoch {} based on indexOfHw {} and voters {}", new Object[]{newHighWatermark, this.epoch, indexOfHw, followersByDescendingFetchOffset});
        }
    }

    public boolean updateLocalState(LogOffsetMetadata endOffsetMetadata) {
        ReplicaState state = this.getOrCreateReplicaState(this.localId);
        state.endOffset.ifPresent(currentEndOffset -> {
            if (currentEndOffset.offset > endOffsetMetadata.offset) {
                throw new IllegalStateException("Detected non-monotonic update of local end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
            }
        });
        state.updateLeaderState(endOffsetMetadata);
        return this.maybeUpdateHighWatermark();
    }

    public boolean updateReplicaState(int replicaId, long currentTimeMs, LogOffsetMetadata fetchOffsetMetadata) {
        if (replicaId < 0) {
            return false;
        }
        if (replicaId == this.localId) {
            throw new IllegalStateException("Remote replica ID " + replicaId + " matches the local leader ID");
        }
        ReplicaState state = this.getOrCreateReplicaState(replicaId);
        state.endOffset.ifPresent(currentEndOffset -> {
            if (currentEndOffset.offset > fetchOffsetMetadata.offset) {
                this.log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}", new Object[]{state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset});
            }
        });
        Optional<LogOffsetMetadata> leaderEndOffsetOpt = this.voterStates.get((Object)Integer.valueOf((int)this.localId)).endOffset;
        state.updateFollowerState(currentTimeMs, fetchOffsetMetadata, leaderEndOffsetOpt);
        return this.isVoter(state.nodeId) && this.maybeUpdateHighWatermark();
    }

    public List<Integer> nonLeaderVotersByDescendingFetchOffset() {
        return this.followersByDescendingFetchOffset().stream().filter(state -> state.nodeId != this.localId).map(state -> state.nodeId).collect(Collectors.toList());
    }

    private List<ReplicaState> followersByDescendingFetchOffset() {
        return new ArrayList<ReplicaState>(this.voterStates.values()).stream().sorted().collect(Collectors.toList());
    }

    public void addAcknowledgementFrom(int remoteNodeId) {
        ReplicaState voterState = this.ensureValidVoter(remoteNodeId);
        voterState.hasAcknowledgedLeader = true;
    }

    private ReplicaState ensureValidVoter(int remoteNodeId) {
        ReplicaState state = this.voterStates.get(remoteNodeId);
        if (state == null) {
            throw new IllegalArgumentException("Unexpected acknowledgement from non-voter " + remoteNodeId);
        }
        return state;
    }

    public long epochStartOffset() {
        return this.epochStartOffset;
    }

    private ReplicaState getOrCreateReplicaState(int remoteNodeId) {
        ReplicaState state = this.voterStates.get(remoteNodeId);
        if (state == null) {
            this.observerStates.putIfAbsent(remoteNodeId, new ReplicaState(remoteNodeId, false));
            return this.observerStates.get(remoteNodeId);
        }
        return state;
    }

    public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) {
        this.clearInactiveObservers(currentTimeMs);
        return new DescribeQuorumResponseData.PartitionData().setErrorCode(Errors.NONE.code()).setLeaderId(this.localId).setLeaderEpoch(this.epoch).setHighWatermark(this.highWatermark.map(offsetMetadata -> offsetMetadata.offset).orElse(-1L).longValue()).setCurrentVoters(this.describeReplicaStates(this.voterStates, currentTimeMs)).setObservers(this.describeReplicaStates(this.observerStates, currentTimeMs));
    }

    private List<DescribeQuorumResponseData.ReplicaState> describeReplicaStates(Map<Integer, ReplicaState> state, long currentTimeMs) {
        return state.values().stream().map(replicaState -> this.describeReplicaState((ReplicaState)replicaState, currentTimeMs)).collect(Collectors.toList());
    }

    private DescribeQuorumResponseData.ReplicaState describeReplicaState(ReplicaState replicaState, long currentTimeMs) {
        long lastFetchTimestamp;
        long lastCaughtUpTimestamp;
        if (replicaState.nodeId == this.localId) {
            lastCaughtUpTimestamp = currentTimeMs;
            lastFetchTimestamp = currentTimeMs;
        } else {
            lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp;
            lastFetchTimestamp = replicaState.lastFetchTimestamp;
        }
        return new DescribeQuorumResponseData.ReplicaState().setReplicaId(replicaState.nodeId).setLogEndOffset(replicaState.endOffset.map(md -> md.offset).orElse(-1L).longValue()).setLastCaughtUpTimestamp(lastCaughtUpTimestamp).setLastFetchTimestamp(lastFetchTimestamp);
    }

    private void clearInactiveObservers(long currentTimeMs) {
        this.observerStates.entrySet().removeIf(integerReplicaStateEntry -> currentTimeMs - ((ReplicaState)integerReplicaStateEntry.getValue()).lastFetchTimestamp >= 300000L);
    }

    private boolean isVoter(int remoteNodeId) {
        return this.voterStates.containsKey(remoteNodeId);
    }

    @Override
    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        this.log.debug("Rejecting vote request from candidate {} since we are already leader in epoch {}", (Object)candidateId, (Object)this.epoch);
        return false;
    }

    public String toString() {
        return String.format("Leader(localId=%d, epoch=%d, epochStartOffset=%d, highWatermark=%s, voterStates=%s)", this.localId, this.epoch, this.epochStartOffset, this.highWatermark, this.voterStates);
    }

    @Override
    public String name() {
        return "Leader";
    }

    @Override
    public void close() {
        this.accumulator.close();
    }

    private static class ReplicaState
    implements Comparable<ReplicaState> {
        final int nodeId;
        Optional<LogOffsetMetadata> endOffset;
        long lastFetchTimestamp;
        long lastFetchLeaderLogEndOffset;
        long lastCaughtUpTimestamp;
        boolean hasAcknowledgedLeader;

        public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
            this.nodeId = nodeId;
            this.endOffset = Optional.empty();
            this.lastFetchTimestamp = -1L;
            this.lastFetchLeaderLogEndOffset = -1L;
            this.lastCaughtUpTimestamp = -1L;
            this.hasAcknowledgedLeader = hasAcknowledgedLeader;
        }

        void updateLeaderState(LogOffsetMetadata endOffsetMetadata) {
            this.endOffset = Optional.of(endOffsetMetadata);
        }

        void updateFollowerState(long currentTimeMs, LogOffsetMetadata fetchOffsetMetadata, Optional<LogOffsetMetadata> leaderEndOffsetOpt) {
            leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
                if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
                    this.lastCaughtUpTimestamp = Math.max(this.lastCaughtUpTimestamp, currentTimeMs);
                } else if (this.lastFetchLeaderLogEndOffset > 0L && fetchOffsetMetadata.offset >= this.lastFetchLeaderLogEndOffset) {
                    this.lastCaughtUpTimestamp = Math.max(this.lastCaughtUpTimestamp, this.lastFetchTimestamp);
                }
                this.lastFetchLeaderLogEndOffset = leaderEndOffset.offset;
            });
            this.lastFetchTimestamp = Math.max(this.lastFetchTimestamp, currentTimeMs);
            this.endOffset = Optional.of(fetchOffsetMetadata);
            this.hasAcknowledgedLeader = true;
        }

        @Override
        public int compareTo(ReplicaState that) {
            if (this.endOffset.equals(that.endOffset)) {
                return Integer.compare(this.nodeId, that.nodeId);
            }
            if (!this.endOffset.isPresent()) {
                return 1;
            }
            if (!that.endOffset.isPresent()) {
                return -1;
            }
            return Long.compare(that.endOffset.get().offset, this.endOffset.get().offset);
        }

        public String toString() {
            return String.format("ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)", this.nodeId, this.endOffset, this.lastFetchTimestamp, this.lastCaughtUpTimestamp, this.hasAcknowledgedLeader);
        }
    }
}

