/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.BackOffHandler;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ExceptionClassifier;
import org.springframework.kafka.listener.FailedRecordProcessor;
import org.springframework.kafka.listener.FallbackBatchErrorHandler;
import org.springframework.kafka.listener.KafkaExceptionLogLevelAware;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RetryListener;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;

/**
 * A {@link FailedRecordProcessor} that handles failures of a whole batch of records.
 *
 * <p>When the thrown exception (or one of its causes) is a
 * {@link BatchListenerFailedException} that identifies a record inside the batch, the
 * offsets of the records preceding it are committed and the failed record is handed to
 * the failure tracker. In every other case the batch is passed unchanged to the
 * fallback {@link CommonErrorHandler} supplied at construction time.
 *
 * <p>Configuration setters are mirrored onto the fallback handler whenever it is of a
 * type that supports the same setting.
 */
public abstract class FailedBatchProcessor extends FailedRecordProcessor {

    /** Commit callback used for asynchronous commits when the container does not configure one. */
    private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();

    /** Handler that receives the batch when the failed record cannot be determined. */
    private final CommonErrorHandler fallbackBatchHandler;

    /**
     * Create an instance without an explicit {@link BackOffHandler}.
     *
     * @param recoverer the recoverer passed to the superclass; may be {@code null}
     * @param backOff the back off passed to the superclass
     * @param fallbackHandler the handler used when the failed record cannot be determined
     */
    public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, CommonErrorHandler fallbackHandler) {
        this(recoverer, backOff, null, fallbackHandler);
    }

    /**
     * Create an instance.
     *
     * @param recoverer the recoverer passed to the superclass; may be {@code null}
     * @param backOff the back off passed to the superclass
     * @param backOffHandler the back off handler passed to the superclass; may be {@code null}
     * @param fallbackHandler the handler used when the failed record cannot be determined
     */
    public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
        super(recoverer, backOff, backOffHandler);
        this.fallbackBatchHandler = fallbackHandler;
    }

    /**
     * Set the retry listeners on this processor and, when the fallback handler is a
     * {@link FallbackBatchErrorHandler}, on the fallback handler as well.
     *
     * @param listeners the listeners
     */
    @Override
    public void setRetryListeners(RetryListener ... listeners) {
        super.setRetryListeners(listeners);
        if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler) {
            ((FallbackBatchErrorHandler) this.fallbackBatchHandler).setRetryListeners(listeners);
        }
    }

    /**
     * Set the log level on this processor and, when the fallback handler is a
     * {@link KafkaExceptionLogLevelAware}, on the fallback handler as well.
     *
     * @param logLevel the level
     */
    @Override
    public void setLogLevel(KafkaException.Level logLevel) {
        super.setLogLevel(logLevel);
        if (this.fallbackBatchHandler instanceof KafkaExceptionLogLevelAware) {
            ((KafkaExceptionLogLevelAware) this.fallbackBatchHandler).setLogLevel(logLevel);
        }
    }

    /**
     * Register each supplied exception type as not retryable on the fallback handler,
     * when the fallback handler is an {@link ExceptionClassifier}. Nothing is done
     * otherwise.
     *
     * @param notRetryable the exception types
     */
    @Override
    protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
        if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
            ExceptionClassifier handler = (ExceptionClassifier) this.fallbackBatchHandler;
            notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
        }
    }

    /**
     * Set the classifications on this processor and, when the fallback handler is an
     * {@link ExceptionClassifier}, on the fallback handler as well.
     *
     * @param classifications the classifications
     * @param defaultValue the default classification
     */
    @Override
    public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
        super.setClassifications(classifications, defaultValue);
        if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
            ((ExceptionClassifier) this.fallbackBatchHandler).setClassifications(classifications, defaultValue);
        }
    }

    /**
     * Remove a classification from this processor and, when the fallback handler is an
     * {@link ExceptionClassifier}, from the fallback handler as well.
     *
     * @param exceptionType the exception type
     * @return the value returned by the superclass; the fallback handler's result is ignored
     */
    @Override
    @Nullable
    public Boolean removeClassification(Class<? extends Exception> exceptionType) {
        Boolean removed = super.removeClassification(exceptionType);
        if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
            ((ExceptionClassifier) this.fallbackBatchHandler).removeClassification(exceptionType);
        }
        return removed;
    }

    /**
     * Return the fallback batch handler supplied at construction time.
     *
     * @return the fallback handler
     */
    protected CommonErrorHandler getFallbackBatchHandler() {
        return this.fallbackBatchHandler;
    }

    /**
     * Handle the failure exactly like
     * {@link #handle(Exception, ConsumerRecords, Consumer, MessageListenerContainer, Runnable)}
     * but discard the returned records.
     *
     * @param thrownException the exception
     * @param data the batch of records
     * @param consumer the consumer
     * @param container the container
     * @param invokeListener a callback handed to the fallback handler
     */
    protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
        this.handle(thrownException, data, consumer, container, invokeListener);
    }

    /**
     * Handle a batch failure.
     *
     * <p>If no {@link BatchListenerFailedException} is found in the exception or its
     * cause chain, or the record/index it carries does not fall inside the batch, the
     * whole batch is passed to the fallback handler and an empty result is returned.
     * Otherwise the retry listeners are notified and the batch is split at the failed
     * record.
     *
     * @param thrownException the exception
     * @param data the batch of records
     * @param consumer the consumer
     * @param container the container
     * @param invokeListener a callback handed to the fallback handler
     * @param <K> the key type
     * @param <V> the value type
     * @return the records remaining after the split, or empty records
     */
    protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
        BatchListenerFailedException batchListenerFailedException = this.getBatchListenerFailedException(thrownException);
        if (batchListenerFailedException == null) {
            this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-delivering full batch");
            this.fallback(thrownException, data, consumer, container, invokeListener);
            return ConsumerRecords.empty();
        }
        this.getRetryListeners().forEach(listener -> listener.failedDelivery(data, thrownException, 1));
        ConsumerRecord<?, ?> record = batchListenerFailedException.getRecord();
        // Prefer the record carried by the exception; fall back to the explicit index.
        int index = record != null ? this.findIndex(data, record) : batchListenerFailedException.getIndex();
        if (index < 0 || index >= data.count()) {
            this.logger.warn(batchListenerFailedException, () -> {
                if (record != null) {
                    return String.format("Record not found in batch: %s-%d@%d; re-seeking batch", record.topic(), record.partition(), record.offset());
                }
                return String.format("Record not found in batch, index %d out of bounds (0, %d); re-seeking batch", index, data.count() - 1);
            });
            this.fallback(thrownException, data, consumer, container, invokeListener);
            return ConsumerRecords.empty();
        }
        return this.seekOrRecover(thrownException, data, consumer, container, index);
    }

    /**
     * Pass the whole batch to the fallback handler.
     */
    private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
        this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
    }

    /**
     * Locate a record in the batch by topic, partition and offset.
     *
     * @param data the batch
     * @param record the record to look for
     * @return the zero-based iteration position of the first match; {@code -1} when
     * {@code record} is {@code null}; the number of records iterated when there is no
     * match (the caller treats that as out of bounds)
     */
    private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
        if (record == null) {
            return -1;
        }
        int position = 0;
        for (ConsumerRecord<?, ?> candidate : data) {
            if (candidate.topic().equals(record.topic())
                    && candidate.partition() == record.partition()
                    && candidate.offset() == record.offset()) {
                break;
            }
            position++;
        }
        return position;
    }

    /**
     * Split the batch at {@code indexArg}, commit the offsets of the records before it
     * and deal with the failed record and everything after it.
     *
     * <p>With seek-after-error enabled, the remaining records are handed to
     * {@link SeekUtils}, the offset following the first remaining record is committed,
     * and a {@link KafkaException} is thrown when more than one record remains.
     * NOTE(review): this presumably relies on {@code SeekUtils.seekOrRecover} throwing
     * when the first record was not recovered - confirm against that class.
     *
     * <p>Without seek-after-error, the whole batch is returned when the first record
     * failed; otherwise the failure tracker is consulted for the failed record, which
     * is dropped from the result only when the tracker reports it as recovered.
     *
     * @param thrownException the exception
     * @param data the batch; may be {@code null}
     * @param consumer the consumer
     * @param container the container
     * @param indexArg the position of the failed record in the batch
     * @param <K> the key type
     * @param <V> the value type
     * @return the records that remain to be processed, or empty records
     */
    @SuppressWarnings("unchecked") // records are re-typed to the caller's <K, V>; element types are not known here
    private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, int indexArg) {
        if (data == null) {
            return ConsumerRecords.empty();
        }
        List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
        List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
        int index = indexArg;
        for (ConsumerRecord<?, ?> record : data) {
            if (index-- > 0) {
                toCommit.add(record);
            } else {
                remaining.add(record);
            }
        }
        // Later records of the same partition overwrite earlier ones, leaving the last offset + 1.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()), (key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1L)));
        if (!offsets.isEmpty()) {
            this.commit(consumer, container, offsets);
        }
        if (this.isSeekAfterError()) {
            if (!remaining.isEmpty()) {
                SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, this.getFailureTracker()::recovered, this.logger, this.getLogLevel());
                ConsumerRecord<?, ?> recovered = remaining.get(0);
                this.commit(consumer, container, Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1L)));
                if (remaining.size() > 1) {
                    throw new KafkaException("Seek to current after exception", this.getLogLevel(), thrownException);
                }
            }
            return ConsumerRecords.empty();
        }
        if (indexArg == 0) {
            // The first record failed: nothing was committed, hand back the batch as is.
            return (ConsumerRecords<K, V>) data;
        }
        try {
            if (this.getFailureTracker().recovered(remaining.get(0), thrownException, container, consumer)) {
                remaining.remove(0);
            }
        }
        catch (Exception ex) {
            // Best effort: the failed record stays in the returned records. Log the
            // failure instead of silently discarding it so it can be diagnosed.
            this.logger.warn(ex, () -> {
                if (remaining.isEmpty()) {
                    return "Failed to determine whether the failed record was recovered";
                }
                ConsumerRecord<?, ?> failed = remaining.get(0);
                return String.format("Failed to determine whether record %s-%d@%d was recovered; keeping it in the remaining records", failed.topic(), failed.partition(), failed.offset());
            });
        }
        Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
        remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
        return new ConsumerRecords<>(remains);
    }

    /**
     * Commit the offsets synchronously or asynchronously, as selected by the container
     * properties. Asynchronous commits use the container's commit callback, or a
     * logging callback when none is configured.
     */
    private void commit(Consumer<?, ?> consumer, MessageListenerContainer container, Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (container.getContainerProperties().isSyncCommits()) {
            Duration timeout = container.getContainerProperties().getSyncCommitTimeout();
            consumer.commitSync(offsets, timeout);
            return;
        }
        OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
        if (commitCallback == null) {
            commitCallback = LOGGING_COMMIT_CALLBACK;
        }
        consumer.commitAsync(offsets, commitCallback);
    }

    /**
     * Find a {@link BatchListenerFailedException} in the throwable or its cause chain.
     *
     * @param throwableArg the throwable to inspect; may be {@code null}
     * @return the throwable itself when it is a {@link BatchListenerFailedException},
     * else the first such cause, else {@code null}
     */
    @Nullable
    private BatchListenerFailedException getBatchListenerFailedException(Throwable throwableArg) {
        if (throwableArg == null || throwableArg instanceof BatchListenerFailedException) {
            return (BatchListenerFailedException) throwableArg;
        }
        // Track visited causes so that a cyclic cause chain terminates the walk.
        Set<Throwable> checked = new HashSet<>();
        Throwable cause = throwableArg.getCause();
        while (cause != null && checked.add(cause)) {
            if (cause instanceof BatchListenerFailedException) {
                return (BatchListenerFailedException) cause;
            }
            cause = cause.getCause();
        }
        return null;
    }
}

