/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.kafka.inbound.KafkaInboundEndpoint;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

public class KafkaMessageDrivenChannelAdapter<K, V>
extends MessageProducerSupport
implements KafkaInboundEndpoint,
OrderlyShutdownCapable,
Pausable {
    /** Listener container driven by this adapter; the constructor rejects {@code null} and turns off its auto-startup. */
    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
    /** Listener installed on the container by {@code onInit()} when the mode is {@code record}. */
    private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
    /** Listener installed on the container by {@code onInit()} when the mode is not {@code record}. */
    private final IntegrationBatchMessageListener batchListener = new IntegrationBatchMessageListener();
    /** Listener mode chosen at construction time; decides which listener {@code onInit()} installs. */
    private final ListenerMode mode;
    /** Optional record filter; {@code null} means no filtering is configured. */
    private RecordFilterStrategy<K, V> recordFilterStrategy;
    /** Flag handed to the filtering listener adapters created in {@code onInit()}. */
    private boolean ackDiscarded;
    /** Optional retry template; when set, record deliveries are executed through {@code doWithRetry}. */
    private RetryTemplate retryTemplate;
    /** Recovery callback passed to {@code doWithRetry} together with the retry template. */
    private RecoveryCallback<?> recoveryCallback;
    /**
     * When {@code true} (and a retry template and filter are both set) the record filter is evaluated inside
     * the retry callback instead of via a wrapping filtering adapter.
     * NOTE(review): the batch listener delivers nothing at all while this flag is {@code true} - confirm that is intended.
     */
    private boolean filterInRetry;
    /** Optional callback invoked from both listeners' {@code onPartitionsAssigned}. */
    private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
    /** When {@code true}, the record listener adds the consumer record to the message under the {@code "sourceData"} header. */
    private boolean bindSourceRecord;
    /** Snapshot of {@code ContainerProperties.isDeliveryAttemptHeader()} taken at the end of {@code onInit()}. */
    private boolean containerDeliveryAttemptPresent;

    /**
     * Creates an adapter for the given container using {@link ListenerMode#record}.
     *
     * @param messageListenerContainer the container to attach to; forwarded unchanged to the two-argument constructor
     */
    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> messageListenerContainer) {
        this(messageListenerContainer, ListenerMode.record);
    }

    /**
     * Creates an adapter for the given container and listener mode.
     * <p>
     * The container's auto-startup is switched off, a {@link RawRecordHeaderErrorMessageStrategy} is installed
     * as the error message strategy and, when Jackson 2 is present, both internal listeners receive a
     * {@link MessagingMessageConverter} whose header mapper trusts
     * {@link JacksonJsonUtils#DEFAULT_TRUSTED_PACKAGES}.
     *
     * @param messageListenerContainer the container to attach to; must not be {@code null} and must not
     * already have a message listener
     * @param mode the listener mode to operate in
     * @throws IllegalArgumentException if either container assertion fails
     */
    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> messageListenerContainer, ListenerMode mode) {
        Assert.notNull(messageListenerContainer, "messageListenerContainer is required");
        Assert.isNull(messageListenerContainer.getContainerProperties().getMessageListener(), "Container must not already have a listener");
        this.messageListenerContainer = messageListenerContainer;
        // The adapter's own doStart()/doStop() drive the container, so it must not start by itself.
        messageListenerContainer.setAutoStartup(false);
        this.mode = mode;
        setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
        if (!JacksonPresent.isJackson2Present()) {
            return;
        }
        MessagingMessageConverter defaultConverter = new MessagingMessageConverter();
        DefaultKafkaHeaderMapper trustedHeaderMapper = new DefaultKafkaHeaderMapper();
        trustedHeaderMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
        defaultConverter.setHeaderMapper(trustedHeaderMapper);
        recordListener.setMessageConverter(defaultConverter);
        batchListener.setMessageConverter(defaultConverter);
    }

    /**
     * Routes the converter to the listener that can use it: a {@link RecordMessageConverter} goes to the
     * record listener, a {@link BatchMessageConverter} goes to the batch listener.
     *
     * @param messageConverter the converter to install
     * @throws IllegalArgumentException if the converter is neither of the two supported kinds
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        if (messageConverter instanceof RecordMessageConverter) {
            recordListener.setMessageConverter((RecordMessageConverter) messageConverter);
            return;
        }
        if (messageConverter instanceof BatchMessageConverter) {
            batchListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
            return;
        }
        throw new IllegalArgumentException("Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
    }

    /**
     * Installs the converter on the record-mode listener.
     *
     * @param messageConverter the record converter to use
     */
    public void setRecordMessageConverter(RecordMessageConverter messageConverter) {
        recordListener.setMessageConverter(messageConverter);
    }

    /**
     * Installs the converter on the batch-mode listener.
     *
     * @param messageConverter the batch converter to use
     */
    public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
        batchListener.setBatchMessageConverter(messageConverter);
    }

    /**
     * Sets the strategy used to filter records. {@code onInit()} wraps the listener in a filtering adapter
     * when a strategy is present, except in record mode when filtering is performed inside the retry callback.
     *
     * @param recordFilterStrategy the filter strategy, or {@code null} for no filtering
     */
    public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }

    /**
     * Sets the {@code ackDiscarded} flag that {@code onInit()} passes to the filtering listener adapters.
     *
     * @param ackDiscarded value forwarded to the filtering adapters
     */
    public void setAckDiscarded(boolean ackDiscarded) {
        this.ackDiscarded = ackDiscarded;
    }

    /**
     * Sets the retry template used for record deliveries.
     *
     * @param retryTemplate the template, or {@code null} to disable retry
     * @throws IllegalArgumentException if a non-null template is supplied while the adapter is not in record mode
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        boolean retryAllowed = retryTemplate == null || mode.equals(ListenerMode.record);
        Assert.isTrue(retryAllowed, "Retry is not supported with mode=batch");
        this.retryTemplate = retryTemplate;
    }

    /**
     * Sets the recovery callback that the record listener passes to {@code doWithRetry} together with the
     * retry template.
     *
     * @param recoveryCallback the recovery callback
     */
    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Chooses where the record filter runs. When {@code true} and both a retry template and a record filter
     * strategy are configured, the record listener evaluates the filter inside the retry callback and
     * {@code onInit()} does not wrap it in a filtering adapter.
     * NOTE(review): the batch listener skips conversion and delivery whenever this flag is {@code true} -
     * confirm that is intended before enabling it in batch mode.
     *
     * @param filterInRetry {@code true} to filter inside the retry callback
     */
    public void setFilterInRetry(boolean filterInRetry) {
        this.filterInRetry = filterInRetry;
    }

    /**
     * Sets the fallback type on both the record and the batch listener.
     *
     * @param payloadType the type handed to each listener's {@code setFallbackType}
     */
    public void setPayloadType(Class<?> payloadType) {
        recordListener.setFallbackType(payloadType);
        batchListener.setFallbackType(payloadType);
    }

    /**
     * Sets a callback that both internal listeners invoke from {@code onPartitionsAssigned}, receiving the
     * assignments map and the seek callback they were given.
     *
     * @param onPartitionsAssignedCallback the callback, or {@code null} for none
     */
    public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
        this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
    }

    /**
     * When {@code true}, the record listener adds the consumed record to each message under the
     * {@code "sourceData"} header.
     *
     * @param bindSourceRecord {@code true} to bind the source record
     */
    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    /**
     * Returns the fixed component type name of this adapter.
     *
     * @return {@code "kafka:message-driven-channel-adapter"}
     */
    @Override
    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    /**
     * Installs the listener matching the configured {@link ListenerMode} on the container.
     * <p>
     * Record mode: if a retry template is configured and an error channel exists, a default
     * {@link ErrorMessageSendingRecoverer} is installed, but only when no recovery callback was supplied, so
     * a caller-provided callback is never replaced. The record listener is wrapped in a
     * {@link FilteringMessageListenerAdapter} when a filter strategy is set and filtering is not performed
     * inside the retry callback.
     * <p>
     * Batch mode: the batch listener is wrapped in a {@link FilteringBatchMessageListenerAdapter} when a
     * filter strategy is set.
     * <p>
     * Finally the container's delivery-attempt-header setting is captured for later header enrichment.
     */
    @Override
    protected void onInit() {
        super.onInit();
        ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
        if (this.mode.equals(ListenerMode.record)) {
            // Typed by interface so the variable can hold either the raw listener or its filtering wrapper.
            MessageListener<K, V> listener = this.recordListener;
            boolean doFilterInRetry = this.filterInRetry && this.retryTemplate != null && this.recordFilterStrategy != null;
            if (this.retryTemplate != null) {
                MessageChannel errorChannel = getErrorChannel();
                // Default to routing exhausted retries to the error channel only when the caller has not
                // configured a recovery callback of their own.
                if (this.recoveryCallback == null && errorChannel != null) {
                    this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy());
                }
            }
            if (!doFilterInRetry && this.recordFilterStrategy != null) {
                listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy, this.ackDiscarded);
            }
            containerProperties.setMessageListener(listener);
        } else {
            BatchMessageListener<K, V> listener = this.batchListener;
            if (this.recordFilterStrategy != null) {
                listener = new FilteringBatchMessageListenerAdapter<>(listener, this.recordFilterStrategy, this.ackDiscarded);
            }
            containerProperties.setMessageListener(listener);
        }
        this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader();
    }

    /**
     * Starts the underlying listener container.
     */
    @Override
    protected void doStart() {
        messageListenerContainer.start();
    }

    /**
     * Stops the underlying listener container.
     */
    @Override
    protected void doStop() {
        messageListenerContainer.stop();
    }

    /**
     * Asks the underlying listener container to pause.
     */
    @Override
    public void pause() {
        messageListenerContainer.pause();
    }

    /**
     * Asks the underlying listener container to resume.
     */
    @Override
    public void resume() {
        messageListenerContainer.resume();
    }

    /**
     * Reports the paused state of the underlying listener container.
     *
     * @return the value of the container's {@code isContainerPaused()}
     */
    @Override
    public boolean isPaused() {
        return messageListenerContainer.isContainerPaused();
    }

    /**
     * Stops the listener container ahead of shutdown.
     *
     * @return this endpoint's phase
     */
    @Override
    public int beforeShutdown() {
        messageListenerContainer.stop();
        return getPhase();
    }

    /**
     * Performs no work after shutdown.
     *
     * @return this endpoint's phase
     */
    @Override
    public int afterShutdown() {
        return getPhase();
    }

    /**
     * Records the input message and the raw Kafka data in the thread-bound attribute holder so they are
     * available when an error message is built.
     * <p>
     * A new holder is created only when none is bound to the thread, an error channel exists, and either no
     * retry template is configured or the call reports a conversion error. Attributes are written when a
     * holder was just created or when a retry template is configured, provided a holder is actually bound.
     *
     * @param record the consumed record or list of records, stored under {@code "kafka_data"}
     * @param message the converted message (may be {@code null}), stored under {@code "inputMessage"}
     * @param conversionError {@code true} when called because conversion failed
     */
    private void setAttributesIfNecessary(Object record, @Nullable Message<?> message, boolean conversionError) {
        boolean holderAbsent = ATTRIBUTES_HOLDER.get() == null;
        // Short-circuit order is kept: the error channel is only looked up when no holder is bound yet.
        boolean createHolder = holderAbsent && getErrorChannel() != null && (retryTemplate == null || conversionError);
        if (createHolder) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (!createHolder && retryTemplate == null) {
            return;
        }
        AttributeAccessor holder = ATTRIBUTES_HOLDER.get();
        if (holder != null) {
            holder.setAttribute("inputMessage", message);
            holder.setAttribute("kafka_data", record);
        }
    }

    /**
     * Supplies the attributes used to build an error message: the thread-bound holder when one exists,
     * otherwise whatever the superclass provides for the message.
     *
     * @param message the message the error relates to
     * @return the attribute accessor to use
     */
    @Override
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor bound = ATTRIBUTES_HOLDER.get();
        return bound != null ? bound : super.getErrorMessageAttributes(message);
    }

    /**
     * Sends the message downstream, or logs at debug level when the converter produced none.
     * <p>
     * When no retry template is configured, the thread-bound attribute holder is removed after the send
     * attempt, whether or not it succeeded.
     *
     * @param message the message to send, possibly {@code null}
     * @param kafkaConsumedObject the consumed record(s), used only in the debug log line
     */
    private void sendMessageIfAny(Message<?> message, Object kafkaConsumedObject) {
        if (message == null) {
            logger.debug(() -> "Converter returned a null message for: " + kafkaConsumedObject);
            return;
        }
        try {
            sendMessage(message);
        } finally {
            if (retryTemplate == null) {
                ATTRIBUTES_HOLDER.remove();
            }
        }
    }

    /**
     * The listener mode the adapter operates in; it selects which internal listener is installed on the
     * container.
     */
    public enum ListenerMode {

        /** Each consumer record is handed to the listener individually. */
        record,

        /** Records are handed to the listener as a list. */
        batch

    }

    private class IntegrationRecordMessageListener
    extends RecordMessagingMessageListenerAdapter<K, V> {
        IntegrationRecordMessageListener() {
            super(null, null);
        }

        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
            if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
                KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
            }
        }

        public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message message;
            try {
                message = this.toMessagingMessage(record, acknowledgment, consumer);
            }
            catch (RuntimeException ex) {
                ConversionException exception;
                if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
                    KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(record, null, true);
                }
                if (KafkaMessageDrivenChannelAdapter.this.sendErrorMessageIfNecessary(null, (Exception)(exception = new ConversionException("Failed to convert to message", record, (Throwable)ex)))) {
                    return;
                }
                throw ex;
            }
            RetryTemplate template = KafkaMessageDrivenChannelAdapter.this.retryTemplate;
            if (template != null) {
                KafkaMessageDrivenChannelAdapter.this.doWithRetry(template, KafkaMessageDrivenChannelAdapter.this.recoveryCallback, record, acknowledgment, consumer, () -> {
                    if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry || this.passesFilter(record)) {
                        KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(this.enhanceHeadersAndSaveAttributes(message, record), record);
                    }
                });
            } else {
                KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(this.enhanceHeadersAndSaveAttributes(message, record), record);
            }
        }

        private boolean passesFilter(ConsumerRecord<K, V> record) {
            RecordFilterStrategy filter = KafkaMessageDrivenChannelAdapter.this.recordFilterStrategy;
            return filter == null || !filter.filter(record);
        }

        private Message<?> enhanceHeadersAndSaveAttributes(Message<?> message, ConsumerRecord<K, V> record) {
            BiConsumer<String, Object> headersAcceptor;
            Supplier<Message> messageSupplier = () -> message;
            MessageHeaders messageHeaders = message.getHeaders();
            if (messageHeaders instanceof KafkaMessageHeaders) {
                KafkaMessageHeaders kafkaMessageHeaders = (KafkaMessageHeaders)messageHeaders;
                Map rawHeaders = kafkaMessageHeaders.getRawHeaders();
                headersAcceptor = rawHeaders::put;
            } else {
                MessageBuilder builder = MessageBuilder.fromMessage(message);
                headersAcceptor = (arg_0, arg_1) -> ((MessageBuilder)builder).setHeader(arg_0, arg_1);
                messageSupplier = () -> ((MessageBuilder)builder).build();
            }
            if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
                AtomicInteger deliveryAttempt = new AtomicInteger(((RetryContext)KafkaInboundEndpoint.ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
                headersAcceptor.accept("deliveryAttempt", deliveryAttempt);
            } else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
                Header header = record.headers().lastHeader("kafka_deliveryAttempt");
                headersAcceptor.accept("deliveryAttempt", new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
            }
            if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
                headersAcceptor.accept("sourceData", record);
            }
            Message messageToReturn = messageSupplier.get();
            KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(record, messageToReturn, false);
            return messageToReturn;
        }
    }

    private class IntegrationBatchMessageListener
    extends BatchMessagingMessageListenerAdapter<K, V> {
        IntegrationBatchMessageListener() {
            super(null, null);
        }

        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
            if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
                KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
            }
        }

        public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message<?> message = null;
            if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry) {
                message = this.toMessage(records, acknowledgment, consumer);
            }
            if (message != null) {
                KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(message, records);
            }
        }

        @Nullable
        private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message message = null;
            try {
                message = this.toMessagingMessage(records, acknowledgment, consumer);
                KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(records, message, false);
            }
            catch (RuntimeException ex) {
                ConversionException exception = new ConversionException("Failed to convert to message", new ArrayList(records), (Throwable)ex);
                MessageChannel errorChannel = KafkaMessageDrivenChannelAdapter.this.getErrorChannel();
                if (errorChannel != null) {
                    KafkaMessageDrivenChannelAdapter.this.getMessagingTemplate().send((Object)errorChannel, (Message)KafkaMessageDrivenChannelAdapter.this.buildErrorMessage(message, (Exception)exception));
                }
                throw ex;
            }
            return message;
        }
    }
}

