/*
 * Decompiled with CFR 0.152.
 */
package org.citrusframework.kafka.endpoint;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.citrusframework.context.TestContext;
import org.citrusframework.endpoint.EndpointConfiguration;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.exceptions.MessageTimeoutException;
import org.citrusframework.kafka.endpoint.KafkaEndpointConfiguration;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumer
extends AbstractMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    protected final KafkaEndpointConfiguration endpointConfiguration;
    private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;

    public KafkaConsumer(String name, KafkaEndpointConfiguration endpointConfiguration) {
        super(name, (EndpointConfiguration)endpointConfiguration);
        this.endpointConfiguration = endpointConfiguration;
        this.consumer = this.createConsumer();
    }

    public Message receive(TestContext context, long timeout) {
        ConsumerRecords records;
        String topic = context.replaceDynamicContentInString(Optional.ofNullable(this.endpointConfiguration.getTopic()).orElseThrow(() -> new CitrusRuntimeException("Missing Kafka topic to receive messages from - add topic to endpoint configuration")));
        if (logger.isDebugEnabled()) {
            logger.debug("Receiving Kafka message on topic: '" + topic);
        }
        if (this.consumer.subscription() == null || this.consumer.subscription().isEmpty()) {
            this.consumer.subscribe((Collection)Arrays.stream(topic.split(",")).collect(Collectors.toList()));
        }
        if ((records = this.consumer.poll(Duration.ofMillis(timeout))).isEmpty()) {
            throw new MessageTimeoutException(timeout, topic);
        }
        if (logger.isDebugEnabled()) {
            records.forEach(record -> logger.debug("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()));
        }
        Message received = this.endpointConfiguration.getMessageConverter().convertInbound((ConsumerRecord<Object, Object>)((ConsumerRecord)records.iterator().next()), this.endpointConfiguration, context);
        context.onInboundMessage(received);
        this.consumer.commitSync(Duration.ofMillis(this.endpointConfiguration.getTimeout()));
        logger.info("Received Kafka message on topic: '" + topic);
        return received;
    }

    public void stop() {
        try {
            if (this.consumer.subscription() != null && !this.consumer.subscription().isEmpty()) {
                this.consumer.unsubscribe();
            }
        }
        finally {
            this.consumer.close(Duration.ofMillis(10000L));
        }
    }

    private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createConsumer() {
        HashMap<String, Object> consumerProps = new HashMap<String, Object>();
        consumerProps.put("client.id", Optional.ofNullable(this.endpointConfiguration.getClientId()).orElseGet(() -> "citrus_kafka_consumer_" + UUID.randomUUID().toString()));
        consumerProps.put("group.id", this.endpointConfiguration.getConsumerGroup());
        consumerProps.put("bootstrap.servers", Optional.ofNullable(this.endpointConfiguration.getServer()).orElse("localhost:9092"));
        consumerProps.put("max.poll.records", 1);
        consumerProps.put("enable.auto.commit", this.endpointConfiguration.isAutoCommit());
        consumerProps.put("auto.commit.interval.ms", this.endpointConfiguration.getAutoCommitInterval());
        consumerProps.put("auto.offset.reset", this.endpointConfiguration.getOffsetReset());
        consumerProps.put("key.deserializer", this.endpointConfiguration.getKeyDeserializer());
        consumerProps.put("value.deserializer", this.endpointConfiguration.getValueDeserializer());
        consumerProps.putAll(this.endpointConfiguration.getConsumerProperties());
        return new org.apache.kafka.clients.consumer.KafkaConsumer(consumerProps);
    }

    public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer) {
        this.consumer = consumer;
    }
}

