/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.azure.eventhubs.operations;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
import org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Sends the content of Camel exchanges to Azure Event Hubs through an
 * {@link EventHubProducerAsyncClient}.
 *
 * <p>The message body may be a single payload or an {@link Iterable} of payloads. Elements of an
 * iterable body may themselves be {@link Exchange}s, {@link Message}s or plain objects. Every
 * payload is converted to {@code byte[]} with a Camel {@link TypeConverter}, and the relevant
 * message headers are copied into the properties of the resulting {@link EventData}.
 */
public class EventHubsProducerOperations {
    private static final Logger LOG = LoggerFactory.getLogger(EventHubsProducerOperations.class);
    private final EventHubProducerAsyncClient producerAsyncClient;
    private final EventHubsConfigurationOptionsProxy configurationOptionsProxy;

    /**
     * Creates the operations helper.
     *
     * @param producerAsyncClient client used to send the events; must not be {@code null}
     * @param configuration       endpoint configuration, wrapped in an
     *                            {@link EventHubsConfigurationOptionsProxy} to resolve per-exchange options
     */
    public EventHubsProducerOperations(EventHubProducerAsyncClient producerAsyncClient, EventHubsConfiguration configuration) {
        ObjectHelper.notNull(producerAsyncClient, "client cannot be null");
        this.producerAsyncClient = producerAsyncClient;
        this.configurationOptionsProxy = new EventHubsConfigurationOptionsProxy(configuration);
    }

    /**
     * Converts the exchange into one or more events and sends them without blocking.
     *
     * <p>Building the send options and the event data happens on the calling thread, so failures
     * in those steps are thrown to the caller before anything is sent and before the callback is
     * used.
     *
     * @param exchange exchange whose IN message carries the payload(s); must not be {@code null}
     * @param callback invoked with {@code done(false)} once the send completes or fails; must not
     *                 be {@code null}
     * @return always {@code false}; completion is reported through {@code callback}
     * @throws IllegalArgumentException if both a partition key and a partition id are resolved for
     *                                  the exchange, or if a payload cannot be converted to
     *                                  {@code byte[]}
     */
    public boolean sendEvents(Exchange exchange, AsyncCallback callback) {
        ObjectHelper.notNull(exchange, "exchange cannot be null");
        ObjectHelper.notNull(callback, "callback cannot be null");
        SendOptions sendOptions = createSendOptions(
                configurationOptionsProxy.getPartitionKey(exchange),
                configurationOptionsProxy.getPartitionId(exchange));
        Iterable<EventData> eventData = createEventData(exchange);
        return sendAsyncEvents(eventData, sendOptions, exchange, callback);
    }

    /**
     * Subscribes to the send operation and reports its outcome through the callback.
     *
     * @return always {@code false}, matching the {@code done(false)} calls made from the
     *         subscription handlers
     */
    private boolean sendAsyncEvents(Iterable<EventData> eventData, SendOptions sendOptions, Exchange exchange, AsyncCallback asyncCallback) {
        sendAsyncEventsWithSuitableMethod(eventData, sendOptions).subscribe(
                unused -> LOG.debug("Processed one event..."),
                error -> {
                    // Record the failure on the exchange, then signal completion.
                    LOG.debug("Error processing async exchange with error: {}", error.getMessage());
                    exchange.setException(error);
                    asyncCallback.done(false);
                },
                () -> {
                    LOG.debug("All events with exchange have been sent successfully.");
                    asyncCallback.done(false);
                });
        return false;
    }

    /**
     * Picks the client overload depending on whether send options were built.
     *
     * @param sendOptions options from {@link #createSendOptions(String, String)}, or {@code null}
     *                    when neither a partition key nor a partition id is configured
     */
    private Mono<Void> sendAsyncEventsWithSuitableMethod(Iterable<EventData> eventData, SendOptions sendOptions) {
        if (sendOptions == null) {
            return producerAsyncClient.send(eventData);
        }
        return producerAsyncClient.send(eventData, sendOptions);
    }

    /**
     * Builds the send options from the partition settings.
     *
     * @return {@code null} when neither value is set, otherwise options carrying the one that is
     * @throws IllegalArgumentException if both a partition key and a partition id are set
     */
    private SendOptions createSendOptions(String partitionKey, String partitionId) {
        boolean hasKey = ObjectHelper.isNotEmpty(partitionKey);
        boolean hasId = ObjectHelper.isNotEmpty(partitionId);
        if (hasKey && hasId) {
            throw new IllegalArgumentException("Both partitionKey and partitionId are set. Only one or the other can be set.");
        }
        if (!hasKey && !hasId) {
            return null;
        }
        return new SendOptions().setPartitionId(partitionId).setPartitionKey(partitionKey);
    }

    /**
     * Creates the events for an exchange: one per element when the body is an {@link Iterable},
     * otherwise a single event built from the IN message.
     */
    private Iterable<EventData> createEventData(Exchange exchange) {
        Message in = exchange.getIn();
        Object body = in.getBody();
        if (body instanceof Iterable) {
            return createEventDataFromIterable((Iterable<?>) body, exchange.getContext().getTypeConverter(), in.getHeaders());
        }
        return Collections.singletonList(createEventDataFromExchange(exchange));
    }

    /**
     * Creates one event per element of {@code inputData}.
     *
     * @param inputData elements to convert; {@link Exchange} and {@link Message} elements
     *                  contribute their own body and headers
     * @param converter type converter of the enclosing exchange, used for plain objects and as a
     *                  fallback for messages that have no owning exchange
     * @param headers   headers of the enclosing message, applied to plain-object elements
     */
    private Iterable<EventData> createEventDataFromIterable(Iterable<?> inputData, TypeConverter converter, Map<String, Object> headers) {
        LinkedList<EventData> events = new LinkedList<>();
        for (Object element : inputData) {
            if (element instanceof Exchange) {
                events.add(createEventDataFromExchange((Exchange) element));
            } else if (element instanceof Message) {
                events.add(createEventDataFromMessage((Message) element, converter));
            } else {
                events.add(createEventDataFromObject(element, converter, headers));
            }
        }
        return events;
    }

    /** Creates an event from the IN message of {@code exchange}. */
    private EventData createEventDataFromExchange(Exchange exchange) {
        return createEventDataFromMessage(exchange.getIn(), exchange.getContext().getTypeConverter());
    }

    /**
     * Creates an event from a message's body and headers.
     *
     * @param message           message supplying the body and headers
     * @param fallbackConverter converter used only when {@code message.getExchange()} returns
     *                          {@code null}; otherwise the converter of the message's own exchange
     *                          context is used
     */
    private EventData createEventDataFromMessage(Message message, TypeConverter fallbackConverter) {
        Exchange owner = message.getExchange();
        // A message that is not attached to an exchange would otherwise cause a
        // NullPointerException here; fall back to the converter of the enclosing exchange.
        TypeConverter converter = owner != null ? owner.getContext().getTypeConverter() : fallbackConverter;
        return createEventDataFromObject(message.getBody(), converter, message.getHeaders());
    }

    /**
     * Converts a payload to {@code byte[]} and wraps it in an {@link EventData} whose properties
     * receive all of {@code headers}.
     *
     * @throws IllegalArgumentException if the conversion yields a value that
     *                                  {@code ObjectHelper.isEmpty} treats as empty
     */
    private EventData createEventDataFromObject(Object inputData, TypeConverter converter, Map<String, Object> headers) {
        byte[] data = converter.convertTo(byte[].class, inputData);
        if (ObjectHelper.isEmpty(data)) {
            throw new IllegalArgumentException(String.format("Cannot convert message body %s to byte[]. You will need to make sure the data encoded in byte[] or add a Camel TypeConverter to convert the data to byte[]", inputData));
        }
        EventData eventData = new EventData(data);
        eventData.getProperties().putAll(headers);
        return eventData;
    }
}

