/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.azure.cosmosdb;

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.implementation.apachecommons.lang.RandomStringUtils;
import java.util.List;
import java.util.Map;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.azure.cosmosdb.CosmosDbConfiguration;
import org.apache.camel.component.azure.cosmosdb.CosmosDbEndpoint;
import org.apache.camel.component.azure.cosmosdb.client.CosmosAsyncClientWrapper;
import org.apache.camel.component.azure.cosmosdb.operations.CosmosDbContainerOperations;
import org.apache.camel.component.azure.cosmosdb.operations.CosmosDbOperationsBuilder;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.EmptyAsyncCallback;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ObjectHelper;

public class CosmosDbConsumer
extends DefaultConsumer {
    private Synchronization onCompletion;
    private CosmosAsyncClientWrapper clientWrapper;
    private ChangeFeedProcessor changeFeedProcessor;

    public CosmosDbConsumer(CosmosDbEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
    }

    protected void doInit() throws Exception {
        super.doInit();
        this.clientWrapper = new CosmosAsyncClientWrapper(this.getEndpoint().getCosmosAsyncClient());
        this.onCompletion = new ConsumerOnCompletion();
        this.changeFeedProcessor = this.getContainerOperations().captureEventsWithChangeFeed(this.getLeaseContainerOperations().getContainer(), this.getHostName(), this::onEventListener, this.getConfiguration().getChangeFeedProcessorOptions());
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.changeFeedProcessor.start().subscribe(aVoid -> {}, this::onErrorListener);
    }

    protected void doStop() throws Exception {
        if (this.changeFeedProcessor != null) {
            this.changeFeedProcessor.stop().block();
        }
        super.doStop();
    }

    public CosmosDbConfiguration getConfiguration() {
        return this.getEndpoint().getConfiguration();
    }

    public CosmosDbEndpoint getEndpoint() {
        return (CosmosDbEndpoint)super.getEndpoint();
    }

    private void onEventListener(List<Map<String, ?>> record) {
        Exchange exchange = this.createAzureCosmosDbExchange(record);
        ((ExtendedExchange)exchange.adapt(ExtendedExchange.class)).addOnCompletion(this.onCompletion);
        this.getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
    }

    private Exchange createAzureCosmosDbExchange(List<Map<String, ?>> record) {
        Exchange exchange = this.createExchange(true);
        Message message = exchange.getIn();
        message.setBody(record);
        return exchange;
    }

    private void onErrorListener(Throwable error) {
        this.getExceptionHandler().handleException("Error processing exchange", error);
    }

    private CosmosDbContainerOperations getContainerOperations() {
        return CosmosDbOperationsBuilder.withClient(this.clientWrapper).withDatabaseName(this.getConfiguration().getDatabaseName()).withCreateDatabaseIfNotExist(this.getConfiguration().isCreateDatabaseIfNotExists()).withContainerName(this.getConfiguration().getContainerName()).withContainerPartitionKeyPath(this.getConfiguration().getContainerPartitionKeyPath()).withCreateContainerIfNotExist(this.getConfiguration().isCreateContainerIfNotExists()).withThroughputProperties(this.getConfiguration().getThroughputProperties()).buildContainerOperations();
    }

    private CosmosDbContainerOperations getLeaseContainerOperations() {
        String leaseContainerPartitionKeyPath = "/id";
        String leaseDatabaseName = ObjectHelper.isEmpty((Object)this.getConfiguration().getLeaseDatabaseName()) ? this.getConfiguration().getDatabaseName() : this.getConfiguration().getLeaseDatabaseName();
        return CosmosDbOperationsBuilder.withClient(this.clientWrapper).withDatabaseName(leaseDatabaseName).withCreateDatabaseIfNotExist(this.getConfiguration().isCreateLeaseDatabaseIfNotExists()).withContainerName(this.getConfiguration().getLeaseContainerName()).withContainerPartitionKeyPath("/id").withCreateContainerIfNotExist(this.getConfiguration().isCreateLeaseContainerIfNotExists()).withThroughputProperties(this.getConfiguration().getThroughputProperties()).buildContainerOperations();
    }

    private String getHostName() {
        if (ObjectHelper.isEmpty((Object)this.getConfiguration().getHostName())) {
            return RandomStringUtils.randomAlphabetic((int)10);
        }
        return this.getConfiguration().getHostName();
    }

    private class ConsumerOnCompletion
    extends SynchronizationAdapter {
        private ConsumerOnCompletion() {
        }

        public void onFailure(Exchange exchange) {
            Exception cause = exchange.getException();
            if (cause != null) {
                CosmosDbConsumer.this.getExceptionHandler().handleException("Error during processing exchange.", exchange, (Throwable)cause);
            }
        }
    }
}

