SpringBoot整合MQTT最详细版(亲测有效)

embedded/2025/4/1 4:02:01/
一、导入pom.xml依赖
 <!--mqtt依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
二、配置MQTT相关信息到application.yml
spring:mqtt:username: 你的账号                           # 账号password: 你的密码                        # 密码hostUrl: tcp://127.0.0.1:1883           # mqtt连接tcp地址clientid: ${random.value}                # 客户端Id,不能相同,采用随机数 ${random.value}default-topic: /testtopic/#                      # 默认主题timeout: 3000                           # 超时时间keepalive: 600                            # 保持连接subscribeFlag: true                     #是否进行订阅true或者falseenabled: true                 # 是否使用mqtt功能

在这里插入图片描述

javaMqttConfigjavaMqttInitjavaMqttPushClientjavaPushCallbackjavahttpsiblogcsdnimgcndirect65e3bb81aee44d6c89bc3b817cf47158png_43">三、在项目中创建mqtt文件夹后添加AjaxResult.java、MqttConfig.java、MqttInit.java、MqttPushClient.java、PushCallback.java,如图:在这里插入图片描述

AjaxResult.java代码

public class AjaxResult extends HashMap<String, Object>
{private static final long serialVersionUID = 1L;/** 状态码 */public static final String CODE_TAG = "code";/** 返回内容 */public static final String MSG_TAG = "msg";/** 数据对象 */public static final String DATA_TAG = "data";/*** 初始化一个新创建的 AjaxResult 对象,使其表示一个空消息。*/public AjaxResult(){}/*** 初始化一个新创建的 AjaxResult 对象* * @param code 状态码* @param msg 返回内容*/public AjaxResult(int code, String msg){super.put(CODE_TAG, code);super.put(MSG_TAG, msg);}/*** 初始化一个新创建的 AjaxResult 对象* * @param code 状态码* @param msg 返回内容* @param data 数据对象*/public AjaxResult(int code, String msg, Object data){super.put(CODE_TAG, code);super.put(MSG_TAG, msg);if (StringUtils.isNotNull(data)){super.put(DATA_TAG, data);}}/*** 返回成功消息* * @return 成功消息*/public static AjaxResult success(){return AjaxResult.success("操作成功");}/*** 返回成功数据* * @return 成功消息*/public static AjaxResult success(Object data){return AjaxResult.success("操作成功", data);}/*** 返回成功消息* * @param msg 返回内容* @return 成功消息*/public static AjaxResult success(String msg){return AjaxResult.success(msg, null);}/*** 返回成功消息* * @param msg 返回内容* @param data 数据对象* @return 成功消息*/public static AjaxResult success(String msg, Object data){return new AjaxResult(HttpStatus.SUCCESS, msg, data);}/*** 返回警告消息** @param msg 返回内容* @return 警告消息*/public static AjaxResult warn(String msg){return AjaxResult.warn(msg, null);}/*** 返回警告消息** @param msg 返回内容* @param data 数据对象* @return 警告消息*/public static AjaxResult warn(String msg, Object data){return new AjaxResult(HttpStatus.WARN, msg, data);}/*** 返回错误消息* * @return 错误消息*/public static AjaxResult error(){return AjaxResult.error("操作失败");}/*** 返回错误消息* * @param msg 返回内容* @return 错误消息*/public static AjaxResult error(String msg){return AjaxResult.error(msg, null);}/*** 返回错误消息* * @param msg 返回内容* @param data 数据对象* @return 错误消息*/public static AjaxResult error(String msg, Object data){return new AjaxResult(HttpStatus.ERROR, msg, data);}/*** 返回错误消息* * @param code 状态码* @param msg 返回内容* @return 错误消息*/public static AjaxResult error(int code, String msg){return new AjaxResult(code, msg, null);}/*** 是否为成功消息** @return 结果*/public boolean isSuccess(){return Objects.equals(HttpStatus.SUCCESS, this.get(CODE_TAG));}/*** 是否为警告消息** @return 结果*/public boolean isWarn(){return Objects.equals(HttpStatus.WARN, this.get(CODE_TAG));}/*** 是否为错误消息** @return 结果*/public boolean isError(){return Objects.equals(HttpStatus.ERROR, this.get(CODE_TAG));}/*** 方便链式调用** @param key 键* @param value 值* @return 数据对象*/@Overridepublic AjaxResult put(String key, Object value){super.put(key, value);return this;}
}

MqttConfig.java代码:

@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** mqtt功能使能*/private boolean enabled;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}//创建MQTT客户端public MqttPushClient getMqttPushClient() {if(enabled == true){String mqtt_topic[] = StringUtils.split(defaultTopic, ",");System.out.println("开始连接======================="+clientId);//连接mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);System.out.println("开始订阅=======================");for(int i=0; i<mqtt_topic.length; i++){//订阅主题mqttPushClient.subscribe(mqtt_topic[i], 1);}}return mqttPushClient;}}

MqttInit.java代码

@Component
public class MqttInit implements ApplicationRunner {@Autowiredprivate MqttConfig mqttConfig;/*** 初始化客户端用于接收生产者发过来的消息,项目运行就会创建好* @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception{mqttConfig.getMqttPushClient();}
}

MqttPushClient.java代码

@Component
@Order(value = 2)
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数* @param callback  是否回调*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {System.out.println("进入连接----------------"+clientID);MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {//设置回调  接收消息时需要回调client.setCallback(pushCallback);client.connect(options);System.out.println("连接成功==================="+clientID);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos         连接方式* QoS 0(最多一次):消息发送后不进行确认,也不重试,是最低的服务质量等级。这种方式可能会导致消息丢失,但传输效率最高。* QoS 1(至少一次):确保消息至少被送达一次。如果发送方没有收到确认,它可能会重试发送消息,这可能导致消息重复。* QoS 2(恰好一次):保证消息准确无误地送达一次,不丢失也不重复,是最高的服务质量等级,但相应地增加了通信的开销。* @param retained    是否保留* @param topic       主题* @param pushMessage 消息体*/public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();return AjaxResult.success();} catch (MqttPersistenceException e)  {e.printStackTrace();return AjaxResult.error();} catch (MqttException e) {e.printStackTrace();return AjaxResult.error();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("开始订阅主题" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}

PushCallback.java代码

@Component
@Order(value = 1)
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {System.out.println("重连======================");// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");boolean flag=true;while(flag) {try {Thread.sleep(5000);if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}flag=false;} catch (Exception e) {e.printStackTrace();}}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));System.out.println("接收消息主题 : " + topic);System.out.println("接收消息Qos : " + mqttMessage.getQos());System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));_topic = topic;_qos = mqttMessage.getQos()+"";_msg = new String(mqttMessage.getPayload());}/*** 参数回调的方法* @param iMqttDeliveryToken*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}//别的Controller层会调用这个方法来  获取  接收到的硬件数据public String receive() {JSONObject jsonObject = new JSONObject();jsonObject.put("topic", _topic);jsonObject.put("qos", _qos);jsonObject.put("msg", _msg);return jsonObject.toString();}}

最后整合结束

四、测试

创建mttqController控制器发送消息,如图
在这里插入图片描述
mttqController代码

@RestController
@RequestMapping("/mttpTest")
public class mttqController extends BaseController {@Autowiredprivate MqttConfig mqttConfig;@Autowiredprivate MqttPushClient mqttPushClient;@GetMapping(value = "/a")public AjaxResult a(){JSONObject jsonObject = new JSONObject();jsonObject.put("hello", "你好");jsonObject.put("msg", "成功");AjaxResult publish = mqttPushClient.publish(2,false,"/testtopic/5",jsonObject.toString());return success(null);}}

浏览器输入接口地址调试发送成功
在这里插入图片描述
使用MQTTX客户端工具查看消息成功被发送
在这里插入图片描述
springboot控制台(也就是PushCallback.java里的回调方法messageArrived)也成功打印出了客户端接收到的消息,如图

在这里插入图片描述


http://www.ppmy.cn/embedded/174795.html

相关文章

OpenCV-Python实战(1)——OpenCV简介与图像处理基础

OpenCV介绍 Python安装OpenCV &#xff1a;对于 Linux 和 Windows 操作系统&#xff0c;首先需要在 shell 或 cmd 中运行以下命令安装 NumPy &#xff1a;pip install numpy 。然后再安装 OpenCV&#xff0c;可以选择仅安装主模块包 &#xff1a;pip install opencv-python &am…

ElementUI el-radio失效

我们在使用过程中&#xff0c;偶尔会遇到ElementUI el-radio失效&#xff0c;无法切换选中效果的情况。这个时候该如何解决呢&#xff1f;下面一起来看看吧&#xff01; 使用 el-radio 标签&#xff0c;点击图中的【二级指标】没有反应&#xff0c;还是默认选中【一级指标】 可…

【LangChain入门 3 Prompts组件】聊天提示词模板 ChatPromptTemplate

文章目录 一、 聊天信息提示词模板1.1 使用关键字1.2 使用SystemMessage, HumanMessage, AIMessage来定义消息1.3 使用MessagesPlaceholder 在特定未知添加消息列表 二、关键类介绍2.1 ChatPromptTemplate 类2.1.1 from_messages()2.1.2 format_messages()2.1.3 format_prompt(…

【亚马逊云科技】大模型选型实战(挑选和测评对比最适合业务的大模型)

文章目录 前言1、实验内容2、手册内容 一、环境准备二、Prompt 实战与模型配置2.1 基于 Amazon Bedrock 对比测试不同模型的逻辑推理效果2.2 基于 Amazon Bedrock 对比测试不同模型知识问答能力2.3 Prompt 实战结果分析 三、基于 Amazon Bedrock Evaluations 进行模型评测与自动…

Redis原理--持久化

Redis的数据都保存在内存&#xff0c;如果Redis宕机&#xff0c;数据将会全部丢失&#xff0c;因此必须有一种机制来保证Redis里的数据不会因为故障而丢失&#xff0c;这种机制就是Redis的持久化机制。 Redis持久化机制 1、快照 快照是一次全量备份&#xff0c;快照是内存数…

【Go】运算符笔记

基本数学运算 Go 语言支持常见的 算术运算符&#xff0c;用于执行数学计算。 运算符说明加法-减法*乘法/除法%取余自增--自减 整数运算只能得到整数部分 package mainimport ("fmt""math" )func main() {go_math() }func go_math() {x, y : 8, 5fmt.Pr…

虚拟机检测与反调试对抗技术

1. 虚拟机检测技术全解析 1.1 系统特征检测 常见检测点与绕过方案&#xff1a; 检测维度检测方法示例绕过技巧系统属性Build.PRODUCT.contains("sdk")Hook Build类属性返回值硬件信息检查/proc/cpuinfo虚拟化特征动态修改文件读取内容传感器数据加速度计/陀螺仪数…

MySQL以及MyBatis事务配置

在数据库管理中&#xff0c;事务是一个至关重要的概念。无论是金融交易、库存管理还是用户数据更新&#xff0c;事务都确保了数据的完整性和一致性。本文将详细介绍为什么需要事务、事务的核心概念、ACID特性、MySQL事务实践以及MyBatis事务配置的最佳实践。 一、事务的必要性…