阿里云RocketMQ消费MQTT消息

ops/2024/9/23 22:32:51/

业务背景:

        项目中涉及的消息队列既有RocketMQ,又有MQTT,均为阿里云提供(阿里云有专门的“微消息队列 MQTT 版”模块,但博主公司消息队列的实例都在“消息队列 RocketMQ 版”模块下,只是实例不同,猜测是做了适配,有清楚的大佬欢迎指点)。其中MQTT的消息是由硬件设备上报而来,由java服务进行消费,使用一套内部框架连接。现因框架存在适配问题,经讨论决定放弃使用,需改造原消费代码。

技术方案:

       查询资料与询问同事得知两种消息队列在底层逻辑上高度相似,可以用RocketMQ方式连接MQTT的topic并消费。在实际改造过程中发现二者均可通过com.aliyun.openservices.ons.api.ONSFactory#createConsumer这一阿里云官方接口进行连接。而项目中已有一套配置用于连接RocketMQ,两类消息队列的地址、密钥等实例信息并不相同。因此改造的重点其实就是实现RocketMQ多实例配置。

代码实现:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 自定义注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface OnsMqttMessageListener {String topic();String consumerGroup();}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;/*** 参数类*/
@Data
@Component
@EnableConfigurationProperties
@ConfigurationProperties(prefix = "aliyun.rocketmq")
public class OnsMqttMessageProperties {private String onsAddr;private String accessKey;private String secretKey;private String groupId;}
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.geelytech.bms.annotation.OnsMqttMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.Properties;/*** 配置类*/
@Slf4j
@Component
public class OnsMqttMessageConsumerConfig implements ApplicationListener<ApplicationReadyEvent> {@Autowiredprivate OnsMqttMessageProperties onsMqttMessageProperties;/*** 注入所有MessageListener实例*/@Autowiredprivate Map<String, MessageListener> messageListeners;@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {String onsAddr = onsMqttMessageProperties.getOnsAddr();String accessKey = onsMqttMessageProperties.getAccessKey();String secretKey = onsMqttMessageProperties.getSecretKey();log.info("ConsumerSubscriptionConfig onApplicationEvent messageListeners={}", messageListeners);// 订阅每个MessageListenerfor (Map.Entry<String, MessageListener> entry : messageListeners.entrySet()) {MessageListener listener = entry.getValue();OnsMqttMessageListener annotation = AnnotationUtils.findAnnotation(listener.getClass(), OnsMqttMessageListener.class);if (annotation != null) {String topic = annotation.topic();String consumerGroup = annotation.consumerGroup();Properties properties = new Properties();properties.setProperty(PropertyKeyConst.GROUP_ID, consumerGroup);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, onsAddr);properties.setProperty(PropertyKeyConst.AccessKey, accessKey);properties.setProperty(PropertyKeyConst.SecretKey, secretKey);Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(topic, "*", listener);consumer.start();}}}}
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.geelytech.bms.annotation.OnsMqttMessageListener;
import com.geelytech.satellite.constant.TopicConstant;
import com.geelytech.satellite.dto.eventOriginalData.BizSwapDone;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import static com.aliyun.openservices.ons.api.Action.CommitMessage;@Slf4j
@Component
@OnsMqttMessageListener(topic = "", consumerGroup = "")
public class BmsBizSwapDoneConsumer implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext consumeContext) {// 业务逻辑return CommitMessage;}}

http://www.ppmy.cn/ops/29663.html

相关文章

【ZZULIOJ】1091: 童年生活二三事(多实例测试)(Java)

目录 题目描述 输入 输出 样例输入 Copy 样例输出 Copy code 题目描述 Redraiment小时候走路喜欢蹦蹦跳跳&#xff0c;他最喜欢在楼梯上跳来跳去。 但年幼的他一次只能走上一阶或者一下子蹦上两阶。 现在一共有N阶台阶&#xff0c;请你计算一下Redraiment从第0阶到第N阶…

UDP_INTRODUCTION_03:介绍 - 挂起的监听调用

测试目的&#xff1a; 验证当数据报到达一个没有挂起监听&#xff08;LISTEN&#xff09;调用的UDP端口时&#xff0c;UDP是否应该发送ICMP端口不可达&#xff08;Port Unreachable&#xff09;消息。 描述&#xff1a; 本测试用例旨在确保当数据报发送到DUT上一个未被监听的…

常见面试题:XSS和CSRF原理及防范方法

XSS和CSRF原理及防范方法 XSS 跨站脚本攻击 浏览器向服务器请求的时候被注入脚本攻击 类型恶意代码有效的位置插入点反射型URLHTML存储型服务端数据库HTML基于DOM服务端数据库/客户端存储/URL前端javascript 反射型XSS&#xff08;非持久性跨站脚本攻击&#xff09; 攻击方法…

git 子模块

git config -f .gitmodules submodule xxx xxx.git git submodule sync 删除&#xff1a; git submodule deinit <name_of_submodule> git rm -f <name_of_submodule> rm -rf .git/modules/<name_of_submodule> git commit -m “Deleted submodule xy” 重…

搜维尔科技:TechViz中的手指跟踪:触摸3D模型并与之交互

TechViz中的手指跟踪&#xff1a;触摸3D模型并与之交互 搜维尔科技&#xff1a;TechViz中的手指跟踪&#xff1a;触摸3D模型并与之交互

Debian下postgreSQL的安装和使用

Debian下postgreSQL的安装和使用 手动安装下载源码包解压源码包安装依赖编译安装创建用户组和用户创建数据目录配置环境变量切换用户并初始化数据库修改配置文件通过systemctl管理进入命令行并创建密码 分发包安装使用常见问题 手动安装 下载源码包 cd /usr/local wget -c h…

LSTM-KDE的长短期记忆神经网络结合核密度估计多变量回归区间预测(Matlab)

LSTM-KDE的长短期记忆神经网络结合核密度估计多变量回归区间预测&#xff08;Matlab&#xff09; 目录 LSTM-KDE的长短期记忆神经网络结合核密度估计多变量回归区间预测&#xff08;Matlab&#xff09;效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.LSTM-KDE的长短期…

Scala应用 —— JDBC的创建

文章目录 Scala应用 —— JDBC的创建前言一、JDBC的创建过程1.初始化连接1.1 配置驱动1.2 创建连接对象 2. 初始化执行器2.1 创建执行器对象2.2 初始化执行器参数 3. 执行操作并返回结果 二、Scala JDBC的基本设计思路1. 操作步骤设计2. 解决结果差异化3.实现jdbc方法并输出结果…