/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.vertx.mutiny.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KafkaLatestCommit
extends ContextHolder
implements KafkaCommitHandler {
    private final ReactiveKafkaConsumer<?, ?> consumer;
    private final Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();

    public KafkaLatestCommit(Vertx vertx, KafkaConnectorIncomingConfiguration configuration, ReactiveKafkaConsumer<?, ?> consumer) {
        super(vertx.getDelegate(), configuration.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(60000));
        this.consumer = consumer;
    }

    @Override
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> record) {
        this.runOnContext(() -> {
            HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
            TopicPartition key = new TopicPartition(record.getTopic(), record.getPartition());
            Long last = this.offsets.get(key);
            if (last == null || last < record.getOffset() + 1L) {
                this.offsets.put(key, record.getOffset() + 1L);
                map.put(key, new OffsetAndMetadata(record.getOffset() + 1L, null));
                this.consumer.commitAsync(map).subscribe().with(ignored -> {}, throwable -> KafkaLogging.log.failedToCommitAsync(key, record.getOffset() + 1L));
            }
        });
        return CompletableFuture.completedFuture(null);
    }
}

