springBoot集成emqx 实现mqtt消息的发送订阅

embedded/2025/3/5 9:46:27/

介绍

我们可以想象这么一个场景,我们java应用想要采集到电表a的每小时的用电信息,我们怎么拿到电表的数据?一般我们会想 直接 java 后台发送请求给电表,然后让电表返回数据就可以了,事实上,我们java应用发送请求请求电表的数据信息并不是发到电表上,而是发送到 服务端 (broker)上,请求服务器 给我们电表的信息,而电表会把数据 按照mqtt协议 源源不断的发送到服务端,服务端可以把数据存储到物联网数据库上,也可以由我们java应用手动存储到物联网数据库上

而我们怎么知道电表发送到服务端的哪里,java应用又怎么请求到该电表发送的位置?

这就 引出了 一个概念,主题 (topic) ,这个topic在mqtt中不需要手动的创建,只要又客户端订阅或者发布消息,主题就会被自动创建出来

而我们服务端用的最多的就是 集成好的emqx服务器,本文我们也用的是集成好的emqx的服务端

,我们先是 一个电表 订阅好一个固定的主题,然后 源源不断的往服务端发消息,然后我们java应用订阅这个主题,这样 java应用就能持续的拿到电表的数据了

具体什么是主题,主题怎么设置的 ,mqtt协议的具体协议内容,直接登录emqx官网查看即可

MQTT 最全教程:从入门到精通 | EMQ

而emqx服务器是怎么在linux系统上搭建的呢,具体直接看文档即可,输入文档对应的yum命令就可以直接 在linux服务器上安装了

在 CentOS/RHEL 上安装 EMQX | EMQX文档

文本主要书写代码的实现

代码实现

我们的yml文件如下

 我们后续 java应用订阅消息 都要到服务端 emqx 的1883端口

实体类如下

 

这里解释一下 clientid 是不固定的,随机的每一个发布/订阅消息的客户端都有一个唯一的clientid

而username 和password 是 客户端连接到 服务端的认证账户,多个客户端可以使用一个 账号密码

客户端代码实现

java">@Slf4j
@Component
@RequiredArgsConstructor
public class EMQXClient {
private  final MqttDefaultProperties mqttDefaultProperties;
private  final IMessageCallbackImpl mqttCallback;private IMqttClient mqttClient;/*** 初始化客户端对象*/
public  boolean initMqttClient(String clientId,String serverUrl)  {MemoryPersistence memoryPersistence = new MemoryPersistence();try {if(Objects.isNull(clientId)){clientId= mqttDefaultProperties.getDefaultClientId();}if(Objects.isNull(serverUrl)){serverUrl= mqttDefaultProperties.getServerUrl();}mqttClient = new MqttClient(serverUrl, clientId, memoryPersistence);} catch (MqttException e) {log.info("mqtt创建异常:{}", e.getMessage());return  false;}return true;
}public  boolean initMqttClient()  {MemoryPersistence memoryPersistence = new MemoryPersistence();try {mqttClient = new MqttClient(mqttDefaultProperties.getServerUrl(),mqttDefaultProperties.getDefaultClientId(),  memoryPersistence);} catch (MqttException e) {log.info("mqtt创建异常:{}", e.getMessage());return  false;}return true;}/*** 获取连接* @return*/public   boolean connect()  {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();//当客户端会话关闭的时候  对应的broker也关闭mqttConnectOptions.setCleanSession(true);//自动重连mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(mqttDefaultProperties.getDefaultUserName());mqttConnectOptions.setPassword(mqttDefaultProperties.getDefaultPassword().toCharArray());mqttClient.setCallback(mqttCallback);try {mqttClient.connect(mqttConnectOptions);} catch (MqttException e) {log.info("客户端连接异常:{}",e.getMessage());return  false;}return true;}public   boolean connect(String username,String password)  {if(Objects.isNull(username)){username=mqttDefaultProperties.getDefaultUserName();}if(Objects.isNull(password)){password= mqttDefaultProperties.getDefaultPassword();}MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();//当客户端会话关闭的时候  对应的broker也关闭mqttConnectOptions.setCleanSession(true);//自动重连mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttClient.setCallback(mqttCallback);try {mqttClient.connect(mqttConnectOptions);} catch (MqttException e) {log.info("客户端连接异常:{}",e.getMessage());return  false;}return true;
}/*** 断开连接* @return*/public  boolean disConnect(){try {mqttClient.disconnect();} catch (MqttException e) {log.info("客户端断开连接异常:{}",e.getMessage());return  false;}
return true;
}/**** @param topic 主题* @param msg 消息内容* @param qosEnum* @param retain  新的订阅者来了是否能拿到之前的 最新的一次消息* @return*/public  boolean publish(String topic, String msg, QosEnum qosEnum,boolean retain){int uniqueInt = (int) (System.nanoTime() & 0xFFFFFFFFL);//取纳秒时间戳低32位MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload(msg.getBytes());mqttMessage.setQos(qosEnum.getType());mqttMessage.setRetained(retain);mqttMessage.setId(uniqueInt);try {mqttClient.publish(topic,mqttMessage);} catch (MqttException e) {log.info("客户端发送消息失败:{}",e.getMessage());return  false;}return  true;}/**** @param topicFilters 要订阅的主题 例子 testtopic/#* @param qosEnum* @return*/public boolean subscribe(String topicFilters,QosEnum qosEnum){try {mqttClient.subscribe(topicFilters,qosEnum.getType());} catch (MqttException e) {log.info("订阅主题失败:{}",e.getMessage());return  false;}return true;}
public boolean unSubscribe(String topicFilter){try {mqttClient.unsubscribe( topicFilter);} catch (MqttException e) {log.info("取消订阅主题失败:{}",e.getMessage());return false;}return  true;
}}

我们着重关注的是

我们想连接 服务端 是不是得有 一个client ,那这个client就对应IMqttclient

,我们java应用客户端连接上服务端之后,是不是得订阅主题,订阅之后的逻辑在哪里,就在

IMessageCallbackImpl

这里面就是 书写的 客户端收到服务端发来的消息之后的处理情况

java">@Slf4j
@Component
public class IMessageCallbackImpl  implements MessageCallback {@Overridepublic void connectionLost(Throwable cause) {//丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连 或者记录 日志之类的log.info("丢失了对broker的连接");}/*** 订阅到消息后的回调* 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker* 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的消息都将由broker服务器再次发送到客户端* @param topic* @param message* @throws Exception*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {log.info("订阅到了消息;topic={},messageid={},qos={},msg={}",topic,message.getId(),message.getQos(),new String(message.getPayload()));}/*** 消息发布完成且收到ack确认后的回调* QoS0:消息被服务端发出后触发一次* QoS1:当收到broker的PUBACK消息后触发* QoS2:当收到broer的PUBCOMP消息后触发* @param token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {int messageId = token.getMessageId();String[] topics = token.getTopics();log.info("消息发送完成,messageId={},topics={}",messageId,topics);}
}

 

我们用一个 bean 在初始化的时候就订阅一个主题,这样 只要有 客户端往主题上发消息,我们就能收到了

而我们这个时候 没有硬件,怎么办呢,很简单,直接下载一个mqttx 模拟硬件发送消息到主题,启动springboot,就能看到消息的发送与接收了

当然 这实现的紧紧是最简单的协议的发送接收,后面还有许多的高级功能等我们使用,具体的可以查阅官方文档


http://www.ppmy.cn/embedded/170149.html

相关文章

CentOS7 安装Redis 6.2.6 详细教程

本文主要介绍CentOS7系统下安装Redis6.2.6的详细教程。 1.安装依赖 redis是基于C语言开发,因此想要在服务器上运行redis需要验证是否安装了gcc,没有安装gcc则需先安装 查看是否安装gcc gcc -v如果没有安装gcc,则通过如下命令安装 yum in…

每天练打字17:连续两天赛文速度突破100,今日赛文速度83.01

今日跟打:1932字 总跟打:221584字 记录天数:2602天 (实际没有这么多天,这个是注册账号的天数) 平均每天:85字 今日赛文首打速度:83.01 上周定的目标是:练习常用字前500&…

【封闭式】论文写作技巧--集中学习+集中写作

学术论文写作是许多科研人员、研究生以及青年学者都会面临的重要挑战。从选题的确定到创新点的挖掘,再到最终成稿,每一步都需要逻辑清晰、方法科学和语言精准。然而,繁重的科研任务和有限的指导资源让许多人在论文写作过程中感到迷茫。为了解…

abseil-cpp:环境搭建

参考: https://abseil.io/docs/cpp/quickstart-cmake abseil-cpp.git/dd4c89b abseil-cpp.git/20240722.1 1. clone代码仓库、编译 git clone https://github.com/abseil/abseil-cpp.git /app/abseil-cpp/ #/app/abseil-cpp/.git/config git checkout 20240722.1git rev-pa…

在笔记本电脑上用DeepSeek搭建个人知识库

最近DeepSeek爆火,试用DeepSeek的企业和个人越来越多。最常见的应用场景就是知识库和知识问答。所以本人也试用了一下,在笔记本电脑上部署DeepSeek并使用开源工具搭建一套知识库,实现完全在本地环境下使用本地文档搭建个人知识库。操作过程共…

el-table 手动选择展示列

需求: 由于表格的列过多,用滚动条进行滚动对比数据不方便,所以提出,手动选择展示列 实现思路: 表格默认展示所有字段,每个字段通过 v-if 属性来进行判断是否显示;点击设置按钮图标(表格右上角&#xff0…

Android NDK图像处理技术指南

Android NDK在图像处理方面的应用。首先,我得确认他们对NDK的基础知识是否了解。可能他们有一定的Android开发经验,但不太熟悉NDK部分。所以,我应该先简要介绍NDK是什么,以及为什么在图像处理中使用它,比如性能优势&am…

力扣-动态规划-72 编辑距离

思路 dp数组定义:0_i-1的word1转换成0_j-1的word2需要的最小操作步数为dp[i][j]递推公式: if(word1[i-1] word2[j-1]){dp[i][j] dp[i-1][j-1]; }else{// 删除 插入 替换dp[i][j] min(dp[i-1][j] 1, min(dp[i-1][j-1] 1, dp[i][j-1] 1)); } i…