/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactoryUtils;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.MicrometerHolder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

public class KafkaTemplate<K, V>
implements KafkaOperations<K, V>,
ApplicationContextAware,
BeanNameAware,
ApplicationListener<ContextStoppedEvent>,
DisposableBean {
    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    private final ProducerFactory<K, V> producerFactory;
    private final boolean customProducerFactory;
    private final boolean autoFlush;
    private final boolean transactional;
    private final ThreadLocal<Producer<K, V>> producers = new ThreadLocal();
    private final Map<String, String> micrometerTags = new HashMap<String, String>();
    private String beanName = "kafkaTemplate";
    private ApplicationContext applicationContext;
    private RecordMessageConverter messageConverter = new MessagingMessageConverter();
    private String defaultTopic;
    private ProducerListener<K, V> producerListener = new LoggingProducerListener();
    private String transactionIdPrefix;
    private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;
    private boolean allowNonTransactional;
    private boolean converterSet;
    private ConsumerFactory<K, V> consumerFactory;
    private volatile boolean micrometerEnabled = true;
    private volatile MicrometerHolder micrometerHolder;

    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
        this(producerFactory, false, configOverrides);
    }

    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this(producerFactory, autoFlush, null);
    }

    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, @Nullable Map<String, Object> configOverrides) {
        Assert.notNull(producerFactory, (String)"'producerFactory' cannot be null");
        this.autoFlush = autoFlush;
        this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
        this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;
        this.producerFactory = this.customProducerFactory ? producerFactory.copyWithConfigurationOverride(configOverrides) : producerFactory;
        this.transactional = this.producerFactory.transactionCapable();
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        if (this.customProducerFactory) {
            ((DefaultKafkaProducerFactory)this.producerFactory).setApplicationContext(applicationContext);
        }
    }

    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public void setProducerListener(@Nullable ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    public RecordMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(RecordMessageConverter messageConverter) {
        Assert.notNull((Object)messageConverter, (String)"'messageConverter' cannot be null");
        this.messageConverter = messageConverter;
        this.converterSet = true;
    }

    public void setMessagingConverter(SmartMessageConverter messageConverter) {
        Assert.isTrue((!this.converterSet ? 1 : 0) != 0, (String)"Cannot set the SmartMessageConverter when setting the messageConverter, add the SmartConverter to the message converter instead");
        ((MessagingMessageConverter)this.messageConverter).setMessagingConverter(messageConverter);
    }

    @Override
    public boolean isTransactional() {
        return this.transactional;
    }

    public String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    public void setTransactionIdPrefix(String transactionIdPrefix) {
        this.transactionIdPrefix = transactionIdPrefix;
    }

    public void setCloseTimeout(Duration closeTimeout) {
        Assert.notNull((Object)closeTimeout, (String)"'closeTimeout' cannot be null");
        this.closeTimeout = closeTimeout;
    }

    public void setAllowNonTransactional(boolean allowNonTransactional) {
        this.allowNonTransactional = allowNonTransactional;
    }

    @Override
    public boolean isAllowNonTransactional() {
        return this.allowNonTransactional;
    }

    public void setMicrometerEnabled(boolean micrometerEnabled) {
        this.micrometerEnabled = micrometerEnabled;
    }

    public void setMicrometerTags(Map<String, String> tags) {
        if (tags != null) {
            this.micrometerTags.putAll(tags);
        }
    }

    @Override
    public ProducerFactory<K, V> getProducerFactory() {
        return this.producerFactory;
    }

    protected ProducerFactory<K, V> getProducerFactory(String topic) {
        return this.producerFactory;
    }

    public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public void onApplicationEvent(ContextStoppedEvent event) {
        if (this.customProducerFactory) {
            ((DefaultKafkaProducerFactory)this.producerFactory).onApplicationEvent(event);
        }
    }

    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {
        return this.send(this.defaultTopic, data);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {
        return this.send(this.defaultTopic, key, data);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
        return this.send(this.defaultTopic, partition, key, data);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {
        return this.send(this.defaultTopic, partition, timestamp, key, data);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, key, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, partition, key, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) {
        ProducerRecord producerRecord = new ProducerRecord(topic, partition, timestamp, key, data);
        return this.doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
        Assert.notNull(record, (String)"'record' cannot be null");
        return this.doSend(record);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        byte[] correlationId;
        ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
        if (!producerRecord.headers().iterator().hasNext() && (correlationId = (byte[])message.getHeaders().get((Object)"kafka_correlationId", byte[].class)) != null) {
            producerRecord.headers().add("kafka_correlationId", correlationId);
        }
        return this.doSend(producerRecord);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        Producer<K, V> producer = this.getTheProducer();
        try {
            List list = producer.partitionsFor(topic);
            return list;
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        Producer<K, V> producer = this.getTheProducer();
        try {
            Map map = producer.metrics();
            return map;
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T> T execute(KafkaOperations.ProducerCallback<K, V, T> callback) {
        Assert.notNull(callback, (String)"'callback' cannot be null");
        Producer<K, V> producer = this.getTheProducer();
        try {
            T t = callback.doInKafka(producer);
            return t;
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    @Override
    public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> callback) {
        String transactionIdSuffix;
        Assert.notNull(callback, (String)"'callback' cannot be null");
        Assert.state((boolean)this.transactional, (String)"Producer factory does not support transactions");
        Producer<K, V> producer = this.producers.get();
        Assert.state((producer == null ? 1 : 0) != 0, (String)"Nested calls to 'executeInTransaction' are not allowed");
        if (this.producerFactory.isProducerPerConsumerPartition()) {
            transactionIdSuffix = TransactionSupport.getTransactionIdSuffix();
            TransactionSupport.clearTransactionIdSuffix();
        } else {
            transactionIdSuffix = null;
        }
        producer = this.producerFactory.createProducer(this.transactionIdPrefix);
        try {
            producer.beginTransaction();
        }
        catch (Exception e) {
            this.closeProducer(producer, false);
            throw e;
        }
        this.producers.set(producer);
        try {
            T result = callback.doInOperations(this);
            try {
                producer.commitTransaction();
            }
            catch (Exception e) {
                throw new SkipAbortException(e);
            }
            T t = result;
            return t;
        }
        catch (SkipAbortException e) {
            throw (RuntimeException)e.getCause();
        }
        catch (Exception e) {
            producer.abortTransaction();
            throw e;
        }
        finally {
            if (transactionIdSuffix != null) {
                TransactionSupport.setTransactionIdSuffix(transactionIdSuffix);
            }
            this.producers.remove();
            this.closeProducer(producer, false);
        }
    }

    @Override
    public void flush() {
        Producer<K, V> producer = this.getTheProducer();
        try {
            producer.flush();
        }
        finally {
            this.closeProducer(producer, this.inTransaction());
        }
    }

    @Override
    @Deprecated
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.sendOffsetsToTransaction(offsets, KafkaUtils.getConsumerGroupId());
    }

    @Override
    @Deprecated
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        this.producerForOffsets().sendOffsetsToTransaction(offsets, consumerGroupId);
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) {
        this.producerForOffsets().sendOffsetsToTransaction(offsets, groupMetadata);
    }

    @Override
    @Nullable
    public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
        Properties props = this.oneOnly();
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props);){
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            ConsumerRecord<K, V> consumerRecord = this.receiveOne(topicPartition, offset, pollTimeout, consumer);
            return consumerRecord;
        }
    }

    @Override
    public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
        Properties props = this.oneOnly();
        LinkedHashMap records = new LinkedHashMap();
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props);){
            requested.forEach(tpo -> {
                ConsumerRecord<K, V> one = this.receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
                records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList()).add(one);
            });
            ConsumerRecords consumerRecords = new ConsumerRecords(records);
            return consumerRecords;
        }
    }

    private Properties oneOnly() {
        Assert.notNull(this.consumerFactory, (String)"A consumerFactory is required");
        Properties props = new Properties();
        props.setProperty("max.poll.records", "1");
        return props;
    }

    @Nullable
    private ConsumerRecord<K, V> receiveOne(TopicPartition topicPartition, long offset, Duration pollTimeout, Consumer<K, V> consumer) {
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, offset);
        ConsumerRecords records = consumer.poll(pollTimeout);
        if (records.count() == 1) {
            return (ConsumerRecord)records.iterator().next();
        }
        return null;
    }

    private Producer<K, V> producerForOffsets() {
        Producer<K, V> producer = this.producers.get();
        if (producer == null) {
            KafkaResourceHolder resourceHolder = (KafkaResourceHolder)((Object)TransactionSynchronizationManager.getResource(this.producerFactory));
            Assert.isTrue((resourceHolder != null ? 1 : 0) != 0, (String)"No transaction in process");
            producer = resourceHolder.getProducer();
        }
        return producer;
    }

    protected void closeProducer(Producer<K, V> producer, boolean inTx) {
        if (!inTx) {
            producer.close(this.closeTimeout);
        }
    }

    protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
        Future sendFuture;
        Producer<K, V> producer = this.getTheProducer(producerRecord.topic());
        this.logger.trace(() -> "Sending: " + producerRecord);
        SettableListenableFuture future = new SettableListenableFuture();
        Object sample = null;
        if (this.micrometerEnabled && this.micrometerHolder == null) {
            this.micrometerHolder = this.obtainMicrometerHolder();
        }
        if (this.micrometerHolder != null) {
            sample = this.micrometerHolder.start();
        }
        if ((sendFuture = producer.send(producerRecord, this.buildCallback(producerRecord, producer, future, sample))).isDone()) {
            try {
                sendFuture.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted", e);
            }
            catch (ExecutionException e) {
                throw new KafkaException("Send failed", e.getCause());
            }
        }
        if (this.autoFlush) {
            this.flush();
        }
        this.logger.trace(() -> "Sent: " + producerRecord);
        return future;
    }

    private Callback buildCallback(ProducerRecord<K, V> producerRecord, Producer<K, V> producer, SettableListenableFuture<SendResult<K, V>> future, @Nullable Object sample) {
        return (metadata, exception) -> {
            try {
                if (exception == null) {
                    if (sample != null) {
                        this.micrometerHolder.success(sample);
                    }
                    future.set(new SendResult(producerRecord, metadata));
                    if (this.producerListener != null) {
                        this.producerListener.onSuccess(producerRecord, metadata);
                    }
                    this.logger.trace(() -> "Sent ok: " + producerRecord + ", metadata: " + metadata);
                } else {
                    if (sample != null) {
                        this.micrometerHolder.failure(sample, exception.getClass().getSimpleName());
                    }
                    future.setException((Throwable)((Object)new KafkaProducerException(producerRecord, "Failed to send", (Throwable)exception)));
                    if (this.producerListener != null) {
                        this.producerListener.onError(producerRecord, metadata, exception);
                    }
                    this.logger.debug((Throwable)exception, () -> "Failed to send: " + producerRecord);
                }
            }
            finally {
                if (!this.transactional) {
                    this.closeProducer(producer, false);
                }
            }
        };
    }

    @Override
    public boolean inTransaction() {
        return this.transactional && (this.producers.get() != null || TransactionSynchronizationManager.getResource(this.producerFactory) != null || TransactionSynchronizationManager.isActualTransactionActive());
    }

    private Producer<K, V> getTheProducer() {
        return this.getTheProducer(null);
    }

    protected Producer<K, V> getTheProducer(@Nullable String topic) {
        boolean transactionalProducer = this.transactional;
        if (transactionalProducer) {
            boolean inTransaction = this.inTransaction();
            Assert.state((this.allowNonTransactional || inTransaction ? 1 : 0) != 0, (String)"No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
            if (!inTransaction) {
                transactionalProducer = false;
            }
        }
        if (transactionalProducer) {
            Producer<K, V> producer = this.producers.get();
            if (producer != null) {
                return producer;
            }
            KafkaResourceHolder<K, V> holder = ProducerFactoryUtils.getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
            return holder.getProducer();
        }
        if (this.allowNonTransactional) {
            return this.producerFactory.createNonTransactionalProducer();
        }
        if (topic == null) {
            return this.producerFactory.createProducer();
        }
        return this.getProducerFactory(topic).createProducer();
    }

    @Nullable
    private MicrometerHolder obtainMicrometerHolder() {
        MicrometerHolder holder = null;
        try {
            if (KafkaUtils.MICROMETER_PRESENT) {
                holder = new MicrometerHolder(this.applicationContext, this.beanName, "spring.kafka.template", "KafkaTemplate Timer", this.micrometerTags);
            }
        }
        catch (IllegalStateException ex) {
            this.micrometerEnabled = false;
        }
        return holder;
    }

    public void destroy() {
        if (this.micrometerHolder != null) {
            this.micrometerHolder.destroy();
        }
        if (this.customProducerFactory) {
            ((DefaultKafkaProducerFactory)this.producerFactory).destroy();
        }
    }

    private static final class SkipAbortException
    extends RuntimeException {
        SkipAbortException(Throwable cause) {
            super(cause);
        }
    }
}

