/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.MonoSink;

class CommittableBatch {
    final Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
    private final Map<TopicPartition, List<Long>> uncommitted = new HashMap<TopicPartition, List<Long>>();
    private final Map<TopicPartition, List<Long>> deferred = new HashMap<TopicPartition, List<Long>>();
    private final Map<TopicPartition, Long> latestOffsets = new HashMap<TopicPartition, Long>();
    boolean outOfOrderCommits;
    private int batchSize;
    private List<MonoSink<Void>> callbackEmitters = new ArrayList<MonoSink<Void>>();

    CommittableBatch() {
    }

    public synchronized int updateOffset(TopicPartition topicPartition, long offset) {
        if (this.outOfOrderCommits) {
            this.deferred.computeIfAbsent(topicPartition, tp -> new LinkedList()).add(offset);
            ++this.batchSize;
        } else if (!Long.valueOf(offset).equals(this.consumedOffsets.put(topicPartition, offset))) {
            ++this.batchSize;
        }
        return this.batchSize;
    }

    public synchronized void addCallbackEmitter(MonoSink<Void> emitter) {
        this.callbackEmitters.add(emitter);
    }

    public synchronized boolean isEmpty() {
        return this.batchSize == 0;
    }

    public synchronized int batchSize() {
        return this.batchSize;
    }

    public synchronized void addUncommitted(ConsumerRecords<?, ?> records) {
        records.partitions().forEach(tp -> {
            List offsets = this.uncommitted.computeIfAbsent((TopicPartition)tp, part -> new LinkedList());
            records.records(tp).forEach(rec -> offsets.add(rec.offset()));
        });
    }

    public synchronized int deferredCount() {
        int count = 0;
        for (List<Long> offsets : this.deferred.values()) {
            count += offsets.size();
        }
        return count;
    }

    public synchronized CommitArgs getAndClearOffsets() {
        List<MonoSink<Void>> currentCallbackEmitters;
        HashMap<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        if (this.outOfOrderCommits) {
            this.deferred.forEach((tp, offsets) -> {
                if (offsets.size() > 0) {
                    Collections.sort(offsets);
                    List<Long> uncomittedThisPart = this.uncommitted.get(tp);
                    long lastThisPart = -1L;
                    while (offsets.size() > 0 && ((Long)offsets.get(0)).equals(uncomittedThisPart.get(0))) {
                        lastThisPart = (Long)offsets.get(0);
                        offsets.remove(0);
                        uncomittedThisPart.remove(0);
                    }
                    if (lastThisPart >= 0L) {
                        offsetMap.put((TopicPartition)tp, new OffsetAndMetadata(lastThisPart + 1L));
                    }
                }
            });
            this.batchSize = this.deferredCount();
        } else {
            this.latestOffsets.putAll(this.consumedOffsets);
            Iterator<Map.Entry<TopicPartition, Long>> iterator = this.consumedOffsets.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<TopicPartition, Long> entry = iterator.next();
                offsetMap.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
                iterator.remove();
            }
            this.batchSize = 0;
        }
        if (!this.callbackEmitters.isEmpty()) {
            currentCallbackEmitters = this.callbackEmitters;
            this.callbackEmitters = new ArrayList<MonoSink<Void>>();
        } else {
            currentCallbackEmitters = null;
        }
        return new CommitArgs(offsetMap, currentCallbackEmitters);
    }

    public synchronized void restoreOffsets(CommitArgs commitArgs, boolean restoreCallbackEmitters) {
        if (this.outOfOrderCommits) {
            commitArgs.offsets.forEach((tp, offset) -> {
                this.deferred.get(tp).add(0, offset.offset() - 1L);
                this.uncommitted.get(tp).add(0, offset.offset() - 1L);
            });
        } else {
            for (Map.Entry entry : commitArgs.offsets.entrySet()) {
                TopicPartition topicPart = (TopicPartition)entry.getKey();
                long offset2 = ((OffsetAndMetadata)entry.getValue()).offset();
                Long latestOffset = this.latestOffsets.get(topicPart);
                if (latestOffset != null && latestOffset > offset2 - 1L) continue;
                this.consumedOffsets.putIfAbsent(topicPart, offset2 - 1L);
            }
        }
        if (restoreCallbackEmitters && commitArgs.callbackEmitters != null) {
            this.callbackEmitters = commitArgs.callbackEmitters;
        }
    }

    public synchronized String toString() {
        return String.valueOf(this.consumedOffsets);
    }

    public static class CommitArgs {
        private Map<TopicPartition, OffsetAndMetadata> offsets;
        private List<MonoSink<Void>> callbackEmitters;

        CommitArgs(Map<TopicPartition, OffsetAndMetadata> offsets, List<MonoSink<Void>> callbackEmitters) {
            this.offsets = offsets;
            this.callbackEmitters = callbackEmitters;
        }

        public Map<TopicPartition, OffsetAndMetadata> offsets() {
            return this.offsets;
        }

        List<MonoSink<Void>> callbackEmitters() {
            return this.callbackEmitters;
        }
    }
}

