/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.milo.opcua.sdk.client.subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.OpcUaSession;
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.UaSession;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.UaRequestMessageType;
import org.eclipse.milo.opcua.stack.core.types.UaStructuredType;
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
import org.eclipse.milo.opcua.stack.core.types.structured.DataChangeNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.EventFieldList;
import org.eclipse.milo.opcua.stack.core.types.structured.EventNotificationList;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.NotificationMessage;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.RepublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.RequestHeader;
import org.eclipse.milo.opcua.stack.core.types.structured.StatusChangeNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.SubscriptionAcknowledgement;
import org.eclipse.milo.opcua.stack.core.util.TaskQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps Publish requests outstanding for the subscriptions registered with an {@link OpcUaClient}
 * and routes each {@link PublishResponse} to the subscription it belongs to.
 *
 * <p>For every session a counter of in-flight Publish requests is tracked in
 * {@code pendingCountMap}; requests are (re)issued whenever a subscription is added or removed, a
 * session becomes active, or a previous request completes. Responses are processed on a
 * {@link TaskQueue}, and the resulting notifications are handed to each subscription's own
 * delivery queue.
 */
public class PublishingManager {
    // OPC UA status code values, compared against StatusCode#value().
    // Names follow the OPC UA StatusCode table (see OPC UA Part 6).
    private static final long BAD_TIMEOUT = 0x800A0000L;
    private static final long BAD_SESSION_ID_INVALID = 0x80250000L;
    private static final long BAD_SESSION_CLOSED = 0x80260000L;
    private static final long BAD_TOO_MANY_PUBLISH_REQUESTS = 0x80780000L;
    private static final long BAD_NO_SUBSCRIPTION = 0x80790000L;

    /** Largest timeout hint that is still passed on; anything above is replaced by 0. */
    private static final double MAX_TIMEOUT_HINT = 4.294967295E9;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** In-flight Publish request count, keyed by session id. */
    private final ConcurrentMap<NodeId, AtomicLong> pendingCountMap = new ConcurrentHashMap<>();

    /** Per-subscription bookkeeping, keyed by subscription id. */
    private final Map<UInteger, SubscriptionDetails> subscriptionDetails = new ConcurrentHashMap<>();

    private final TaskQueue processingQueue;
    private final OpcUaClient client;

    /**
     * Creates a manager bound to {@code client} and registers a session listener that tries to
     * send Publish requests each time a session becomes active.
     *
     * @param client the client whose transport executor, session and configuration are used.
     */
    public PublishingManager(OpcUaClient client) {
        this.client = client;
        this.processingQueue = new TaskQueue((Executor) client.getTransport().getConfig().getExecutor());
        client.addSessionActivityListener(new SessionActivityListener() {

            @Override
            public void onSessionActive(UaSession session) {
                PublishingManager.this.maybeSendPublishRequests();
            }
        });
    }

    /**
     * Starts tracking {@code subscription} (if it has a subscription id) and tops up the
     * outstanding Publish requests.
     *
     * @param subscription the subscription to track.
     */
    void addSubscription(OpcUaSubscription subscription) {
        subscription.getSubscriptionId()
            .ifPresent(id -> subscriptionDetails.put(id, new SubscriptionDetails(subscription)));
        maybeSendPublishRequests();
    }

    /**
     * Stops tracking {@code subscription} (if it has a subscription id) and re-evaluates the
     * outstanding Publish requests.
     *
     * @param subscription the subscription to stop tracking.
     */
    void removeSubscription(OpcUaSubscription subscription) {
        subscription.getSubscriptionId().ifPresent(subscriptionDetails::remove);
        maybeSendPublishRequests();
    }

    /**
     * Sends Publish requests on the current session until the per-session pending count reaches
     * {@link #getMaxPendingPublishes()}. Does nothing when that limit is zero.
     *
     * <p>Counters belonging to sessions other than the current one are pruned; if no session is
     * available all counters are cleared.
     */
    private void maybeSendPublishRequests() {
        long maxPendingPublishes = getMaxPendingPublishes();
        if (maxPendingPublishes <= 0L) {
            return;
        }

        client.getSessionAsync().whenComplete((session, ex) -> {
            if (session == null) {
                logger.debug("Session not available", ex);
                pendingCountMap.clear();
                return;
            }

            AtomicLong pendingCount =
                pendingCountMap.computeIfAbsent(session.getSessionId(), id -> new AtomicLong(0L));

            for (long i = pendingCount.get(); i < maxPendingPublishes; ++i) {
                // Reserve a slot first; if another thread won the race and the limit is already
                // exceeded, give the slot back instead of sending.
                if (pendingCount.incrementAndGet() <= maxPendingPublishes) {
                    sendPublishRequest((OpcUaSession) session, pendingCount);
                } else {
                    decrementPendingCount(pendingCount);
                }
            }

            if (pendingCountMap.size() > 1) {
                // Drop counters that belong to any session other than the current one.
                pendingCountMap.entrySet().removeIf(e -> !e.getKey().equals(session.getSessionId()));
            }
        });
    }

    /**
     * Builds and sends a single Publish request that carries every acknowledgement currently
     * queued on the tracked subscriptions, then handles the asynchronous outcome.
     *
     * <p>On a {@link PublishResponse} the owning subscription's watchdog timer is reset and the
     * response is queued for processing. On any other outcome the pending count is decremented
     * and, depending on the status code, either the watchdog timers are cancelled (session closed
     * or invalid), nothing further happens (no subscription / too many publish requests), or a
     * new round of Publish requests is attempted.
     *
     * @param session the session whose authentication token is used for the request header.
     * @param pendingCount the in-flight counter to decrement once this request is finished.
     */
    void sendPublishRequest(OpcUaSession session, AtomicLong pendingCount) {
        List<SubscriptionAcknowledgement> acknowledgements = new ArrayList<>();

        for (SubscriptionDetails details : subscriptionDetails.values()) {
            // Iterating a Collections.synchronizedList requires holding its monitor; draining and
            // clearing under the same lock keeps the two steps atomic.
            synchronized (details.availableAcknowledgements) {
                for (UInteger sequenceNumber : details.availableAcknowledgements) {
                    details.subscription.getSubscriptionId().ifPresent(subscriptionId ->
                        acknowledgements.add(new SubscriptionAcknowledgement(subscriptionId, sequenceNumber)));
                }
                details.availableAcknowledgements.clear();
            }
        }

        RequestHeader requestHeader =
            client.newRequestHeader(session.getAuthenticationToken(), getTimeoutHint());
        UInteger requestHandle = requestHeader.getRequestHandle();
        PublishRequest request =
            new PublishRequest(requestHeader, acknowledgements.toArray(new SubscriptionAcknowledgement[0]));

        if (logger.isDebugEnabled()) {
            String[] ackStrings = acknowledgements.stream()
                .map(ack -> String.format("id=%s/seq=%s", ack.getSubscriptionId(), ack.getSequenceNumber()))
                .toArray(String[]::new);
            logger.debug(
                "Sending PublishRequest, requestHandle={}, acknowledgements={}",
                requestHandle,
                Arrays.toString(ackStrings));
        }

        client.sendRequestAsync((UaRequestMessageType) request).whenCompleteAsync((response, ex) -> {
            if (response instanceof PublishResponse) {
                PublishResponse publishResponse = (PublishResponse) response;
                logger.debug(
                    "Received PublishResponse, requestHandle={}, sequenceNumber={}",
                    publishResponse.getResponseHeader().getRequestHandle(),
                    publishResponse.getNotificationMessage().getSequenceNumber());

                SubscriptionDetails details = subscriptionDetails.get(publishResponse.getSubscriptionId());
                if (details != null) {
                    details.subscription.resetWatchdogTimer();
                }
                processingQueue.execute(() -> processPublishResponse(publishResponse, pendingCount));
            } else {
                StatusCode statusCode = UaException.extract((Throwable) ex)
                    .map(UaException::getStatusCode)
                    .orElse(StatusCode.BAD);
                decrementPendingCount(pendingCount);

                long code = statusCode.value();
                if (code == BAD_SESSION_CLOSED || code == BAD_SESSION_ID_INVALID) {
                    subscriptionDetails.values().forEach(d -> d.subscription.cancelWatchdogTimer());
                } else if (code != BAD_NO_SUBSCRIPTION && code != BAD_TOO_MANY_PUBLISH_REQUESTS) {
                    maybeSendPublishRequests();
                }
                logger.debug("Publish service failure (requestHandle={}): {}", requestHandle, statusCode, ex);
            }
        }, client.getTransport().getConfig().getExecutor());
    }

    /**
     * Processes one Publish response: detects sequence-number gaps and republishes the missing
     * messages, records the new last sequence number and the acknowledgeable sequence numbers,
     * and finally submits the notification message to the subscription's delivery queue.
     *
     * <p>If the response belongs to a subscription that is no longer tracked, the pending count
     * is decremented and a new round of Publish requests is attempted immediately. Otherwise that
     * happens once delivery of the notification message has completed.
     *
     * @param response the Publish response to process.
     * @param pendingCount the in-flight counter of the session the request was sent on.
     */
    private void processPublishResponse(PublishResponse response, AtomicLong pendingCount) {
        UInteger subscriptionId = response.getSubscriptionId();
        SubscriptionDetails details = subscriptionDetails.get(subscriptionId);
        if (details == null) {
            decrementPendingCount(pendingCount);
            maybeSendPublishRequests();
            return;
        }

        NotificationMessage notificationMessage = response.getNotificationMessage();
        ExtensionObject[] notificationData = notificationMessage.getNotificationData();
        boolean isKeepAlive = notificationData == null || notificationData.length == 0;
        long receivedSequenceNumber = notificationMessage.getSequenceNumber().longValue();
        long expectedSequenceNumber = details.lastSequenceNumber + 1L;

        logger.debug(
            "Processing PublishResponse, subscriptionId={}, isKeepAlive={}, lastSequenceNumber={}, receivedSequenceNumber={}, expectedSequenceNumber={}",
            subscriptionId,
            isKeepAlive,
            details.lastSequenceNumber,
            receivedSequenceNumber,
            expectedSequenceNumber);

        if (receivedSequenceNumber > expectedSequenceNumber) {
            boolean republishSuccess = republishMissedNotifications(
                details, subscriptionId, expectedSequenceNumber, receivedSequenceNumber);
            if (!republishSuccess) {
                details.subscription.notifyNotificationDataLost();
            }
            // Everything before the received message has now been recovered or reported lost.
            // Recording only expectedSequenceNumber here would make a later message re-trigger
            // republish of already handled sequence numbers whenever a keep-alive revealed a gap
            // of more than one message (a keep-alive does not advance lastSequenceNumber below).
            details.lastSequenceNumber = receivedSequenceNumber - 1L;
        }

        if (receivedSequenceNumber == 1L || !isKeepAlive) {
            details.lastSequenceNumber = receivedSequenceNumber;
        }

        UInteger[] availableSequenceNumbers = response.getAvailableSequenceNumbers();
        if (availableSequenceNumbers != null && availableSequenceNumbers.length > 0) {
            synchronized (details.availableAcknowledgements) {
                details.availableAcknowledgements.clear();
                Collections.addAll(details.availableAcknowledgements, availableSequenceNumbers);
            }
        }

        CompletionStage<?> callback = details.subscription.getDeliveryQueue()
            .submit(() -> deliverNotificationMessage(details, notificationMessage));
        if (callback != null) {
            callback.whenCompleteAsync((unit, ex) -> {
                if (ex != null) {
                    logger.warn("Notification delivery threw an unexpected Exception: {}", ex.getMessage(), ex);
                }
                decrementPendingCount(pendingCount);
                maybeSendPublishRequests();
            }, client.getTransport().getConfig().getExecutor());
        }
    }

    /**
     * Calls the Republish service for every sequence number in {@code [fromInclusive, toExclusive)}
     * and queues each recovered notification message for delivery. A failure for one sequence
     * number is logged and does not stop the remaining ones from being requested.
     *
     * @return {@code true} if every republish call succeeded, {@code false} if at least one failed.
     */
    private boolean republishMissedNotifications(
        SubscriptionDetails details, UInteger subscriptionId, long fromInclusive, long toExclusive) {

        boolean success = true;
        for (long sequenceNumber = fromInclusive; sequenceNumber < toExclusive; ++sequenceNumber) {
            try {
                RepublishResponse republishResponse = client.republish(subscriptionId, Unsigned.uint(sequenceNumber));
                NotificationMessage republished = republishResponse.getNotificationMessage();
                details.subscription.getDeliveryQueue()
                    .execute(() -> deliverNotificationMessage(details, republished));
            } catch (UaException e) {
                logger.warn("Republish service failure, sequenceNumber={}", sequenceNumber, e);
                success = false;
            }
        }
        return success;
    }

    /**
     * Decodes the notification data of {@code notificationMessage} and invokes the matching
     * callback on the subscription: keep-alive when there is no data, otherwise data-change,
     * event, or status-change notifications. A status change carrying the timeout status code
     * also removes the subscription from this manager. Unknown notification types are logged.
     */
    private void deliverNotificationMessage(SubscriptionDetails details, NotificationMessage notificationMessage) {
        ExtensionObject[] notificationData = notificationMessage.getNotificationData();
        if (notificationData == null || notificationData.length == 0) {
            details.subscription.notifyKeepAliveReceived();
            return;
        }

        for (ExtensionObject xo : notificationData) {
            UaStructuredType notification = xo.decode(client.getStaticEncodingContext());

            if (notification instanceof DataChangeNotification) {
                MonitoredItemNotification[] monitoredItems =
                    ((DataChangeNotification) notification).getMonitoredItems();
                if (monitoredItems != null && monitoredItems.length > 0) {
                    details.subscription.notifyDataReceived(monitoredItems);
                }
            } else if (notification instanceof EventNotificationList) {
                EventFieldList[] events = ((EventNotificationList) notification).getEvents();
                if (events != null && events.length > 0) {
                    details.subscription.notifyEventsReceived(events);
                }
            } else if (notification instanceof StatusChangeNotification) {
                StatusCode status = ((StatusChangeNotification) notification).getStatus();
                if (status.value() == BAD_TIMEOUT) {
                    details.subscription.getSubscriptionId().ifPresent(subscriptionDetails::remove);
                }
                details.subscription.notifyStatusChanged(status);
            } else {
                logger.warn("Unhandled notification type: {}", notification);
            }
        }
    }

    /**
     * @return 0 when no subscription is tracked; otherwise the smaller of (number of tracked
     *     subscriptions + 1) and the configured maximum number of pending Publish requests.
     */
    private long getMaxPendingPublishes() {
        long maxPendingPublishRequests = client.getConfig().getMaxPendingPublishRequests().longValue();
        if (subscriptionDetails.isEmpty()) {
            return 0L;
        }
        return Math.min((long) (subscriptionDetails.size() + 1), maxPendingPublishRequests);
    }

    /**
     * Computes the timeout hint for a Publish request as
     * {@code maxKeepAlive * maxPendingPublishes * 1.5}, where {@code maxKeepAlive} is the largest of
     * the configured request timeout and each subscription's
     * {@code revisedPublishingInterval * revisedMaxKeepAliveCount}.
     *
     * @return the timeout hint, or 0 when the computed value is infinite or does not fit in an
     *     unsigned 32-bit integer.
     */
    private UInteger getTimeoutHint() {
        double maxKeepAlive = client.getConfig().getRequestTimeout().doubleValue();

        // Snapshot so the loop sees a stable set even if subscriptions are added or removed.
        List<SubscriptionDetails> subscriptions = List.copyOf(subscriptionDetails.values());
        for (SubscriptionDetails details : subscriptions) {
            Optional<Double> revisedPublishingInterval = details.subscription.getRevisedPublishingInterval();
            Optional<UInteger> revisedMaxKeepAliveCount = details.subscription.getRevisedMaxKeepAliveCount();
            if (revisedPublishingInterval.isPresent() && revisedMaxKeepAliveCount.isPresent()) {
                double keepAlive = revisedPublishingInterval.get() * revisedMaxKeepAliveCount.get().doubleValue();
                if (keepAlive >= maxKeepAlive) {
                    maxKeepAlive = keepAlive;
                }
            }
        }

        long maxPendingPublishes = getMaxPendingPublishes();
        double timeoutHint = maxKeepAlive * (double) maxPendingPublishes * 1.5;
        if (Double.isInfinite(timeoutHint) || timeoutHint > MAX_TIMEOUT_HINT) {
            // The value being returned must be reset here; resetting maxKeepAlive instead would
            // leave the out-of-range hint in place for the conversion below.
            timeoutHint = 0.0;
        }

        logger.debug(
            "getTimeoutHint() maxKeepAlive={} maxPendingPublishes={} timeoutHint={}",
            maxKeepAlive,
            maxPendingPublishes,
            timeoutHint);
        return Unsigned.uint((long) timeoutHint);
    }

    /** Decrements {@code pendingCount} by one without ever letting it drop below zero. */
    private static void decrementPendingCount(AtomicLong pendingCount) {
        pendingCount.getAndUpdate(p -> p > 0L ? p - 1L : 0L);
    }

    /** Mutable per-subscription state kept by the enclosing manager. */
    private static class SubscriptionDetails {
        /** Sequence numbers to acknowledge in the next Publish request; lock on the list to iterate. */
        private final List<UInteger> availableAcknowledgements = Collections.synchronizedList(new ArrayList<>());

        /** Sequence number of the last notification message accounted for; 0 before the first one. */
        private volatile long lastSequenceNumber = 0L;

        private final OpcUaSubscription subscription;

        private SubscriptionDetails(OpcUaSubscription subscription) {
            this.subscription = subscription;
        }
    }
}

