/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mqtt.outbound;

import java.nio.charset.StandardCharsets;
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.core.MqttComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;

/**
 * Outbound message handler that publishes Spring messages to an MQTT v5 broker
 * through the Eclipse Paho {@link IMqttAsyncClient}.
 *
 * <p>The handler either owns its client (created in {@link #onInit()} from the URL,
 * client id and optional persistence) or borrows one from a {@link ClientManager}.
 * When a client manager is supplied, this handler never connects, disconnects or
 * closes the client in its lifecycle methods.
 *
 * <p>The handler registers itself as the {@link MqttCallback} of a client it owns and
 * translates delivery, disconnect and protocol-error callbacks into application events.
 */
public class Mqttv5PahoMessageHandler
        extends AbstractMqttMessageHandler<IMqttAsyncClient, MqttConnectionOptions>
        implements MqttCallback, MqttComponent<MqttConnectionOptions> {

    private final MqttConnectionOptions connectionOptions;

    private IMqttAsyncClient mqttClient;

    @Nullable
    private MqttClientPersistence persistence;

    private boolean async;

    private boolean asyncEvents;

    private HeaderMapper<MqttProperties> headerMapper = new MqttHeaderMapper();

    /**
     * Create a handler that owns its client and connects to a single server URL.
     * The generated connection options use {@code url} as the only server URI and
     * enable automatic reconnect.
     *
     * @param url the broker URL; must contain text
     * @param clientId the MQTT client id
     */
    public Mqttv5PahoMessageHandler(String url, String clientId) {
        super(url, clientId);
        Assert.hasText(url, "'url' cannot be null or empty");
        this.connectionOptions = new MqttConnectionOptions();
        this.connectionOptions.setServerURIs(new String[] {url});
        this.connectionOptions.setAutomaticReconnect(true);
    }

    /**
     * Create a handler that owns its client and connects with the supplied options.
     * The first entry of the options' server URIs is passed to the superclass as the URL.
     *
     * @param connectionOptions the connection options; must not be null and must
     * provide at least one server URI
     * @param clientId the MQTT client id
     */
    public Mqttv5PahoMessageHandler(MqttConnectionOptions connectionOptions, String clientId) {
        super(obtainServerUrlFromOptions(connectionOptions), clientId);
        this.connectionOptions = connectionOptions;
    }

    /**
     * Create a handler that uses the client provided by a {@link ClientManager}.
     * The connection options are taken from the manager's connection info.
     *
     * @param clientManager the client manager supplying the client
     */
    public Mqttv5PahoMessageHandler(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
        super(clientManager);
        this.connectionOptions = clientManager.getConnectionInfo();
    }

    /**
     * Validate the options and return the first configured server URI.
     *
     * @param connectionOptions the options to inspect
     * @return the first server URI
     */
    private static String obtainServerUrlFromOptions(MqttConnectionOptions connectionOptions) {
        Assert.notNull(connectionOptions, "'connectionOptions' must not be null");
        String[] serverURIs = connectionOptions.getServerURIs();
        Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'");
        return serverURIs[0];
    }

    /**
     * Return the connection options this handler connects with.
     *
     * @return the connection options
     */
    @Override
    public MqttConnectionOptions getConnectionInfo() {
        return this.connectionOptions;
    }

    /**
     * Set the persistence passed to the {@link MqttAsyncClient} created in {@link #onInit()}.
     *
     * @param persistence the client persistence, or {@code null}
     */
    public void setPersistence(@Nullable MqttClientPersistence persistence) {
        this.persistence = persistence;
    }

    /**
     * Set the mapper used to populate {@link MqttProperties} from message headers.
     *
     * @param headerMapper the header mapper; must not be null
     */
    public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
        Assert.notNull(headerMapper, "'headerMapper' must not be null");
        this.headerMapper = headerMapper;
    }

    /**
     * Control whether {@code publish} waits for the publish token to complete.
     * When {@code false} (the field's initial value) the publishing thread blocks up to
     * the completion timeout; when {@code true} it does not wait.
     *
     * @param async {@code true} to publish without waiting for completion
     */
    public void setAsync(boolean async) {
        this.async = async;
    }

    /**
     * Control whether sent/delivered events are published in async mode.
     * Events are only emitted when {@code async} is also {@code true} and an
     * application event publisher is available.
     *
     * @param asyncEvents {@code true} to publish events for async sends
     */
    public void setAsyncEvents(boolean asyncEvents) {
        this.asyncEvents = asyncEvents;
    }

    /**
     * Create the owned client (when no client manager is configured) and resolve the
     * message converter.
     *
     * @throws BeanCreationException if the {@link MqttAsyncClient} cannot be created
     */
    @Override
    protected void onInit() {
        super.onInit();
        try {
            if (getClientManager() == null) {
                this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
                this.mqttClient.setCallback(this);
                incrementClientInstance();
            }
        }
        catch (MqttException ex) {
            throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
        }

        if (getConverter() == null) {
            // Fall back to the converter bean registered under this name in the bean factory.
            setConverter(getBeanFactory()
                    .getBean("integrationArgumentResolverMessageConverter", MessageConverter.class));
        }
        else {
            Assert.state(!(getConverter() instanceof MqttMessageConverter),
                    "MessageConverter must not be an MqttMessageConverter");
        }
    }

    /**
     * Obtain the client from the client manager, or connect the owned client and wait
     * for the connection up to the completion timeout. A connection failure is logged,
     * not rethrown.
     */
    @Override
    protected void doStart() {
        try {
            ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager = getClientManager();
            if (clientManager != null) {
                this.mqttClient = clientManager.getClient();
            }
            else {
                this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
            }
        }
        catch (MqttException ex) {
            this.logger.error(ex, "MQTT client failed to connect.");
        }
    }

    /**
     * Disconnect the owned client, waiting up to the disconnect completion timeout.
     * Does nothing when a client manager is configured or the client was never created.
     * A disconnect failure is logged, not rethrown.
     */
    @Override
    protected void doStop() {
        IMqttAsyncClient client = this.mqttClient;
        try {
            if (getClientManager() == null && client != null) {
                client.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
            }
        }
        catch (MqttException ex) {
            this.logger.error(ex, "Failed to disconnect 'MqttAsyncClient'");
        }
    }

    /**
     * Close the owned client after the superclass has been destroyed.
     * Does nothing when a client manager is configured or the client was never created.
     * A close failure is logged, not rethrown.
     */
    @Override
    public void destroy() {
        super.destroy();
        IMqttAsyncClient client = this.mqttClient;
        try {
            if (getClientManager() == null && client != null) {
                client.close(true);
            }
        }
        catch (MqttException ex) {
            this.logger.error(ex, "Failed to close 'MqttAsyncClient'");
        }
    }

    /**
     * Publish the message: a payload that already is an {@link MqttMessage} is sent as is,
     * anything else is converted via {@link #buildMqttMessage(Message)}.
     *
     * @param message the message to publish
     */
    @Override
    protected void handleMessageInternal(Message<?> message) {
        Object payload = message.getPayload();
        MqttMessage mqttMessage;
        if (payload instanceof MqttMessage) {
            mqttMessage = (MqttMessage) payload;
        }
        else {
            mqttMessage = buildMqttMessage(message);
        }
        publish(obtainTopicToPublish(message), mqttMessage, message);
    }

    /**
     * Resolve the target topic from the topic processor, falling back to the default topic.
     *
     * @param message the message being published
     * @return the topic, never null
     * @throws IllegalStateException if neither the processor nor the default yields a topic
     */
    private String obtainTopicToPublish(Message<?> message) {
        String topic = getTopicProcessor().processMessage(message);
        if (topic == null) {
            topic = getDefaultTopic();
        }
        String resolvedTopic = topic;
        Assert.state(resolvedTopic != null,
                () -> "No topic could be determined from the '" + message + "' and no default topic defined");
        return resolvedTopic;
    }

    /**
     * Build an {@link MqttMessage} from a Spring message.
     * {@code byte[]} payloads are used directly, {@code String} payloads are encoded as
     * UTF-8 and every other payload goes through the configured converter. QoS and the
     * retained flag come from their processors (or the defaults when the processor
     * returns null); MQTT properties are mapped from the message headers.
     *
     * @param message the message to convert
     * @return the MQTT message to publish
     * @throws IllegalStateException if the converter returns null
     */
    private MqttMessage buildMqttMessage(Message<?> message) {
        Object payload = message.getPayload();
        byte[] body;
        if (payload instanceof byte[]) {
            body = (byte[]) payload;
        }
        else if (payload instanceof String) {
            body = ((String) payload).getBytes(StandardCharsets.UTF_8);
        }
        else {
            MessageConverter converter = getConverter();
            body = (byte[]) converter.fromMessage(message, byte[].class);
            Assert.state(body != null,
                    () -> "The MQTT payload cannot be null. The '" + converter + "' returned null for: " + message);
        }

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(body);

        Integer qos = getQosProcessor().processMessage(message);
        mqttMessage.setQos(qos == null ? getDefaultQos() : qos);

        Boolean retained = getRetainedProcessor().processMessage(message);
        mqttMessage.setRetained(retained == null ? getDefaultRetained() : retained);

        MqttProperties properties = new MqttProperties();
        this.headerMapper.fromHeaders(message.getHeaders(), properties);
        mqttMessage.setProperties(properties);
        return mqttMessage;
    }

    /**
     * Publish an MQTT message to the given topic, reconnecting first if the client
     * reports it is not connected.
     *
     * <p>In synchronous mode the call waits for the publish token up to the completion
     * timeout. In async mode with async events enabled, an {@link MqttMessageSentEvent}
     * is published instead.
     *
     * <p>NOTE(review): the connected-check and the reconnect are not synchronized, so
     * concurrent publishers may each attempt to connect; confirm whether callers
     * serialize access before relying on this from multiple threads.
     *
     * @param topic the topic to publish to
     * @param mqttMessage the message; must be an {@link MqttMessage}
     * @param message the originating Spring message
     * @throws MessageHandlingException if connecting or publishing fails
     */
    @Override
    protected void publish(String topic, Object mqttMessage, Message<?> message) {
        Assert.isInstanceOf(MqttMessage.class, mqttMessage,
                "The 'mqttMessage' must be an instance of 'MqttMessage'");
        long completionTimeout = getCompletionTimeout();
        try {
            if (!this.mqttClient.isConnected()) {
                this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
            }
            IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
            ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
            if (!this.async) {
                token.waitForCompletion(completionTimeout);
            }
            else if (this.asyncEvents && applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(
                        new MqttMessageSentEvent(this, message, topic, token.getMessageId(),
                                getClientId(), getClientInstance()));
            }
        }
        catch (MqttException ex) {
            throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + "]", ex);
        }
    }

    /**
     * Publish an {@link MqttMessageDeliveredEvent} for the token when async mode and
     * async events are both enabled and an event publisher is available.
     *
     * @param token the completed delivery token
     */
    private void sendDeliveryComplete(IMqttToken token) {
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (this.async && this.asyncEvents && applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(
                    new MqttMessageDeliveredEvent(this, token.getMessageId(),
                            getClientId(), getClientInstance()));
        }
    }

    /**
     * Paho callback for a completed delivery; delegates to the delivered-event logic.
     *
     * @param token the completed delivery token
     */
    @Override
    public void deliveryComplete(IMqttToken token) {
        sendDeliveryComplete(token);
    }

    /**
     * Paho callback for a disconnect; publishes an {@link MqttConnectionFailedEvent}
     * carrying the response's exception when an event publisher is available.
     *
     * @param disconnectResponse the disconnect details
     */
    @Override
    public void disconnected(MqttDisconnectResponse disconnectResponse) {
        MqttException cause = disconnectResponse.getException();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
        }
    }

    /**
     * Paho callback for an MQTT error; publishes an {@link MqttProtocolErrorEvent}
     * when an event publisher is available.
     *
     * @param exception the reported error
     */
    @Override
    public void mqttErrorOccurred(MqttException exception) {
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new MqttProtocolErrorEvent(this, exception));
        }
    }

    /**
     * Paho callback for an incoming message; intentionally a no-op in this handler.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
    }

    /**
     * Paho callback for a completed (re)connection; intentionally a no-op in this handler.
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
    }

    /**
     * Paho callback for an incoming AUTH packet; intentionally a no-op in this handler.
     */
    @Override
    public void authPacketArrived(int reasonCode, MqttProperties properties) {
    }
}

