/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.RaftClient;
import org.slf4j.Logger;

/**
 * A simple counter replicated through a {@link RaftClient}.
 *
 * <p>Each call to {@link #increment()} schedules an append of the next counter value.
 * Values delivered through {@link #handleCommit(BatchReader)} must form a strictly
 * consecutive sequence ({@code committed + 1}, {@code committed + 2}, ...); any gap or
 * repeat is reported as an {@link AssertionError}.
 *
 * <p>All public methods are {@code synchronized} on this instance.
 */
public class ReplicatedCounter implements RaftClient.Listener<Integer> {
    /** Identifier of the local node; only used in diagnostic messages. */
    private final int nodeId;
    private final Logger log;
    private final RaftClient<Integer> client;

    /** Last value observed through {@link #handleCommit(BatchReader)}. */
    private int committed;

    /** Last value for which an append was scheduled locally; -1 after resigning. */
    private int uncommitted;

    /** Epoch passed to the most recent {@link #handleClaim(int)}; empty when not claimed. */
    private Optional<Integer> claimedEpoch;

    /**
     * Creates a counter starting at zero that is not yet writable.
     *
     * @param nodeId     identifier of the local node, used in error messages
     * @param client     raft client used to schedule appends
     * @param logContext context from which this class's logger is created
     */
    public ReplicatedCounter(int nodeId, RaftClient<Integer> client, LogContext logContext) {
        this.nodeId = nodeId;
        this.client = client;
        this.log = logContext.logger(ReplicatedCounter.class);
        this.committed = 0;
        this.uncommitted = 0;
        this.claimedEpoch = Optional.empty();
    }

    /**
     * @return {@code true} if an epoch is currently claimed, i.e. {@link #increment()}
     *         may be called without throwing
     */
    public synchronized boolean isWritable() {
        return this.claimedEpoch.isPresent();
    }

    /**
     * Schedules an append of the next counter value in the currently claimed epoch.
     *
     * <p>The local uncommitted value is advanced only after the client returns a non-null
     * offset. If the client returns {@code null} or throws, the uncommitted value is left
     * unchanged so that the next call schedules the same value again instead of skipping
     * one (a skipped value would later fail the sequence check in
     * {@link #handleCommit(BatchReader)}).
     * NOTE(review): this treats a {@code null} offset as "append not scheduled" — confirm
     * against the {@code RaftClient.scheduleAppend} contract.
     *
     * @throws KafkaException if no epoch is currently claimed
     */
    public synchronized void increment() {
        if (!this.claimedEpoch.isPresent()) {
            throw new KafkaException("Counter is not currently writable");
        }
        int epoch = this.claimedEpoch.get();
        int next = this.uncommitted + 1;
        Long offset = this.client.scheduleAppend(epoch, Collections.singletonList(next));
        if (offset != null) {
            // Advance only once the append has been handed off to the client.
            this.uncommitted = next;
            this.log.debug("Scheduled append of record {} with epoch {} at offset {}", next, epoch, offset);
        }
    }

    /**
     * Applies every record of every batch from {@code reader} to the committed value.
     * The reader is always closed before this method returns, including when the
     * sequence check fails.
     *
     * @param reader source of committed batches
     * @throws AssertionError if a record is not exactly one greater than the current
     *                        committed value
     */
    @Override
    public synchronized void handleCommit(BatchReader<Integer> reader) {
        try {
            int initialValue = this.committed;
            while (reader.hasNext()) {
                BatchReader.Batch<Integer> batch = reader.next();
                this.log.debug("Handle commit of batch with records {} at base offset {}",
                    batch.records(), batch.baseOffset());
                for (Integer value : batch.records()) {
                    int expected = this.committed + 1;
                    if (value != expected) {
                        throw new AssertionError("Expected next committed value to be " + expected
                            + ", but instead found " + value + " on node " + this.nodeId);
                    }
                    this.committed = value;
                }
            }
            this.log.debug("Counter incremented from {} to {}", initialValue, this.committed);
        } finally {
            reader.close();
        }
    }

    /**
     * Records {@code epoch} as claimed and restarts the uncommitted value from the
     * current committed value.
     *
     * @param epoch the epoch that was claimed
     */
    @Override
    public synchronized void handleClaim(int epoch) {
        this.log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
            this.committed, epoch);
        this.uncommitted = this.committed;
        this.claimedEpoch = Optional.of(epoch);
    }

    /**
     * Clears the claimed epoch, making the counter non-writable, and resets the
     * uncommitted value to -1.
     *
     * @param epoch the epoch being resigned (not used by this implementation)
     */
    @Override
    public synchronized void handleResign(int epoch) {
        this.log.debug("Counter uncommitted value reset after resigning leadership");
        this.uncommitted = -1;
        this.claimedEpoch = Optional.empty();
    }
}

