/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import java.util.Objects;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

/**
 * Flux operator that hands every {@link ServiceBusReceivedMessage} emitted by the upstream to
 * {@link ServiceBusReceiverInstrumentation#instrumentProcess} (tagged as
 * {@link ReceiverKind#ASYNC_RECEIVER}) and delivers it to the downstream subscriber from inside
 * that instrumentation callback. Subscription, error and completion signals pass through untouched.
 */
final class FluxTraceV2
        extends FluxOperator<ServiceBusReceivedMessage, ServiceBusReceivedMessage> {
    /** Instrumentation that wraps the delivery of each message. */
    private final ServiceBusReceiverInstrumentation instrumentation;

    /**
     * Creates the operator.
     *
     * @param upstream source of received messages.
     * @param instrumentation instrumentation invoked around the delivery of every message.
     */
    FluxTraceV2(Flux<? extends ServiceBusReceivedMessage> upstream, ServiceBusReceiverInstrumentation instrumentation) {
        super(upstream);
        this.instrumentation = instrumentation;
    }

    /**
     * Subscribes to the upstream through a {@link TracingSubscriber} that wraps the given subscriber.
     *
     * @param coreSubscriber downstream subscriber that receives the instrumented messages.
     * @throws NullPointerException if {@code coreSubscriber} is null.
     */
    public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessage> coreSubscriber) {
        final CoreSubscriber<? super ServiceBusReceivedMessage> checked =
            Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        final TracingSubscriber tracing = new TracingSubscriber(checked, this.instrumentation);
        this.source.subscribe(tracing);
    }

    /**
     * Subscriber placed between the upstream and the real downstream subscriber. It forwards all
     * signals and routes {@code onNext} through the instrumentation.
     */
    private static final class TracingSubscriber
            extends BaseSubscriber<ServiceBusReceivedMessage> {
        /** The real subscriber all signals are forwarded to. */
        private final CoreSubscriber<? super ServiceBusReceivedMessage> actual;
        /** Instrumentation invoked for each message. */
        private final ServiceBusReceiverInstrumentation instrumentation;

        /**
         * @param actual subscriber that receives the forwarded signals.
         * @param receiverInstrumentation instrumentation invoked around each {@code onNext}.
         */
        TracingSubscriber(CoreSubscriber<? super ServiceBusReceivedMessage> actual, ServiceBusReceiverInstrumentation receiverInstrumentation) {
            this.actual = actual;
            this.instrumentation = receiverInstrumentation;
        }

        /**
         * Exposes the downstream subscriber's context so it stays visible to the upstream.
         *
         * @return the context of the wrapped subscriber.
         */
        public Context currentContext() {
            return actual.currentContext();
        }

        /**
         * Hands this subscriber to the downstream as its {@link Subscription}; no demand is
         * requested here, so demand is left to whatever the downstream requests.
         */
        protected void hookOnSubscribe(Subscription subscription) {
            actual.onSubscribe(this);
        }

        /**
         * Delivers the message to the downstream from within the instrumentation callback.
         */
        protected void hookOnNext(ServiceBusReceivedMessage message) {
            instrumentation.instrumentProcess(message, ReceiverKind.ASYNC_RECEIVER, received -> {
                actual.onNext(received);
                // The callback has to return a value; this operator always returns null.
                // NOTE(review): presumably null means "no processing error" - confirm against instrumentProcess.
                return null;
            });
        }

        /** Forwards the terminal error unchanged. */
        protected void hookOnError(Throwable throwable) {
            actual.onError(throwable);
        }

        /** Forwards completion unchanged. */
        protected void hookOnComplete() {
            actual.onComplete();
        }
    }
}

