/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.tahu.host.seq;

import java.util.Calendar;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.SparkplugParsingException;
import org.eclipse.tahu.host.CommandPublisher;
import org.eclipse.tahu.host.TahuHostCallback;
import org.eclipse.tahu.host.TahuPayloadHandler;
import org.eclipse.tahu.host.api.HostApplicationEventHandler;
import org.eclipse.tahu.host.manager.EdgeNodeManager;
import org.eclipse.tahu.host.manager.SparkplugEdgeNode;
import org.eclipse.tahu.host.model.HostApplicationMetricMap;
import org.eclipse.tahu.host.seq.SequenceReorderContext;
import org.eclipse.tahu.host.seq.SequenceReorderMap;
import org.eclipse.tahu.message.PayloadDecoder;
import org.eclipse.tahu.message.SparkplugBPayloadDecoder;
import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
import org.eclipse.tahu.message.model.MessageType;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.Topic;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceReorderManager {
    private static Logger logger = LoggerFactory.getLogger((String)SequenceReorderManager.class.getName());
    private static SequenceReorderManager instance;
    private static final long SEQUENCE_MONITOR_TIMER = 1000L;
    private final Map<EdgeNodeDescriptor, SequenceReorderMap> edgeNodeMap;
    private final Object edgeNodeMapLock = new Object();
    private Timer timer;
    private HostApplicationEventHandler eventHandler;
    private CommandPublisher commandPublisher;
    private PayloadDecoder<SparkplugBPayload> payloadDecoder;
    private Long timeout;

    private SequenceReorderManager() {
        this.edgeNodeMap = new ConcurrentHashMap<EdgeNodeDescriptor, SequenceReorderMap>();
    }

    public static SequenceReorderManager getInstance() {
        if (instance == null) {
            instance = new SequenceReorderManager();
        }
        return instance;
    }

    public void init(HostApplicationEventHandler eventHandler, CommandPublisher commandPublisher, PayloadDecoder<SparkplugBPayload> payloadDecoder, Long timeout) {
        if (eventHandler != null && timeout != null) {
            SequenceReorderManager.instance.eventHandler = eventHandler;
            SequenceReorderManager.instance.commandPublisher = commandPublisher;
            SequenceReorderManager.instance.payloadDecoder = payloadDecoder;
            SequenceReorderManager.instance.timeout = timeout;
        } else {
            logger.error("Not re-initializing the SequenceReorderManager timer");
        }
    }

    public void start() {
        TimerTask monitorTask = new TimerTask(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = SequenceReorderManager.this.edgeNodeMapLock;
                synchronized (object) {
                    SequenceReorderManager.this.edgeNodeMap.values().forEach(sequenceReorderMap -> {
                        try {
                            if (!sequenceReorderMap.isEmpty()) {
                                Calendar calendar = Calendar.getInstance();
                                calendar.add(14, (int)(SequenceReorderManager.this.timeout * -1L));
                                if (sequenceReorderMap.getLastUpdateTime().before(calendar.getTime())) {
                                    logger.info("Timeout while reording sequence numbers on {} with {} in queue", (Object)sequenceReorderMap.getEdgeNodeDescriptor(), (Object)sequenceReorderMap.size());
                                    SequenceReorderContext sequenceReorderContext = sequenceReorderMap.getExpiredSequenceReorderContext(SequenceReorderManager.this.timeout);
                                    if (sequenceReorderContext != null) {
                                        TahuPayloadHandler handler = new TahuPayloadHandler(SequenceReorderManager.this.eventHandler, SequenceReorderManager.this.commandPublisher, SequenceReorderManager.this.payloadDecoder);
                                        SparkplugEdgeNode edgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(sequenceReorderMap.getEdgeNodeDescriptor());
                                        sequenceReorderMap.reset();
                                        if (edgeNode != null) {
                                            logger.info("Requesting a rebirth from known edge node {}", (Object)sequenceReorderMap.getEdgeNodeDescriptor());
                                            edgeNode.setHostAppMqttClientId(sequenceReorderContext.getHostAppMqttClientId());
                                            edgeNode.setMqttServerName(sequenceReorderContext.getMqttServerName());
                                            handler.requestRebirth(sequenceReorderContext.getMqttServerName(), sequenceReorderContext.getHostAppMqttClientId(), sequenceReorderMap.getEdgeNodeDescriptor(), edgeNode);
                                        } else {
                                            logger.info("Requesting a rebirth from unknown edge node {}", (Object)sequenceReorderMap.getEdgeNodeDescriptor());
                                            handler.requestRebirth(sequenceReorderContext.getMqttServerName(), sequenceReorderContext.getHostAppMqttClientId(), sequenceReorderMap.getEdgeNodeDescriptor());
                                        }
                                    }
                                }
                            }
                        }
                        catch (Exception e) {
                            logger.error("Failed to handle reorder entry in monitor", (Throwable)e);
                        }
                    });
                }
            }
        };
        this.timer = new Timer("SequenceMonitorTimer");
        this.timer.scheduleAtFixedRate(monitorTask, 1000L, 1000L);
    }

    public void stop() {
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handlePayload(TahuHostCallback tahuHostCallback, ThreadPoolExecutor executor, String topicString, String[] splitTopic, MqttMessage message, MqttServerName mqttServerName, MqttClientId mqttClientId, long arrivedTime) throws Exception {
        Topic topic;
        try {
            topic = TopicUtil.parseTopic(splitTopic);
        }
        catch (SparkplugParsingException e) {
            logger.error("Error parsing topic", (Throwable)e);
            return;
        }
        MessageType messageType = topic.getType();
        if (messageType == MessageType.NCMD || messageType == MessageType.DCMD) {
            return;
        }
        SparkplugBPayloadDecoder decoder = new SparkplugBPayloadDecoder();
        SparkplugBPayload payload = (SparkplugBPayload)decoder.buildFromByteArray(message.getPayload(), HostApplicationMetricMap.getInstance().getMetricDataTypeMap(topic.getEdgeNodeDescriptor(), topic.getSparkplugDescriptor()));
        logger.trace("Incoming payload: {}", (Object)payload);
        Object object = this.edgeNodeMapLock;
        synchronized (object) {
            EdgeNodeDescriptor edgeNodeDescriptor = new EdgeNodeDescriptor(topic.getGroupId(), topic.getEdgeNodeId());
            SequenceReorderMap sequenceReorderMap = this.edgeNodeMap.computeIfAbsent(edgeNodeDescriptor, k -> new SequenceReorderMap(edgeNodeDescriptor));
            if (topic.isType(MessageType.NBIRTH)) {
                logger.debug("Resetting sequenceReorderMap on NBIRTH for {}", (Object)edgeNodeDescriptor);
                sequenceReorderMap.resetSeqNum();
            } else {
                if (topic.isType(MessageType.NDEATH)) {
                    this.handleMessage(tahuHostCallback, executor, new SequenceReorderContext(topicString, topic, message, payload, messageType, mqttServerName, mqttClientId, arrivedTime));
                    return;
                }
                if (topic.isType(MessageType.NCMD) || topic.isType(MessageType.DCMD)) {
                    return;
                }
            }
            boolean passedSeqNumCheck = false;
            if (payload == null || payload.getSeq() == null) {
                logger.warn("Invalid payload arrived on topic={} with {}", (Object)topic, (Object)(payload == null ? "'payload is null'" : (payload.getSeq() == null ? "'payload sequence number is null'" : "sequence number is present - shouldn't have gotten here")));
            } else {
                passedSeqNumCheck = sequenceReorderMap.liveSeqNumCheck(payload.getSeq());
            }
            if (passedSeqNumCheck) {
                if (topic.isType(MessageType.NBIRTH)) {
                    sequenceReorderMap.prune(payload.getTimestamp());
                }
                logger.debug("Handling real time message on {} with seqNum={}", (Object)topicString, (Object)payload.getSeq());
                this.handleMessage(tahuHostCallback, executor, new SequenceReorderContext(topicString, topic, message, payload, messageType, mqttServerName, mqttClientId, arrivedTime));
                if (!sequenceReorderMap.isEmpty()) {
                    boolean done = false;
                    long nextSeqNum = this.getNextSeqNum(payload.getSeq());
                    while (!done && !sequenceReorderMap.isEmpty()) {
                        SequenceReorderContext sequenceReorderContext = sequenceReorderMap.storedSeqNumCheck(nextSeqNum);
                        if (sequenceReorderContext != null) {
                            logger.debug("Handling stored message on {} with seqNum={}", (Object)topicString, (Object)nextSeqNum);
                            this.handleMessage(tahuHostCallback, executor, new SequenceReorderContext(sequenceReorderContext.getTopicString(), sequenceReorderContext.getTopic(), sequenceReorderContext.getMessage(), sequenceReorderContext.getPayload(), sequenceReorderContext.getMessageType(), sequenceReorderContext.getMqttServerName(), sequenceReorderContext.getHostAppMqttClientId(), sequenceReorderContext.getArrivedTime()));
                            nextSeqNum = this.getNextSeqNum(nextSeqNum);
                            continue;
                        }
                        logger.debug("Failed to find SequenceReorderContext for {} - moving on", (Object)nextSeqNum);
                        done = true;
                    }
                }
            } else {
                logger.debug("Storing message on {} due to out of sequence message with seqNum={} - was expecting {}", new Object[]{topicString, payload.getSeq(), sequenceReorderMap.getNextExpectedSeqNum()});
                SequenceReorderContext sequenceReorderContext = new SequenceReorderContext(topicString, topic, message, payload, messageType, mqttServerName, mqttClientId, arrivedTime);
                sequenceReorderMap.put(payload.getSeq(), sequenceReorderContext);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeEdgeNode(EdgeNodeDescriptor edgeNodeDescriptor) {
        Object object = this.edgeNodeMapLock;
        synchronized (object) {
            this.edgeNodeMap.remove(edgeNodeDescriptor);
        }
    }

    private long getNextSeqNum(long currentSeqNum) {
        long nextSeqNum = currentSeqNum + 1L;
        if (nextSeqNum == 256L) {
            nextSeqNum = 0L;
        }
        return nextSeqNum;
    }

    private void handleMessage(TahuHostCallback tahuHostCallback, ThreadPoolExecutor executor, SequenceReorderContext sequenceReorderContext) {
        executor.execute(() -> {
            try {
                new TahuPayloadHandler(this.eventHandler, this.commandPublisher, this.payloadDecoder).handlePayload(sequenceReorderContext.getTopicString(), sequenceReorderContext.getSplitTopic(), sequenceReorderContext.getMessage(), sequenceReorderContext.getMqttServerName(), sequenceReorderContext.getHostAppMqttClientId());
            }
            catch (Throwable t) {
                logger.error("Failed to handle Sparkplug B message on topic {} - requesting rebirth", (Object)sequenceReorderContext.getTopic(), (Object)t);
                new TahuPayloadHandler(this.eventHandler, this.commandPublisher, this.payloadDecoder).requestRebirth(sequenceReorderContext.getMqttServerName(), sequenceReorderContext.getHostAppMqttClientId(), sequenceReorderContext.getTopic().getEdgeNodeDescriptor());
            }
            finally {
                long latency = System.nanoTime() - sequenceReorderContext.getArrivedTime();
                if (logger.isTraceEnabled()) {
                    logger.trace("Updating message processing latency {}", (Object)latency);
                }
            }
        });
    }
}

