04 - 尚硅谷 - MQTT 客户端编程

devtools/2024/11/24 18:32:32/

1.在Java中使用MQTT

1.1 Eclipse Paho Java Client

具体步骤:

1、创建一个Spring Boot项目,添加如下依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version>
</parent><dependencies><!-- spring boot整合junit单元测试的起步依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!-- mqtt java客户端依赖 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency></dependencies>

2、建立连接代码实现

java">@Test
public void createConnection() throws MqttException {// 定义链接相关参数String broker = "tcp://localhost:1883";String username = "zhangsan";String password = "123";String clientid = "mqtt_java_client_01";// 创建MqttJava客户端对象// MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());   MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());client.connect(options);// 阻塞当前线程while (true) ;
}

3、发布消息代码演示

java">@Test
public void sendMessage() throws MqttException {// 定义链接相关参数String broker = "tcp://localhost:1883";String username = "zhangsan";String password = "123";String clientid = "mqtt_java_client_01";// 创建MqttJava客户端对象MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());client.connect(options);// 创建消息对象QoSString content = "hello mqtt";MqttMessage message = new MqttMessage(content.getBytes());message.setQos(2);message.setRetained(true);// 发送消息client.publish("a/c" , message);// 关闭链接释放资源client.disconnect();client.close();}

4、订阅主题获取消息

java">@Test
public void receiveMessage() throws MqttException {// 定义链接相关参数String broker = "tcp://localhost:1883";String username = "zhangsan";String password = "123";String clientid = "mqtt_java_client_02";// 创建MqttJava客户端对象MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());// 添加回调函数获取主题消息client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {  // 连接丢失时被调用System.out.println("connectionLost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {  // 接收到消息时被调用System.out.println("topic: " + topic);System.out.println("Qos: " + message.getQos());System.out.println("message content: " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {  // 消息接收完成时被调用System.out.println("deliveryComplete---------" + token.isComplete());}});// 订阅主题client.connect(options);client.subscribe("a/d" , 2);while(true) ;}

1.2 spring-integration-mqtt

基础环境搭建

1、创建一个Spring Boot项目,并加入如下依赖:

<dependencies><!-- spring boot项目web开发的起步依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- spring boot项目集成消息中间件基础依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><!-- spring boot项目和mqtt客户端集成起步依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version></dependency><!-- lombok依赖 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- fastjson依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies>

2、编写启动类

java">@EnableConfigurationProperties(value = MqttConfigurationProperties.class)
@SpringBootApplication
public class MqttDemoApplication {public static void main(String[] args) {SpringApplication.run(MqttDemoApplication.class , args) ;}}

3、在application.yml文件中添加如下配置

java">spring:mqtt:username: zhangsanpassword: 123url: tcp://localhost:1883subClientId: sub_client_id_123subTopic: atguigu/iot/lamp/linepubClientId: pub_client_id_123

4、创建实体类读取自定义配置

java">@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {private String username;private String password;private String url;private String subClientId ;private String subTopic ;private String pubClientId ;}

5、创建配置类配置链接工厂

java">@Configuration
public class MqttConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties ;@Beanpublic MqttPahoClientFactory mqttClientFactory(){// 创建客户端工厂DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();// 创建MqttConnectOptions对象MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(mqttConfigurationProperties.getUsername());options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});factory.setConnectionOptions(options);// 返回return factory;}}

订阅主题获取消息

具体步骤:

1、配置入站适配器

java">@Configuration
public class MqttInboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties ;@Autowiredprivate ReceiverMessageHandler receiverMessageHandler;/*** 配置消息传输通道* @return*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置入站适配器*/@Beanpublic MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter  =new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getUrl() ,mqttConfigurationProperties.getSubClientId() ,mqttPahoClientFactory , mqttConfigurationProperties.getSubTopic().split(",")) ;adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter ;}/*** 配置入站消息处理器* @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler messageHandler() {return this.receiverMessageHandler ;}}

2、定义监听主题消息的处理器

java">@Component
public class ReceiverMessageHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();String receivedTopicName = (String) headers.get("mqtt_receivedTopic");if("atguigu/iot/lamp/line".equals(receivedTopicName)) {System.out.println("接收到消息:" + message.getPayload());}}}

测试:通过MQTTX向atguigu/iot/lamp/line主题发送消息

向指定主题发送消息

具体步骤:

1、配置出站消息处理器

java">@Configuration
public class MqttOutboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties ;@Autowiredprivate MqttPahoClientFactory pahoClientFactory ;@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler mqttOutboundMassageHandler() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigurationProperties.getUrl() ,mqttConfigurationProperties.getPubClientId() , pahoClientFactory ) ;messageHandler.setAsync(true);messageHandler.setDefaultQos(0);messageHandler.setDefaultTopic("default");return messageHandler ;}}

2、定义发送消息的网关接口

java">@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {/*** 发送mqtt消息* @param topic 主题* @param payload 内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送包含qos的消息* @param topic 主题* @param qos 对消息处理的几种机制。*          * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>*          * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>*          * 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}

3、定义发送消息的服务类

java">@Component
@AllArgsConstructor
public class MqttMessageSender {private MqttGateway mqttGateway;/*** 发送mqtt消息* @param topic 主题* @param message 内容*/public void send(String topic, String message) {mqttGateway.sendToMqtt(topic, message);}/*** 发送包含qos的消息* @param topic 主题* @param qos 质量* @param message 消息体*/public void send(String topic, int qos, byte[] message){mqttGateway.sendToMqtt(topic, qos, message);}
}

3.智能灯泡案例

需求:

1、智能灯泡设备上线以后向MQTT服务端发送消息,后端服务从MQTT中获取消息记录设备信息到数据库中

2、后端微服务向MQTT服务端发送开灯或者关灯消息,设备端从MQTT中获取消息控制灯泡的开和关

3、设备端对灯泡进行开和关操作的时候向MQTT中发送消息,后端服务获取MQTT消息记录灯泡的开关状态

3.1 环境准备

具体步骤:

1、创建对应的数据库表

-- 智能灯泡设备表
CREATE TABLE `tb_lamp` (`id` bigint NOT NULL AUTO_INCREMENT,`deviceId` varchar(50) DEFAULT NULL,`status` int DEFAULT NULL COMMENT '1:上线  0:下线',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ,`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- 智能灯泡设备状态表
CREATE TABLE `tb_lamp_status` (`id` int NOT NULL AUTO_INCREMENT,`deviceId` varchar(50) DEFAULT NULL,`status` int DEFAULT NULL COMMENT '0: 关灯   1:开灯',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

2、在spring-integration-mqtt案例中加入如下依赖

<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version>
</dependency>

3、在application.yml文件中加入如下依赖

java">spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.136.147:3306/lamp_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: 1234mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmap-underscore-to-camel-case: truemapper-locations: classpath*:mapper/*Mapper.xml

4、通过mybatis的逆向工程生成tb_lamp和tb_lamp_status表对应的基础代码

5、在启动类上添加@MapperScan注解指定Mapper接口的包路径

3.2 服务端获取设备上线消息

接口说明

接口一:设备上线

当终端设备连接上EMQX以后,发送上线消息到EMQX服务端,说明如下:

主题: atguigu/iot/lamp/line
消息内容:{"deviceId": "xxxxxx","online": 1}
数据说明:deviceId: 设备idonline:   上线状态,1表示上线,0表示离线

业务代码

ReceiverMessageHandler类的代码进行如下改造:

java">@Component
public class ReceiverMessageHandler implements MessageHandler {@Autowiredprivate TbLampService tbLampService ;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();String receivedTopicName = (String) headers.get("mqtt_receivedTopic");if("atguigu/iot/lamp/line".equals(receivedTopicName)) {tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态}}}

TbLampServiceImpl类的代码进行如下改造:

java">@Service
public class TbLampServiceImpl extends ServiceImpl<TbLampMapper, TbLamp> implements TbLampService {@Overridepublic void updateLampOnlineStatus(String jsonInfo) {// 解析消息获取设备id和上线状态Map<String ,  Object> map = JSON.parseObject(jsonInfo, Map.class);String deviceId = map.get("deviceId").toString();Integer status = Integer.parseInt(map.get("online").toString());// 根据设备的id查询设备数据LambdaQueryWrapper<TbLamp> lambdaQueryWrapper = new LambdaQueryWrapper<>() ;lambdaQueryWrapper.eq(TbLamp::getDeviceid , deviceId) ;TbLamp tbLamp = this.getOne(lambdaQueryWrapper);if(tbLamp == null) {        // 设备不存在,新增设备tbLamp = new TbLamp() ;tbLamp.setDeviceid(deviceId);tbLamp.setStatus(status);this.save(tbLamp) ;}else {     // 设备已经存在,修改设备的状态tbLamp.setStatus(status);tbLamp.setUpdateTime(new Date());this.updateById(tbLamp) ;}}}

3.3 服务端发送关灯开灯消息到MQTT

接口说明

接口三:后端发送消息控制智能灯泡开关

后端可以发送控制灯泡状态消息到EMQX中,设备端监听指定主题获取消息,控制灯泡的开关状态,说明如下:

主题: atguigu/iot/lamp/server/status
消息内容:{"deviceId": "xxxxxx","status": 0}
数据说明:		status:	0:关灯   , 1:开灯

业务代码

java">@RestController
@RequestMapping(value = "/api/lamp")
public class LampApiController {@Autowiredprivate MqttMessageSender mqttMessageSender;@GetMapping(value = "/{deviceId}/{status}")public String sendStatusLampMsg(@PathVariable(value = "deviceId") String deviceId , @PathVariable(value = "status") Integer status) {Map<String , Object> map = new HashMap<>() ;map.put("deviceId" , deviceId) ;map.put("status" , status) ;String json = JSON.toJSONString(map);mqttMessageSender.send("atguigu/iot/lamp/server/status" , json);return "ok" ;}}

3.4 服务端获取设备开灯关灯消息

接口说明

接口四:设备端改变智能灯泡开关的状态,状态发给给后端,后端记录状态

主题:atguigu/iot/lamp/device/status
消息内容:{"deviceId": "xxxxx"  "status": 0}
数据说明:	deviceId:设备idstatus:0:关灯   , 1:开灯

业务代码

java">@Override
public void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();String receivedTopicName = (String) headers.get("mqtt_receivedTopic");if("atguigu/iot/lamp/line".equals(receivedTopicName)) {tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态}else if("atguigu/iot/lamp/device/status".equals(receivedTopicName)) {tbLampStatusService.saveDeviceStatus(message.getPayload().toString()) ;}
}
java">@Service
public class TbLampStatusServiceImpl extends ServiceImpl<TbLampStatusMapper, TbLampStatus> implements TbLampStatusService {@Overridepublic void saveDeviceStatus(String json) {// 获取消息内容Map<String , Object> map = JSON.parseObject(json, Map.class);String deviceId = map.get("deviceId").toString();Integer status = Integer.parseInt(map.get("status").toString());// 创建对象封装消息TbLampStatus tbLampStatus = new TbLampStatus() ;tbLampStatus.setDeviceid(deviceId);tbLampStatus.setStatus(status);this.save(tbLampStatus) ;}}


http://www.ppmy.cn/devtools/136607.html

相关文章

想做一个类似于东郊到家这样的预约上门小程序,app也行,这个现在好不好运营?

传统推拿按摩服务通常需要顾客亲自到店接受技师的治疗&#xff0c;这就不可避免地涉及到门店租赁和技师聘请的费用。每年&#xff0c;这些费用加起来可能高达数十万&#xff0c;对于许多小型推拿店来说&#xff0c;这无疑是一个沉重的负担。 并且&#xff0c;传统推拿按摩的获客…

Unity Shader常见函数 内置Built-in/URP等效函数

简介&#xff1a; Unity Shader的URP中的函数与Built-in中的是不一样的&#xff0c;升级URP之后&#xff0c;基本都提供了平替的函数 Built-in 内置渲染管线函数URP 通用渲染函数TRANSFORM_TEX(uv, textureName)TRANSFORM_TEX(uv, textureName)tex2D, tex2Dlod, 等SAMPLE_TEXT…

VMware ubuntu创建共享文件夹与Windows互传文件

1.如图1所示&#xff0c;点击虚拟机&#xff0c;点击设置&#xff1b; 图1 2.如图2所示&#xff0c;点击选项&#xff0c;点击共享文件夹&#xff0c;如图3所示&#xff0c;点击总是启用&#xff0c;点击添加&#xff1b; 图2 图3 3.如图4所示&#xff0c;出现命名共享文件夹…

Python内置数据结构:列表篇:【】,list函数创建。列表创建常见遇到问题,索引,列表增删改查,常用函数,while,for进行列表遍历,列表推导式

介绍&#xff1a; 列表元组字典集合 列表&#xff1a; 方括号【】和list函数是列表最常用的两种定义方式。 我们暂且称列表内的东西为元素&#xff0c;列表内的元素以逗号分隔开。列表的初始化&#xff1a;空列表&#xff0c;有数值是列表的两种初始化情况。 使用方括号创建…

开发工具 - VSCode 快捷键

以下是一些常用的 VS Code 快捷键&#xff08;Windows、macOS 和 Linux 均适用&#xff0c;略有不同&#xff09;&#xff1a; 常用快捷键 功能Windows/LinuxmacOS打开命令面板Ctrl Shift P 或 F1Cmd Shift P打开文件Ctrl OCmd O保存文件Ctrl SCmd S全部保存Ctrl K,…

【什么是RabbitMQ】

RabbitMQ&#xff1a;可靠、灵活的消息中间件 在当今的分布式系统和微服务架构中&#xff0c;消息中间件扮演着至关重要的角色。RabbitMQ&#xff0c;作为一款开源的消息代理软件&#xff0c;以其可靠性、灵活性、可扩展性和多语言支持等特点&#xff0c;在众多消息队列系统中…

java-贪心算法

1. 霍夫曼编码&#xff08;Huffman Coding&#xff09; 描述&#xff1a; 霍夫曼编码是一种使用变长编码表对数据进行编码的算法&#xff0c;由David A. Huffman在1952年发明。它是一种贪心算法&#xff0c;用于数据压缩。霍夫曼编码通过构建一个二叉树&#xff08;霍夫曼树&a…

每日小练:Day6

1.简写单词 题目链接&#xff1a;1037-简写单词_牛客竞赛语法入门班数组字符串习题 每次取出输出的首字母&#xff0c;如果首字母是小写&#xff0c;则将其变成大写输出&#xff0c;如果是大写&#xff0c;则直接输出 import java.util.*;public class Main{public static vo…