android接入rocketmq

news/2025/3/5 5:34:17/

一 前言

RocketMQ 作为一个功能强大的消息队列系统,不仅支持基本的消息发布与订阅,还提供了顺序消息、延时消息、事务消息等高级功能,适应了复杂的分布式系统需求。其高可用性架构、多副本机制、完善的运维管理工具,以及安全控制功能,使其成为企业级应用的首选消息中间件。
在Android应用中,你可以使用RocketMQ的客户端库来发送和接收消息.

二 接入流程

1 添加依赖

在Android项目的build.gradle文件中添加RocketMQ客户端库的依赖。

dependencies {implementation 'org.apache.rocketmq:rocketmq-client:5.3.1'
}
2 添加权限
<uses-permission android:name="android.permission.INTERNET" />
3 接收消息
ExecutorService executor = Executors.newFixedThreadPool(20);  //根据项目需要设置常用线程个数
String TAG = "MainActivity";
String GROUP = "producer";
String ADDRESS = "192.168.1.84:9876";
String KEY = "key";executor.submit(() -> {try {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);consumer.setNamesrvAddr(ADDRESS);// 订阅 topic 下的全部 tabconsumer.subscribe(TOPIC, "*");// BROADCASTING:广播模式,把消息发给了所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组, CLUSTERING:集群模式(默认值),每一条消息只会被同一个消费者组中的一个实例消费consumer.setMessageModel(MessageModel.CLUSTERING);// CONSUME_FROM_LAST_OFFSET:从最新的偏移值开始消费(默认值), CONSUME_FROM_FIRST_OFFSET:从队列最开始的偏移值开始消费, CONSUME_FROM_TIMESTAMP:从指定的时间戳处开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// yyyyMMddHHmmss: 当选择从指定的时间戳处开始消费时, 需要指定该时间戳// consumer.setConsumeTimestamp("");// 使用并发方式从多个MessageQueue中取数据的方式监听consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.println();for (MessageExt msg : msgs) {Log.e(TAG,"收到消息:"+new String(msg.getBody()));}// 返回消费成功, 还可以是 RECONSUME_LATER:稍后重新消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();TimeUnit.DAYS.sleep(1);} catch (Throwable cause) {cause.printStackTrace();}});
4 发送消息
 executor.submit(() -> {try {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();// 同步传递消息,消息会发给集群中的一个Broker节点。Message message = new Message(TOPIC, TAG, KEY, "android  hello word ss".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult result = producer.send(message);Log.e(TAG,"发送消息结果:result:"+ JSON.toJSONString(result));producer.shutdown();} catch (Exception e) {Log.e(TAG,"发送失败:"+e.getCause().toString());e.printStackTrace();}});

三 问题

启动项目,点击发送消息,项目报了异常信息,如下

java.lang.NoClassDefFoundError: Failed resolution of: Ljava/lang/management/ManagementFactory;  报错

这是因为RocketMQ客户端库依赖于Java标准库中的 java.lang.management.ManagementFactory 类,而Android并不完全支持Java标准库,尤其是 java.lang.management 包。

RocketMQ官方没有专门为Android提供适配版本,所以可以尝试使用这些版本,或者自己修改RocketMQ源码,移除对 ManagementFactory 的依赖。

四 修改源码

在github中,把rocketmq-client源码下载到本地
https://github.com/apache/rocketmq
在这里插入图片描述

导入到本地如下
在这里插入图片描述
然后找到前面ManagementFactory 报错的地方,将它移除或者用其他方法代替,经排查在
org.apache.rocketmq.common.UtilAll 有相关的引用
在这里插入图片描述
该方法则是为了通过获取jvm的进程ID,这边我们可以把它注释掉,然后用个固定值代替试下

 static {HEX_ARRAY = "0123456789ABCDEF".toCharArray();/* Supplier<Integer> supplier = () -> {// format: "pid@hostname"String currentJVM = ManagementFactory.getRuntimeMXBean().getName();try {return Integer.parseInt(currentJVM.substring(0, currentJVM.indexOf('@')));} catch (Exception e) {return -1;}};PID = supplier.get();*/PID = 888888;}

以及在org.apache.rocketmq.common.MixAll也有ManagementFactory相关引用,这个作用是获取当前java虚拟机(JVM)的进程ID,可以将其注释,然后返回固定的结果

public static long getPID() {String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();if (StringUtils.isNotEmpty(processName)) {try {return Long.parseLong(processName.split("@")[0]);} catch (Exception e) {return 0;}}return 0;}

最后还有一个地方有涉及到,在包路径org.apache.rocketmq.store.StoreUtil,其作用是为了获取当前机器的总物理内存大小(以字节为单位)

 public static long getTotalPhysicalMemorySize() {long physicalTotal = 1024 * 1024 * 1024 * 24L;OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();}return physicalTotal;}

将相关的包修改后,然后将其重新打包,在maven工具下,选择rocketmq-common,选择Plugins下的jar组件,选中下面的jar进行打包
在这里插入图片描述
打包完成后,在模块的target目录下生成jar包
在这里插入图片描述
android需要用到的包如下:

implementation files('libs\\rocketmq-remoting-5.3.1.jar')
implementation files('libs\\rocketmq-client-5.3.1.jar')
implementation files('libs\\rocketmq-common-5.3.1.jar')implementation 'io.github.aliyunmq:rocketmq-logback-classic:1.0.1'
implementation 'com.google.guava:guava:31.1-jre'
implementation 'commons-validator:commons-validator:1.7'

将模块rocketmq-remoting,rocketmq-client,rocketmq-commo三个模块重新打包后导入,然后再加上下面那三个相关联的依赖包.重新用android应用进行收发信息,测试如下:

2025-03-04 14:51:22.272 13676-13795/? E/MainActivity: 收到消息:android  hello word ss
2025-03-04 14:51:22.279 13676-13785/? E/MainActivity: 发送消息结果:result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"TopicTestLss"},"msgId":"C10005FD90380CA347BF12A326F00000","offsetMsgId":"C2000B5400002A9F000000000007A575","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true,"transactionId":"C10005FD90380CA347BF12A326F00000"}

http://www.ppmy.cn/news/1576742.html

相关文章

关于常规模式下运行VScode无法正确执行“pwsh”问题

前言&#xff1a; pwsh在系统环境中正确配置&#xff0c;且可以运行在cmd&#xff0c; powshell&#xff08;5.1&#xff09;--- 都需要在管理员权限下运行 &#xff08;打开setting&#xff09; 打开setting.json &#xff08;在vscode中添加 powershell 7 路径&…

字节旗下两款AI编程工具

Trae 和 MarsCode 是字节跳动推出的两款 AI 编程工具,旨在通过人工智能技术提升开发效率和质量。以下是它们的详细介绍: 1 Trae Trae 是字节跳动于 2025 年 1 月推出的 AI 原生集成开发环境(IDE),由旗下新加坡公司 SPRING PTE 开发。它主打“用自然语言生成代码”,具备…

贪心算法+题目

贪心算法 跳跃游戏跳跃游戏2 跳跃游戏 题目 拿到题目就暴力穷举&#xff0c;我用的是dfs&#xff0c;加上备忘录之后还是超出时间限制。就考虑一下贪心算法。你想 我在[0,n-2]位置遍历求出可以跳跃的最远距离&#xff0c;用farthest更新最大值&#xff0c;如果>终点就返回t…

Ubuntu 下 nginx-1.24.0 源码分析 - ngx_conf_read_token

ngx_conf_read_token 定义在src\core\ngx_conf_file.c static ngx_int_t ngx_conf_read_token(ngx_conf_t *cf) {u_char *start, ch, *src, *dst;off_t file_size;size_t len;ssize_t n, size;ngx_uint_t found, need_space, last_space, sharp_comm…

7.1.2 计算机网络的分类

文章目录 分布范围交换方式 分布范围 计算机网络按照分布范围可分为局域网、广域网、城域网。局域网的范围在10m~1km&#xff0c;例如校园网&#xff0c;网速高&#xff0c;主要用于共享网络资源&#xff0c;拓扑结构简单&#xff0c;约束少。广域网的范围在100km&#xff0c;例…

Windows10 Xming6 + Xshell7 实现远程 ubuntu-24.04.1-desktop gui 界面本地展示

Windows10 Xming6 + Xshell7 实现远程 ubuntu-24.04.1-desktop gui 界面本地展示 1 运行环境2 实现思路3 XMing3.1 工具下载安装与配置3.2 XMing 配置3.3 XMing配置4 Xshell 7 配置5 远程机器配置6 测试1 运行环境 本地Windows系统:Windows10 专业版 远程Linux系统: ubuntu-…

AI本地化部署强劲驱动GPU,LPU等AI芯片和服务器的爆发性寻求

人工智能技术正在经历从云端到边缘的深刻变革&#xff0c;这一转变催生了AI基础设施建设的全新需求。在数据安全、实时响应和成本优化的多重驱动下&#xff0c;AI本地化部署已成为不可逆转的趋势&#xff0c;推动GPU、LPU等AI芯片及配套服务器系统进入爆发性增长周期。这场变革…

【leetcode hot 100 560】和为K的子数组

解法一&#xff1a;用左右指针寻找字串&#xff0c;如果和>k&#xff0c;则减少一个数&#xff08;left&#xff09;&#xff1b;如果和<k&#xff0c;则加上一个数&#xff08;right&#xff09;。 class Solution {public int subarraySum(int[] nums, int k) {int nu…