/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.PartitionProcessorException;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.io.Closeable;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import reactor.core.publisher.Signal;

class PartitionPumpManager {
    private final ClientLogger logger = new ClientLogger(PartitionPumpManager.class);
    private final CheckpointStore checkpointStore;
    private final Map<String, EventHubConsumerAsyncClient> partitionPumps = new ConcurrentHashMap<String, EventHubConsumerAsyncClient>();
    private final Supplier<PartitionProcessor> partitionProcessorFactory;
    private final EventHubClientBuilder eventHubClientBuilder;
    private final TracerProvider tracerProvider;
    private final boolean trackLastEnqueuedEventProperties;
    private final Map<String, EventPosition> initialPartitionEventPosition;

    PartitionPumpManager(CheckpointStore checkpointStore, Supplier<PartitionProcessor> partitionProcessorFactory, EventHubClientBuilder eventHubClientBuilder, boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Map<String, EventPosition> initialPartitionEventPosition) {
        this.checkpointStore = checkpointStore;
        this.partitionProcessorFactory = partitionProcessorFactory;
        this.eventHubClientBuilder = eventHubClientBuilder;
        this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
        this.tracerProvider = tracerProvider;
        this.initialPartitionEventPosition = initialPartitionEventPosition;
    }

    void stopAllPartitionPumps() {
        this.partitionPumps.forEach((partitionId, eventHubConsumer) -> {
            try {
                eventHubConsumer.close();
            }
            catch (Exception ex) {
                this.logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, new Object[]{partitionId, ex});
            }
            finally {
                this.partitionPumps.remove(partitionId);
            }
        });
    }

    void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoint) {
        if (this.partitionPumps.containsKey(claimedOwnership.getPartitionId())) {
            this.logger.verbose("Consumer is already running for this partition  {}", new Object[]{claimedOwnership.getPartitionId()});
            return;
        }
        try {
            PartitionContext partitionContext = new PartitionContext(claimedOwnership.getFullyQualifiedNamespace(), claimedOwnership.getEventHubName(), claimedOwnership.getConsumerGroup(), claimedOwnership.getPartitionId());
            PartitionProcessor partitionProcessor = this.partitionProcessorFactory.get();
            InitializationContext initializationContext = new InitializationContext(partitionContext);
            partitionProcessor.initialize(initializationContext);
            EventPosition startFromEventPosition = null;
            startFromEventPosition = checkpoint != null && checkpoint.getOffset() != null ? EventPosition.fromOffset(checkpoint.getOffset()) : (checkpoint != null && checkpoint.getSequenceNumber() != null ? EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber()) : (this.initialPartitionEventPosition.containsKey(claimedOwnership.getPartitionId()) ? this.initialPartitionEventPosition.get(claimedOwnership.getPartitionId()) : EventPosition.latest()));
            this.logger.info("Starting event processing from {} for partition {}", new Object[]{startFromEventPosition, claimedOwnership.getPartitionId()});
            ReceiveOptions receiveOptions = new ReceiveOptions().setOwnerLevel(0L).setTrackLastEnqueuedEventProperties(this.trackLastEnqueuedEventProperties);
            EventHubConsumerAsyncClient eventHubConsumer = this.eventHubClientBuilder.buildAsyncClient().createConsumer(claimedOwnership.getConsumerGroup(), 500);
            this.partitionPumps.put(claimedOwnership.getPartitionId(), eventHubConsumer);
            eventHubConsumer.receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions).subscribe(partitionEvent -> this.processEvent(partitionContext, partitionProcessor, eventHubConsumer, (PartitionEvent)partitionEvent), ex -> this.handleError(claimedOwnership, eventHubConsumer, partitionProcessor, (Throwable)ex, partitionContext), () -> {
                partitionProcessor.close(new CloseContext(partitionContext, CloseReason.EVENT_PROCESSOR_SHUTDOWN));
                this.cleanup(claimedOwnership, eventHubConsumer);
            });
        }
        catch (Exception ex2) {
            if (this.partitionPumps.containsKey(claimedOwnership.getPartitionId())) {
                this.cleanup(claimedOwnership, this.partitionPumps.get(claimedOwnership.getPartitionId()));
            }
            throw this.logger.logExceptionAsError((RuntimeException)((Object)new PartitionProcessorException("Error occurred while starting partition pump for partition " + claimedOwnership.getPartitionId(), ex2)));
        }
    }

    private void processEvent(PartitionContext partitionContext, PartitionProcessor partitionProcessor, EventHubConsumerAsyncClient eventHubConsumer, PartitionEvent partitionEvent) {
        EventData eventData = partitionEvent.getData();
        Context processSpanContext = this.startProcessTracingSpan(eventData, eventHubConsumer.getEventHubName(), eventHubConsumer.getFullyQualifiedNamespace());
        if (processSpanContext.getData((Object)"span-context").isPresent()) {
            eventData.addContext("span-context", processSpanContext);
        }
        try {
            partitionProcessor.processEvent(new EventContext(partitionContext, eventData, this.checkpointStore, partitionEvent.getLastEnqueuedEventProperties()));
            this.endProcessTracingSpan(processSpanContext, (Signal<Void>)Signal.complete());
        }
        catch (Throwable throwable) {
            this.endProcessTracingSpan(processSpanContext, (Signal<Void>)Signal.error((Throwable)throwable));
            throw this.logger.logExceptionAsError((RuntimeException)((Object)new PartitionProcessorException("Error in event processing callback", throwable)));
        }
    }

    Map<String, EventHubConsumerAsyncClient> getPartitionPumps() {
        return this.partitionPumps;
    }

    private void handleError(PartitionOwnership claimedOwnership, EventHubConsumerAsyncClient eventHubConsumer, PartitionProcessor partitionProcessor, Throwable throwable, PartitionContext partitionContext) {
        boolean shouldRethrow = true;
        if (!(throwable instanceof PartitionProcessorException)) {
            shouldRethrow = false;
            this.logger.warning("Error receiving events from partition {}", new Object[]{partitionContext.getPartitionId(), throwable});
            partitionProcessor.processError(new ErrorContext(partitionContext, throwable));
        }
        CloseReason closeReason = CloseReason.LOST_PARTITION_OWNERSHIP;
        partitionProcessor.close(new CloseContext(partitionContext, closeReason));
        this.cleanup(claimedOwnership, eventHubConsumer);
        if (shouldRethrow) {
            PartitionProcessorException exception = (PartitionProcessorException)((Object)throwable);
            throw this.logger.logExceptionAsError((RuntimeException)((Object)exception));
        }
    }

    private void cleanup(PartitionOwnership claimedOwnership, EventHubConsumerAsyncClient eventHubConsumer) {
        try {
            this.logger.info("Closing consumer for partition id {}", new Object[]{claimedOwnership.getPartitionId()});
            eventHubConsumer.close();
        }
        catch (Throwable throwable) {
            this.logger.info("Removing partition id {} from list of processing partitions", new Object[]{claimedOwnership.getPartitionId()});
            this.partitionPumps.remove(claimedOwnership.getPartitionId());
            throw throwable;
        }
        this.logger.info("Removing partition id {} from list of processing partitions", new Object[]{claimedOwnership.getPartitionId()});
        this.partitionPumps.remove(claimedOwnership.getPartitionId());
    }

    private Context startProcessTracingSpan(EventData eventData, String eventHubName, String fullyQualifiedNamespace) {
        Object diagnosticId = eventData.getProperties().get("diagnostic-id");
        if (diagnosticId == null || !this.tracerProvider.isEnabled()) {
            return Context.NONE;
        }
        Context spanContext = this.tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);
        Context entityContext = spanContext.addData((Object)"entity-path", (Object)eventHubName);
        return this.tracerProvider.startSpan(entityContext.addData((Object)"hostname", (Object)fullyQualifiedNamespace), ProcessKind.PROCESS);
    }

    private void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal) {
        Optional spanScope = processSpanContext.getData((Object)"scope");
        if (!spanScope.isPresent() || !this.tracerProvider.isEnabled()) {
            return;
        }
        if (spanScope.get() instanceof Closeable) {
            Closeable close = (Closeable)processSpanContext.getData((Object)"scope").get();
            try {
                close.close();
                this.tracerProvider.endSpan(processSpanContext, signal);
            }
            catch (IOException ioException) {
                this.logger.error(Messages.EVENT_PROCESSOR_RUN_END, new Object[]{ioException});
            }
        } else {
            this.logger.warning(String.format(Locale.US, Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR, spanScope.get() != null ? spanScope.getClass() : "null"), new Object[0]);
        }
    }
}

