/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.pulsar;

import org.apache.camel.Exchange;
import org.apache.camel.component.pulsar.PulsarConsumer;
import org.apache.camel.component.pulsar.PulsarEndpoint;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarMessageListener
implements MessageListener<byte[]> {
    private final PulsarEndpoint endpoint;
    private final PulsarConsumer pulsarConsumer;

    public PulsarMessageListener(PulsarEndpoint endpoint, PulsarConsumer pulsarConsumer) {
        this.endpoint = endpoint;
        this.pulsarConsumer = pulsarConsumer;
    }

    public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        Exchange exchange = PulsarMessageUtils.updateExchange(message, this.pulsarConsumer.createExchange(false));
        if (this.endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
            exchange.getIn().setHeader("message_receipt", (Object)this.endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
        }
        this.processAsync(exchange, consumer, message);
    }

    private void processAsync(Exchange exchange, Consumer<byte[]> consumer, Message<byte[]> message) {
        this.pulsarConsumer.getAsyncProcessor().process(exchange, doneSync -> {
            try {
                if (exchange.getException() != null) {
                    this.pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)exchange.getException());
                } else {
                    try {
                        this.acknowledge(consumer, message);
                    }
                    catch (Exception e) {
                        this.pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)exchange.getException());
                    }
                }
            }
            finally {
                this.pulsarConsumer.releaseExchange(exchange, false);
            }
        });
    }

    private void acknowledge(Consumer<byte[]> consumer, Message<byte[]> message) throws PulsarClientException {
        if (!this.endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
            consumer.acknowledge(message.getMessageId());
        }
    }
}

