1、mqtt服务器使用emqx
EMQX: The World's #1 Open Source Distributed MQTT Broker
2、下载安装
下载地址:
Download EMQX
选择系统,版本,安装方法
3、springboot连接mqtt服务方法:
引包:
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
自定义配置信息:
spring:#MQTT配置信息mqtt:enable: true#MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开url: tcp://localhost:1883#用户名username: admin#密码password: public#客户端id(不能重复)provider-id: server-provider#MQTT默认的消息推送主题,实际可在调用接口是指定default-topic: topic
配置参数对应的自定义配置类:
package com.gnetek.monitor.api.bean;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @Description mqtt客户端配置信息* @Author Darren Huang* @Date 2024-04-28 13:16*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class GneMqtt {/*** 启用*/private boolean enable;/*** url*/private String url;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 提供者id*/private String providerId;/*** 消费者id*/private String consumerId;/*** 默认主题*/private String defaultTopic;
}
启动连接mqtt服务器配置类
注意,发送消息retained=true表示保留消息,发送后,才有客户端订阅也能收到,如果收到retained的消息后要删除消息,需要再发一个空的消息(payload= new byte[0])到此主题上
package com.gnetek.monitor.api.config;import cn.hutool.json.JSONUtil;
import com.gnetek.monitor.api.bean.GneMqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;/*** @Description 消息生产者客户端配置* @Author Darren Huang* @Date 2024-04-28 13:19*/
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.mqtt.enable", havingValue = "true")
public class MqttProviderClient {@Autowiredprivate GneMqtt gneMqtt;private MqttClient mqttClient;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try{//创建MQTT客户端对象mqttClient = new MqttClient(gneMqtt.getUrl(), gneMqtt.getProviderId(), new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(gneMqtt.getUsername());//设置连接密码options.setPassword(gneMqtt.getPassword().toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔 1.5*30秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(30);// 自动重连 setCallback需要实现 MqttCallbackExtendedoptions.setAutomaticReconnect(true);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
// options.setWill("willTopic", (mqttClient + "与服务器断开连接").getBytes(),0,false);//设置回调mqttClient.setCallback(new MqttProviderCallBack());mqttClient.connect(options);} catch(MqttException e){e.printStackTrace();}}/*** 发布** @param topic 主题* @param message 消息*/public void publish(String topic, Object message) {publish(topic, message, true, 0);}/*** 发布** @param topic 主题* @param message 消息* @param retained 保留* @param qos qos 最多一次 (QoS0) 至少一次 (QoS1) 有且仅有一次 (QoS2)*/public void publish(String topic, Object message, boolean retained, int qos){MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);byte[] payload = JSONUtil.toJsonStr(message).getBytes();mqttMessage.setPayload(payload);//主题的目的地,用于发布/订阅信息MqttTopic mqttTopic = mqttClient.getTopic(topic);//提供一种机制来跟踪消息的传递进度//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。token = mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}@PreDestroypublic void destroy() {try {if (mqttClient.isConnected()) {mqttClient.disconnect();}} catch (MqttException e) {e.printStackTrace();}}
}
mqtt客户端监听:
注意如果你需要与服务器断开连接后重新连接,需要实现MqttCallbackExtended方法,
// 自动重连 setCallback需要实现 MqttCallbackExtended
options.setAutomaticReconnect(true);
package com.gnetek.monitor.api.config;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;/*** @Description 生产者客户端消息回调* @Author Darren Huang* @Date 2024-04-28 13:28*/
@Slf4j
public class MqttProviderCallBack implements MqttCallbackExtended {/*** 连接丢失** @param throwable throwable*/@Overridepublic void connectionLost(Throwable throwable) {log.debug("与服务器断开连接,可重连");throwable.printStackTrace();}/*** 消息到达** @param topic 主题* @param mqttMessage mqtt消息* @throws Exception*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {log.debug("接收消息的主题 : {}", topic);log.debug("接收消息的内容 : {}", new String(mqttMessage.getPayload()));log.debug("接收消息的Qos : {}", mqttMessage.getQos());log.debug("接收消息的retained : {}", mqttMessage.isRetained());}/*** 交付完成** @param iMqttDeliveryToken MQTT交付令牌*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.debug("发送消息成功");}/*** 连接完成** @param reconnect 重新连接* @param serverURI 服务器uri*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {// 可以做订阅主题 if(reconnect) 如果是重连 订阅主题log.debug("与服务器断连接完成 是否是重连={}, serverURI={}", reconnect, serverURI);}
}
4、android连接mqtt服务
参考:Android 使用 Kotlin 连接 MQTT | EMQ (emqx.com)
里面用到了
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
高版本的android需要修改一些代码,它的源码地址是:eclipse/paho.mqtt.android: MQTT Android (github.com)