/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.implementation.handler.LinkHandler;
import com.azure.core.util.logging.ClientLogger;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;

public class SendLinkHandler
extends LinkHandler {
    private final String senderName;
    private final AtomicBoolean isFirstFlow = new AtomicBoolean(true);
    private final UnicastProcessor<Integer> creditProcessor = UnicastProcessor.create();
    private final DirectProcessor<Delivery> deliveryProcessor = DirectProcessor.create();
    private final FluxSink<Integer> creditSink = this.creditProcessor.sink();
    private final FluxSink<Delivery> deliverySink = this.deliveryProcessor.sink();

    public SendLinkHandler(String connectionId, String hostname, String senderName, String entityPath) {
        super(connectionId, hostname, entityPath, new ClientLogger(SendLinkHandler.class));
        this.senderName = senderName;
    }

    public Flux<Integer> getLinkCredits() {
        return this.creditProcessor;
    }

    public Flux<Delivery> getDeliveredMessages() {
        return this.deliveryProcessor;
    }

    @Override
    public void close() {
        this.creditSink.complete();
        this.deliverySink.complete();
        super.close();
    }

    public void onLinkLocalOpen(Event event) {
        Link link = event.getLink();
        if (link instanceof Sender) {
            this.logger.verbose("onLinkLocalOpen senderName[{}], linkName[{}], localTarget[{}]", new Object[]{this.senderName, link.getName(), link.getTarget()});
        }
    }

    public void onLinkRemoteOpen(Event event) {
        Link link = event.getLink();
        if (link instanceof Sender) {
            if (link.getRemoteTarget() != null) {
                this.logger.info("onLinkRemoteOpen senderName[{}], linkName[{}], remoteTarget[{}]", new Object[]{this.senderName, link.getName(), link.getRemoteTarget()});
                if (this.isFirstFlow.getAndSet(false)) {
                    this.onNext(EndpointState.ACTIVE);
                }
            } else {
                this.logger.info("onLinkRemoteOpen senderName[{}], linkName[{}], remoteTarget[null], remoteSource[null], action[waitingForError]", new Object[]{this.senderName, link.getName()});
            }
        }
    }

    public void onLinkFlow(Event event) {
        if (this.isFirstFlow.getAndSet(false)) {
            this.onNext(EndpointState.ACTIVE);
        }
        Sender sender = event.getSender();
        this.creditSink.next((Object)sender.getRemoteCredit());
        this.logger.verbose("onLinkFlow senderName[{}], linkName[{}], unsettled[{}], credit[{}]", new Object[]{this.senderName, sender.getName(), sender.getUnsettled(), sender.getCredit()});
    }

    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();
        while (delivery != null) {
            Sender sender = (Sender)delivery.getLink();
            this.logger.verbose("onDelivery senderName[{}], linkName[{}], unsettled[{}], credit[{}], deliveryState[{}], delivery.isBuffered[{}], delivery.id[{}]", new Object[]{this.senderName, sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(), delivery.getRemoteState(), delivery.isBuffered(), new String(delivery.getTag(), StandardCharsets.UTF_8)});
            this.deliverySink.next((Object)delivery);
            delivery.settle();
            delivery = sender.current();
        }
    }
}

