/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation.instrumentation;

import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationScope;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

public final class InstrumentedMessageFlux
extends FluxOperator<Message, Message> {
    private final EventHubsConsumerInstrumentation instrumentation;
    private final String partitionId;

    private InstrumentedMessageFlux(MessageFlux upstream, String partitionId, EventHubsConsumerInstrumentation instrumentation) {
        super((Flux)upstream);
        this.instrumentation = instrumentation;
        this.partitionId = partitionId;
    }

    public static Flux<Message> instrument(MessageFlux source, String partitionId, EventHubsConsumerInstrumentation instrumentation) {
        if (instrumentation.isEnabled()) {
            return new InstrumentedMessageFlux(source, partitionId, instrumentation);
        }
        return source;
    }

    public void subscribe(CoreSubscriber<? super Message> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        this.source.subscribe((CoreSubscriber)new TracingSubscriber(coreSubscriber, this.partitionId, this.instrumentation));
    }

    private static class TracingSubscriber
    extends BaseSubscriber<Message> {
        private final CoreSubscriber<? super Message> downstream;
        private final EventHubsConsumerInstrumentation instrumentation;
        private final String partitionId;

        TracingSubscriber(CoreSubscriber<? super Message> downstream, String partitionId, EventHubsConsumerInstrumentation instrumentation) {
            this.downstream = downstream;
            this.instrumentation = instrumentation;
            this.partitionId = partitionId;
        }

        public Context currentContext() {
            return this.downstream.currentContext();
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.downstream.onSubscribe((Subscription)this);
        }

        protected void hookOnNext(Message message) {
            try (InstrumentationScope scope = this.instrumentation.startAsyncConsume(message, this.partitionId);){
                this.downstream.onNext((Object)message);
            }
        }

        protected void hookOnError(Throwable throwable) {
            this.downstream.onError(throwable);
        }

        protected void hookOnComplete() {
            this.downstream.onComplete();
        }
    }
}

