/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

class EventHubPartitionAsyncConsumer
implements AutoCloseable {
    private final ClientLogger logger = new ClientLogger(EventHubPartitionAsyncConsumer.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicReference<LastEnqueuedEventProperties> lastEnqueuedEventProperties = new AtomicReference();
    private final AmqpReceiveLinkProcessor amqpReceiveLinkProcessor;
    private final MessageSerializer messageSerializer;
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final String consumerGroup;
    private final String partitionId;
    private final boolean trackLastEnqueuedEventProperties;
    private final Scheduler scheduler;
    private final EmitterProcessor<PartitionEvent> emitterProcessor;
    private final EventPosition initialPosition;
    private volatile Long currentOffset;

    EventHubPartitionAsyncConsumer(AmqpReceiveLinkProcessor amqpReceiveLinkProcessor, MessageSerializer messageSerializer, String fullyQualifiedNamespace, String eventHubName, String consumerGroup, String partitionId, AtomicReference<Supplier<EventPosition>> currentEventPosition, boolean trackLastEnqueuedEventProperties, Scheduler scheduler) {
        this.initialPosition = Objects.requireNonNull(currentEventPosition.get().get(), "'currentEventPosition.get().get()' cannot be null.");
        this.amqpReceiveLinkProcessor = amqpReceiveLinkProcessor;
        this.messageSerializer = messageSerializer;
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.consumerGroup = consumerGroup;
        this.partitionId = partitionId;
        this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");
        if (trackLastEnqueuedEventProperties) {
            this.lastEnqueuedEventProperties.set(new LastEnqueuedEventProperties(null, null, null, null));
        }
        currentEventPosition.set(() -> {
            Long offset = this.currentOffset;
            return offset == null ? this.initialPosition : EventPosition.fromOffset(offset);
        });
        this.emitterProcessor = (EmitterProcessor)amqpReceiveLinkProcessor.map(this::onMessageReceived).doOnNext(event -> {
            Long offset = event.getData().getOffset();
            if (offset != null) {
                this.currentOffset = offset;
            } else {
                this.logger.warning("Offset for received event should not be null. Partition Id: {}. Consumer group: {}. Data: {}", new Object[]{event.getPartitionContext().getPartitionId(), event.getPartitionContext().getConsumerGroup(), event.getData().getBodyAsString()});
            }
        }).subscribeWith((Subscriber)EmitterProcessor.create((int)amqpReceiveLinkProcessor.getPrefetch(), (boolean)false));
    }

    @Override
    public void close() {
        if (!this.isDisposed.getAndSet(true)) {
            this.emitterProcessor.onComplete();
            if (!this.amqpReceiveLinkProcessor.isTerminated()) {
                this.amqpReceiveLinkProcessor.cancel();
            }
            this.logger.info("Closed consumer for partition {}", new Object[]{this.partitionId});
        }
    }

    Flux<PartitionEvent> receive() {
        return this.emitterProcessor.publishOn(this.scheduler);
    }

    private PartitionEvent onMessageReceived(Message message) {
        LastEnqueuedEventProperties enqueuedEventProperties;
        EventData event = (EventData)this.messageSerializer.deserialize(message, EventData.class);
        if (this.trackLastEnqueuedEventProperties && (enqueuedEventProperties = (LastEnqueuedEventProperties)this.messageSerializer.deserialize(message, LastEnqueuedEventProperties.class)) != null) {
            LastEnqueuedEventProperties updated = new LastEnqueuedEventProperties(enqueuedEventProperties.getSequenceNumber(), enqueuedEventProperties.getOffset(), enqueuedEventProperties.getEnqueuedTime(), enqueuedEventProperties.getRetrievalTime());
            this.lastEnqueuedEventProperties.set(updated);
        }
        PartitionContext partitionContext = new PartitionContext(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, this.partitionId);
        return new PartitionEvent(partitionContext, event, this.lastEnqueuedEventProperties.get());
    }
}

