/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.tahu.host;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.host.CommandPublisher;
import org.eclipse.tahu.host.TahuPayloadHandler;
import org.eclipse.tahu.host.api.MultiHostApplicationEventHandler;
import org.eclipse.tahu.host.seq.SequenceReorderManager;
import org.eclipse.tahu.message.PayloadDecoder;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.StatePayload;
import org.eclipse.tahu.mqtt.ClientCallback;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.mqtt.MqttServerUrl;
import org.eclipse.tahu.mqtt.TahuClient;
import org.eclipse.tahu.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClientCallback} implementation used by the Tahu host application.
 *
 * <p>Incoming messages whose topic starts with {@code spBv1.0} are handled in one of two ways:
 * <ul>
 * <li>three-element {@code spBv1.0/STATE/<hostId>} topics are parsed as a {@link StatePayload} and, when
 * they refer to this host and contradict {@code onlineState}, corrected by re-publishing the birth or
 * LWT message through the owning {@link TahuClient};</li>
 * <li>every other Sparkplug topic is queued on one of a fixed set of single-threaded executors, selected
 * by hashing the second and fourth topic elements, so that messages sharing that key are processed in
 * arrival order on the same thread.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code tahuClients} and {@code onlineState} are plain fields written by setters and
 * read from the callback methods - confirm how callers synchronize access.
 */
public class TahuHostCallback
implements ClientCallback {
    private static final Logger logger = LoggerFactory.getLogger(TahuHostCallback.class.getName());

    /** Number of single-threaded executors that Sparkplug messages are partitioned across. */
    private static final int DEFAULT_NUM_OF_THREADS = 100;

    /** Prefix shared by every topic this callback treats as a Sparkplug B topic. */
    private static final String SPARKPLUG_NAMESPACE = "spBv1.0";

    /** Second topic element that marks a host application STATE message. */
    private static final String STATE_TOKEN = "STATE";

    /** Prefix of a STATE topic that is followed by a host ID. */
    private static final String STATE_TOPIC_PREFIX = "spBv1.0/STATE/";

    /** Shared JSON mapper for STATE payloads; it is never reconfigured after construction. */
    private static final ObjectMapper STATE_MAPPER = new ObjectMapper();

    private final ThreadPoolExecutor[] sparkplugBExecutors;
    private Map<MqttServerName, TahuClient> tahuClients;
    private final boolean enableSequenceReordering;
    private final MultiHostApplicationEventHandler eventHandler;
    private final CommandPublisher commandPublisher;
    private final SequenceReorderManager sequenceReorderManager;
    private final PayloadDecoder<SparkplugBPayload> payloadDecoder;
    private final String hostId;
    private boolean onlineState;

    /**
     * Creates the callback and its pool of single-threaded executors.
     *
     * @param eventHandler receives connect/disconnect notifications and is handed to each payload handler
     * @param commandPublisher handed to each payload handler
     * @param sequenceReorderManager when non-null it is started here and all non-STATE Sparkplug messages
     *        are routed through it; when null messages go straight to a {@link TahuPayloadHandler}
     * @param payloadDecoder handed to each payload handler
     * @param hostId this host's ID; when null or blank incoming STATE messages are never corrected
     * @param onlineState the state this host currently advertises
     */
    public TahuHostCallback(MultiHostApplicationEventHandler eventHandler, CommandPublisher commandPublisher, SequenceReorderManager sequenceReorderManager, PayloadDecoder<SparkplugBPayload> payloadDecoder, String hostId, boolean onlineState) {
        this.eventHandler = eventHandler;
        this.commandPublisher = commandPublisher;
        this.enableSequenceReordering = sequenceReorderManager != null;
        this.sequenceReorderManager = sequenceReorderManager;
        if (this.enableSequenceReordering) {
            this.sequenceReorderManager.start();
        }
        this.payloadDecoder = payloadDecoder;
        this.hostId = hostId;
        this.onlineState = onlineState;
        this.sparkplugBExecutors = new ThreadPoolExecutor[DEFAULT_NUM_OF_THREADS];
        for (int i = 0; i < this.sparkplugBExecutors.length; ++i) {
            String uuid = UUID.randomUUID().toString().substring(0, 8);
            // Thread name format (including the doubled dash) is kept as-is for log compatibility.
            final String threadName = String.format("%s-%s", "TahuHostCallback-", uuid);
            ThreadFactory threadFactory = r -> new Thread(r, threadName);
            this.sparkplugBExecutors[i] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        }
    }

    /**
     * Calls {@code shutdownNow()} on every executor. A failure on one executor is logged and does not
     * prevent the remaining executors from being shut down.
     */
    @Override
    public void shutdown() {
        logger.info("Shutting down TahuHostCallback");
        for (ThreadPoolExecutor executor : this.sparkplugBExecutors) {
            try {
                executor.shutdownNow();
            }
            catch (Exception e) {
                logger.error("Failed to shutdown executor", e);
            }
        }
    }

    /**
     * Sets the clients, keyed by server name, that this callback looks up when events arrive.
     *
     * @param tahuClients map from MQTT server name to its client
     */
    public void setMqttClients(Map<MqttServerName, TahuClient> tahuClients) {
        this.tahuClients = tahuClients;
    }

    /**
     * Updates the state that incoming STATE messages for this host are compared against.
     *
     * @param onlineState {@code true} if this host should be advertised as online
     */
    public void setOnlineState(boolean onlineState) {
        this.onlineState = onlineState;
    }

    /**
     * Routes an incoming MQTT message. No exception escapes this method; every failure is logged.
     *
     * @param server name of the MQTT server the message arrived from; used to look up the client
     * @param url URL of that server (not used here)
     * @param clientId ID of the receiving MQTT client
     * @param topic topic the message was published on; a null topic is logged and ignored
     * @param message the MQTT message
     */
    @Override
    public void messageArrived(MqttServerName server, MqttServerUrl url, MqttClientId clientId, String topic, MqttMessage message) {
        try {
            Map<MqttServerName, TahuClient> clients = this.tahuClients;
            if (clients == null) {
                logger.error("Message arrived on topic {} from client {} on {} before any MQTT clients were set", topic, clientId, server);
                return;
            }
            TahuClient client = clients.get(server);
            if (client == null) {
                logger.error("Message arrived on topic {} from unknown client {} on {}", topic, clientId, server);
                for (MqttServerName knownServer : clients.keySet()) {
                    logger.error("Failed - but found: {}", knownServer);
                }
                return;
            }
            logger.trace("Message arrived on topic {} from client {}", topic, clientId);
            if (topic == null) {
                logger.error("Invalid null topic");
                return;
            }
            String[] splitTopic = TopicUtil.getSplitTopic(topic);
            long arrivedTime = System.nanoTime();
            if (!topic.startsWith(SPARKPLUG_NAMESPACE)) {
                logger.debug("Received non-Sparkplug message on topic {}", topic);
                return;
            }
            if (splitTopic.length == 3 && STATE_TOKEN.equals(splitTopic[1])) {
                this.handleStateMessage(client, splitTopic[2], message);
            } else if (splitTopic.length < 4) {
                // The executor key below needs elements 1 and 3; report short topics explicitly
                // instead of failing with an ArrayIndexOutOfBoundsException.
                logger.error("Ignoring malformed Sparkplug topic {} - expected at least 4 elements but found {}", topic, splitTopic.length);
            } else {
                this.dispatchSparkplugMessage(topic, splitTopic, message, server, clientId, arrivedTime);
            }
        }
        catch (Throwable t) {
            logger.error("Failed to handle message on topic {}", topic, t);
        }
    }

    /**
     * Parses a STATE message and, if it refers to this host and disagrees with {@code onlineState},
     * publishes a correcting birth or LWT message.
     *
     * <p>Declared {@code throws Exception} because the single caller funnels every failure into one
     * catch-all handler.
     */
    private void handleStateMessage(TahuClient client, String stateHostId, MqttMessage message) throws Exception {
        // Pass the raw bytes so decoding does not depend on the JVM's default charset.
        StatePayload statePayload = STATE_MAPPER.readValue(message.getPayload(), StatePayload.class);
        if (this.hostId == null || this.hostId.trim().isEmpty() || !this.hostId.equals(stateHostId)) {
            return;
        }
        Boolean reportedOnline = statePayload == null ? null : statePayload.isOnline();
        if (reportedOnline == null) {
            logger.warn("Ignoring STATE message from {} that carries no online flag", stateHostId);
            return;
        }
        if (!reportedOnline.booleanValue() && this.onlineState) {
            logger.info("This is a offline STATE message from {} - correcting with new online STATE message", stateHostId);
            client.publishBirthMessage();
        } else if (reportedOnline.booleanValue() && !this.onlineState) {
            logger.info("This is a online STATE message from {} - correcting with new offline STATE message", stateHostId);
            client.publishLwt(true);
        }
    }

    /**
     * Queues a non-STATE Sparkplug message on the executor selected from topic elements 1 and 3, either
     * via the {@link SequenceReorderManager} or directly as a {@link TahuPayloadHandler} task.
     *
     * <p>Declared {@code throws Exception} because the single caller funnels every failure into one
     * catch-all handler. The caller guarantees {@code splitTopic.length >= 4}.
     */
    private void dispatchSparkplugMessage(String topic, String[] splitTopic, MqttMessage message, MqttServerName server, MqttClientId clientId, long arrivedTime) throws Exception {
        String key = splitTopic[1] + "/" + splitTopic[3];
        int index = this.getThreadPoolExecutorIndex(key, this.sparkplugBExecutors.length);
        ThreadPoolExecutor executor = this.sparkplugBExecutors[index];
        logger.debug("Adding Sparkplug B message to ThreadPoolExecutor {} :: {}", index, executor.getQueue().size());
        if (this.enableSequenceReordering) {
            logger.trace("Sending the message on {} to the SequenceReorderManager", topic);
            this.sequenceReorderManager.handlePayload(this, executor, topic, splitTopic, message, server, clientId, arrivedTime);
            return;
        }
        executor.execute(() -> {
            try {
                logger.trace("Sending the message on {} directly to the TahuPayloadHandler", topic);
                new TahuPayloadHandler(this.eventHandler, this.commandPublisher, this.payloadDecoder).handlePayload(topic, splitTopic, message, server, clientId);
            }
            catch (Throwable t) {
                logger.error("Failed to handle Sparkplug B message on topic {}", topic, t);
            }
            finally {
                if (logger.isTraceEnabled()) {
                    long latency = System.nanoTime() - arrivedTime;
                    logger.trace("Updating message processing latency {}", latency);
                }
            }
        });
    }

    /**
     * Maps a key onto an executor index in {@code [0, numOfThreadPoolExecutors)}.
     *
     * <p>The remainder is taken before the absolute value, so the result cannot overflow even when
     * {@code hashCode()} is {@code Integer.MIN_VALUE}.
     */
    private int getThreadPoolExecutorIndex(String key, int numOfThreadPoolExecutors) {
        return Math.abs(key.hashCode() % numOfThreadPoolExecutors);
    }

    /**
     * Notifies the event handler of the disconnect, clears the connection count of the affected client
     * and reconnects it when auto-reconnect is enabled.
     *
     * @param mqttServerName name of the server whose connection was lost
     * @param url URL of that server
     * @param clientId ID of the MQTT client that lost its connection
     * @param cause reason for the loss, may be null
     */
    @Override
    public void connectionLost(MqttServerName mqttServerName, MqttServerUrl url, MqttClientId clientId, Throwable cause) {
        logger.warn("Connection Lost to - {} :: {} :: {}", mqttServerName, url, clientId);
        this.eventHandler.onDisconnect(mqttServerName);
        if (cause != null) {
            logger.error("Connection lost due to - {}", cause.getMessage(), cause);
        }
        Map<MqttServerName, TahuClient> clients = this.tahuClients;
        TahuClient tahuClient = clients == null ? null : clients.get(mqttServerName);
        if (tahuClient == null) {
            // Without a client there is nothing to clear or reconnect; avoid a NullPointerException.
            logger.error("Connection lost for unknown MQTT Server {} - unable to clear connection counts or reconnect", mqttServerName);
            return;
        }
        logger.info("Clear out all connection counts to this MQTT Server");
        tahuClient.clearConnectionCount();
        String lwtTopic = tahuClient.getLwtTopic();
        // Require the trailing '/' so a bare "spBv1.0/STATE" topic cannot push the substring
        // start index past the end of the string.
        if (lwtTopic != null && lwtTopic.startsWith(STATE_TOPIC_PREFIX)) {
            String primaryHostId = lwtTopic.substring(STATE_TOPIC_PREFIX.length());
            logger.debug("Setting Primary Host ID info tag for {} to offline", primaryHostId);
        }
        if (tahuClient.getAutoReconnect()) {
            tahuClient.connect();
        }
    }

    /**
     * Forwards the connect notification to the event handler.
     *
     * @param reconnect whether this was a reconnect (not used here)
     * @param server name of the server that was connected to
     * @param url URL of that server (not used here)
     * @param clientId ID of the connected MQTT client (not used here)
     */
    @Override
    public void connectComplete(boolean reconnect, MqttServerName server, MqttServerUrl url, MqttClientId clientId) {
        this.eventHandler.onConnect(server);
    }

    /** Intentionally empty; nothing in this class calls it. */
    private void updateEngineInfoDateTag(MqttServerName server, String tagName) {
    }
}

