/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support;

import java.util.stream.StreamSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts a single Kafka {@link ConsumerRecord} into the message of a Camel {@link Exchange}, hands the
 * exchange to the configured {@link Processor} and reports the outcome as a {@link ProcessingResult}.
 */
public class KafkaRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);

    // Names of the message headers populated by this class.
    private static final String HEADER_PARTITION = "kafka.PARTITION";
    private static final String HEADER_TOPIC = "kafka.TOPIC";
    private static final String HEADER_OFFSET = "kafka.OFFSET";
    private static final String HEADER_HEADERS = "kafka.HEADERS";
    private static final String HEADER_TIMESTAMP = "kafka.TIMESTAMP";
    private static final String HEADER_MESSAGE_TIMESTAMP = "CamelMessageTimestamp";
    private static final String HEADER_KEY = "kafka.KEY";
    private static final String HEADER_LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";
    private static final String HEADER_LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
    private static final String HEADER_MANUAL_COMMIT = "CamelKafkaManualCommit";

    private final boolean autoCommitEnabled;
    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final CommitManager commitManager;

    /**
     * Creates a record processor.
     *
     * @param configuration the Kafka configuration; its auto-commit flag is read once, here
     * @param processor     the processor every exchange is handed to
     * @param commitManager used to obtain manual-commit handles and to commit when breaking on an error
     */
    public KafkaRecordProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
        this.autoCommitEnabled = configuration.isAutoCommitEnable();
        this.configuration = configuration;
        this.processor = processor;
        this.commitManager = commitManager;
    }

    /**
     * Copies the record metadata (partition, topic, offset, headers, timestamp and, when present, the key)
     * into message headers and sets the record value as the message body.
     */
    private void setupExchangeMessage(Message message, ConsumerRecord<Object, Object> record) {
        message.setHeader(HEADER_PARTITION, record.partition());
        message.setHeader(HEADER_TOPIC, record.topic());
        message.setHeader(HEADER_OFFSET, record.offset());
        message.setHeader(HEADER_HEADERS, record.headers());
        message.setHeader(HEADER_TIMESTAMP, record.timestamp());
        message.setHeader(HEADER_MESSAGE_TIMESTAMP, record.timestamp());

        // The key header is only set when the record actually carries a key.
        if (record.key() != null) {
            message.setHeader(HEADER_KEY, record.key());
        }

        LOG.debug("Setting up the exchange for message from partition {} and offset {}", record.partition(),
                record.offset());

        message.setBody(record.value());
    }

    /**
     * Returns {@code true} when the given Kafka header should be copied onto the exchange, i.e. when the
     * filter strategy does not apply its external-headers filter to it.
     */
    private boolean shouldBePropagated(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
        return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
    }

    /**
     * Copies every Kafka record header that passes the configured filter strategy onto the exchange's IN
     * message, using the configured header deserializer to produce each header value.
     */
    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange) {
        HeaderFilterStrategy headerFilterStrategy = this.configuration.getHeaderFilterStrategy();
        KafkaHeaderDeserializer headerDeserializer = this.configuration.getHeaderDeserializer();

        StreamSupport.stream(record.headers().spliterator(), false)
                .filter(header -> shouldBePropagated(header, exchange, headerFilterStrategy))
                .forEach(header -> exchange.getIn().setHeader(header.key(),
                        headerDeserializer.deserialize(header.key(), header.value())));
    }

    /**
     * Populates the exchange from the record, runs the processor and reports the outcome.
     *
     * @param  exchange         the exchange to populate and process
     * @param  topicPartition   the topic partition the record was polled from
     * @param  partitionHasNext used together with {@code recordHasNext} to compute the
     *                          {@code kafka.LAST_POLL_RECORD} header (presumably whether more partitions
     *                          remain in the current poll; confirm against the caller)
     * @param  recordHasNext    used to compute the {@code kafka.LAST_RECORD_BEFORE_COMMIT} and
     *                          {@code kafka.LAST_POLL_RECORD} headers (presumably whether more records
     *                          remain in the current partition; confirm against the caller)
     * @param  record           the Kafka record to process
     * @param  lastResult       the result of the previously processed record; its partition and offset are
     *                          carried over into the returned result when processing fails
     * @param  exceptionHandler receives the failure when break-on-first-error is disabled
     * @return                  on success, a result carrying this record's partition and offset; on
     *                          failure, a result carrying the break-on-error decision and the partition
     *                          and offset of {@code lastResult}
     */
    public ProcessingResult processExchange(Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext, boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) {
        Message message = exchange.getMessage();
        setupExchangeMessage(message, record);
        propagateHeaders(record, exchange);

        // These headers are boolean flags: store real Boolean values, not 1/0 integers.
        boolean lastRecordBeforeCommit = !recordHasNext;
        boolean lastPollRecord = !recordHasNext && !partitionHasNext;

        if (!this.autoCommitEnabled) {
            message.setHeader(HEADER_LAST_RECORD_BEFORE_COMMIT, lastRecordBeforeCommit);
            message.setHeader(HEADER_LAST_POLL_RECORD, lastPollRecord);
        }

        if (this.configuration.isAllowManualCommit()) {
            // Expose a manual-commit handle so the route can commit this record itself.
            KafkaManualCommit manual = this.commitManager.getManualCommit(exchange, topicPartition, record);
            message.setHeader(HEADER_MANUAL_COMMIT, manual);
            message.setHeader(HEADER_LAST_POLL_RECORD, lastPollRecord);
        }

        try {
            this.processor.process(exchange);
        } catch (Exception e) {
            // Record the failure on the exchange so both failure paths are handled in one place below.
            exchange.setException(e);
        }

        if (exchange.getException() != null) {
            LOG.debug("An exception was thrown for record at partition {} and offset {}", record.partition(),
                    record.offset());

            boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult, exceptionHandler);
            // Report the position of the last result, not of the record that just failed.
            return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
        }

        // No exception is set at this point, so the result is never flagged as failed.
        return new ProcessingResult(false, record.partition(), record.offset(), false);
    }

    /**
     * Handles a failed exchange.
     * <p>
     * With break-on-first-error enabled, the failure is logged, the topic partition is committed through
     * the commit manager if the last result carries an offset, and {@code true} is returned. Otherwise the
     * failure is passed to the exception handler and {@code false} is returned.
     *
     * @return {@code true} when break-on-first-error is enabled, {@code false} otherwise
     */
    private boolean processException(Exchange exchange, TopicPartition topicPartition, ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) {
        if (!this.configuration.isBreakOnFirstError()) {
            exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
            return false;
        }

        // NOTE(review): -1 looks like the "nothing processed yet" marker of ProcessingResult; confirm.
        if (lastResult.getPartition() != -1L && lastResult.getPartition() != record.partition()) {
            LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}.  The last result was on partition {} with offset {} but was expecting partition {} with offset {}",
                    topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(),
                    record.partition(), record.offset());
        }

        if (LOG.isWarnEnabled()) {
            Exception exc = exchange.getException();
            LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
                    exc.getMessage());
            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.", record.offset(),
                    record.partition());
        }

        // Only commit when the last result carries an offset.
        if (lastResult.getPartitionLastOffset() != -1L) {
            this.commitManager.commit(topicPartition);
        }

        return true;
    }

    /**
     * Builds the offset key for a topic partition, in the form {@code topic/partition}.
     *
     * @param  topicPartition the topic partition to build the key for
     * @return                the topic name and partition number joined by {@code /}
     */
    public static String serializeOffsetKey(TopicPartition topicPartition) {
        return topicPartition.topic() + "/" + topicPartition.partition();
    }

    /**
     * Parses an offset stored as a decimal string.
     *
     * @param  offset                the textual offset
     * @return                       the offset as a {@code long}
     * @throws NumberFormatException if {@code offset} is not a parsable {@code long}
     */
    public static long deserializeOffsetValue(String offset) {
        return Long.parseLong(offset);
    }
}

