Java实现Mqtt收发消息

news/2024/11/22 19:59:44/

Java实现Mqtt收发消息

文章目录

  • Java实现Mqtt收发消息
    • windows mqtt 平台服务搭建
    • mqtt 客户端工具:mqttbox
    • 整体代码结构
    • mqtt基础参数配置类
    • mqtt客户端连接
    • mqtt接收的消息处理类
      • 对应的MqttService注解和MqttTopic注解
    • MqttGateway 发送消息
    • 指定topic接收处理方法

java实现mqtt对消息的交互,mqtt 的topic主题概念是相互的,这个要先理解好,
发布者和订阅者是对等的,它们之间可以相互发送消息,而不需要建立任何连接或状态
使用到windows mqtt 平台服务搭建(不是必须安装,仅 windows 测试需要此步骤)
mqtt 客户端工具:mqttbox
废话不多说,直接上代码,上工具,准备工作先做好,以及我的实现过程

windows mqtt 平台服务搭建

下载apache-apollo-1.7.1-windows版本,这里提供一个链接地址
http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/提供一个现有教程:
https://blog.csdn.net/qq_42315062/article/details/125890181
搭建完成后:登录 http://127.0.0.1:61680 即可,默认账号 admin,密码 password,
注意 这里网页的端口是 61680,但是 mqtt 服务的端口是 61613

mqtt 客户端工具:mqttbox

这里提供一个下载地方,也可以自行下载
https://download.csdn.net/download/qq_39671088/85740566?utm_medium=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&depth_1-utm_source=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&spm=1003.2020.3001.6616.1

在这里插入图片描述

整体代码结构

在这里插入图片描述

mqtt基础参数配置类

在这里插入图片描述

@Data
@Component
@ConfigurationProperties("mqtt")
public class MqttProperties {/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 进-客户Id*/private String inClientId;/*** 出-客户Id*/private String outClientId;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** 是否清除session*/private boolean clearSession;
}

mqtt客户端连接

import com.bsj.boyun.core.tool.utils.ExceptionUtil;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate MqttMessageHandle mqttMessageHandle;/*** 出站通道,MqttGateway类*/private static String outboundChannel = "mqttOutboundChannel";/*** Mqtt 客户端工厂 所有客户端从这里产生** @return*/@Beanpublic MqttPahoClientFactory mqttPahoClientFactory() throws MqttException {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();try {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);} catch (Exception e) {System.out.println("mqtt初始化连接异常:" + ExceptionUtil.getStackStr(e));}return factory;}/*** Mqtt 管道适配器** @param factory* @return*/@Beanpublic MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(), factory, mqttProperties.getDefaultTopic().split(","));}/*** 消息消费者 (接收,处理来自mqtt的消息)** @param adapter* @return*/@Beanpublic IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {adapter.setCompletionTimeout(5000);adapter.setQos(1);//默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的//这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channelreturn IntegrationFlows.from(adapter).channel(new ExecutorChannel(mqttThreadPoolTaskExecutor())).handle(mqttMessageHandle).get();}@Beanpublic ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 最大可创建的线程数int maxPoolSize = 200;executor.setMaxPoolSize(maxPoolSize);// 核心线程池大小int corePoolSize = 50;executor.setCorePoolSize(corePoolSize);// 队列最大长度int queueCapacity = 1000;executor.setQueueCapacity(queueCapacity);// 线程池维护线程所允许的空闲时间int keepAliveSeconds = 300;executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}/*** 出站处理器 (向 mqtt 发送消息 生产者)** @param factory* @return*/@Beanpublic IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(), factory);handler.setAsync(true);handler.setConverter(new DefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);return IntegrationFlows.from(outboundChannel).handle(handler).get();}}

mqtt接收的消息处理类

import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttService;
import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttTopic;
import com.bsj.studentcard.gateway.attendance.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;/*** 所有mqtt到达的消息都会在这里处理* 参考MVC @RequestMapping的方式* 使用注解映射到专门的Topic去处理(MqttTopicHandle类),不写 if else**/
@Component
@Slf4j
public class MqttMessageHandle implements MessageHandler {/*** 包含 @MqttService注解 的类(Component)*/public static Map<String, Object> mqttServices;/*** 所有mqtt到达的消息都会在这里处理* 要注意这个方法是在线程池里面运行的** @param message message*/@Overridepublic void handleMessage(Message<?> message) throws MessagingException {getMqttTopicService(message);}/*** 获取有@MqttService 的类,专门处理topic消息的类** @return*/public Map<String, Object> getMqttServices() {if (mqttServices == null) {mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);}return mqttServices;}/*** topic 匹配** @param message*/public void getMqttTopicService(Message<?> message) {// 在这里 我们根据不同的 主题 分发不同的消息String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);if (receivedTopic == null || "".equals(receivedTopic)) {return;}for (Map.Entry<String, Object> entry : getMqttServices().entrySet()) {// 把所有带有 @MqttService 的类遍历Class<?> clazz = entry.getValue().getClass();// 获取他所有方法Method[] methods = clazz.getDeclaredMethods();for (Method method : methods) {if (method.isAnnotationPresent(MqttTopic.class)) {// 如果这个方法有 这个注解MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);if (isMatch(receivedTopic, handleTopic.value())) {// 并且 这个 topic 匹配成功try {method.invoke(SpringUtils.getBean(clazz), message);return;} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {log.error("执行 {} 方法出现错误", handleTopic.value(), e);}}}}}}/*** mqtt 订阅的主题与我实际的主题是否匹配** @param topic   是实际的主题* @param pattern 是我订阅的主题 可以是通配符模式* @return 是否匹配*/public static boolean isMatch(String topic, String pattern) {if ((topic == null) || (pattern == null)) {return false;}if (topic.equals(pattern)) {// 完全相等是肯定匹配的return true;}if ("#".equals(pattern)) {// # 号代表所有主题  肯定匹配的return true;}String[] splitTopic = topic.split("/");String[] splitPattern = pattern.split("/");boolean match = true;// 如果包含 # 则只需要判断 # 前面的for (int i = 0; i < splitPattern.length; i++) {if (!"#".equals(splitPattern[i])) {// 不是# 号 正常判断if (i >= splitTopic.length) {// 此时长度不相等 不匹配match = false;break;}if (!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])) {// 不相等 且不等于 +match = false;break;}} else {// 是# 号  肯定匹配的break;}}return match;}
}

对应的MqttService注解和MqttTopic注解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;import java.lang.annotation.*;/*** 自定义注解:消息处理类*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {@AliasFor(annotation = Component.class)String value() default "";
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 自定义注解:topic处理方法*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {/*** 主题名字*/String value() default "";}

MqttGateway 发送消息

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** MqttGateway*/@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
}

指定topic接收处理方法

/*** MqttTopicHandle  指定topic消息处理*/
@MqttService
@Slf4j
@RequiredArgsConstructor
public class MqttTopicHandle {private final MqttGateway mqttGateway;/*** 上线通知** @param message*/@MqttTopic("mqtt/face/basic")public void basic(Message<?> message) throws MqttException {String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);String payload = (String) message.getPayload();log.info("接收到的topic为:{},内容:{}", receivedTopic, payload );// 要回复当前主题,不回复不需要处理mqttGateway.sendToMqtt(topic, 0, "收到消息!");}
}

http://www.ppmy.cn/news/242135.html

相关文章

vue 路由的hash和history模式的区别

Vue-Router有两种模式&#xff1a;hash模式和history模式。默认的路由模式是hash模式。 1. hash模式 简介&#xff1a; hash模式是开发中默认的模式&#xff0c;它的URL带着一个#&#xff0c;例如&#xff1a;www.weiwehao.com/#/flower&#xff0c;它的hash值就是#/flower。 …

ftp传输文件大小有限制吗 ftp文件传输工具有哪些

软件版本&#xff1a;xmanager 7 这两年&#xff0c;线上办公逐渐常态化&#xff0c;相信大家对ftp这个概念也比较熟悉了。ftp&#xff0c;即文件传输协议&#xff0c;线上办公就是用ftp软件进行文件传输的。那ftp传输文件大小有限制吗&#xff0c;ftp文件传输工具有哪些&…

达梦数据库读写分离集群异常测试(⾼可⽤)及双主(类似脑裂)问题处理

目录 测试前准备... 4 断电测试... 4 一、备库204断电... 4 二、断电数据新增测试... 5 1、备库204断电... 5 2、主库200新增数据&#xff0c;203备库查询正常... 5 3、204服务器启动并启动守护进程&#xff0c;测试&#xff0c;正常... 6 三、主库断电测试... 6 1、主…

集成灶公认10大品牌有哪些?从这三方面轻松辨别品牌是否靠谱

集成灶公认10大品牌有哪些&#xff1f;这是很多消费者面对鱼龙混杂的集成灶市场发出的疑问。在选择集成灶产品时&#xff0c;如果不知道如何下手&#xff0c;不妨从销售额、企业影响力和产品创新力这三方面来进行辨别&#xff0c;这样就能够轻松了解它们是不是真的称得上是全国…

干货 | 提高步进电机运行质量的电流控制方法

A双极性步进电机的基础知识 双极性步进电机包含两绕组&#xff0c;为了使电机运行平稳&#xff0c;不断的给这两个线圈加以相位差90度的正弦波&#xff0c;步进电机就开始转动起来。 通常&#xff0c;步进电机不是由模拟线性放大器驱动&#xff1b;而是由PWM电流调节驱动&…

RockChip:Boot Mode(二)

一:recovery (key) 在无usb接入情况下,通过按压volume-up键再上电进入recovery模式。 由于loader mode的优先级高于recovery模式,故需要断开usb连接。 * rockchip_dnl_key_pressed(): * * (1) volume-up key (default) * (2) menu key (If no rockusb) * * Its possible t…

接口测试-使用mock生产随机数据

在做接口测试的时候&#xff0c;有的接口需要进行大量的数据进行测试&#xff0c;还不能是重复的数据&#xff0c;这个时候就需要随机生产数据进行测试了。这里教导大家使用mock.js生成各种随机数据。 一、什么是mock.js mock.js是用于生成随*机数据&#xff0c;拦截 Ajax 请…

【spring源码系列-03】xml配置文件启动spring时refresh的前置工作

Spring源码系列整体栏目 内容链接地址【一】spring源码整体概述https://blog.csdn.net/zhenghuishengq/article/details/130940885【二】通过refresh方法剖析IOC的整体流程https://blog.csdn.net/zhenghuishengq/article/details/131003428【三】xml配置文件启动spring时refres…