## 如何顺序处理设备上报的数据

news/2024/12/21 18:42:09/
1. 引言

随着智能技术的发展,市场上出现了很多的智能设备,其具有连接网络的能力。用户可以实现远程控制,并且设备也可上报自己的状态,实现云端对设备的运行情况分析。在某些情况下需要保证设备上报状态的有序性,例如传感器数据采集和处理,其中数据的顺序很重要,因为它们可能表示实时的物理过程或事件。为了能实现消息顺序消费,可以用什么办法实现呢?

2. 消息队列实现消息的顺序消费

顺序消息是RocketMQ提供的一种高级消息类型,消费者按照发送消息的顺序性去处理消息。即在进行设备状态上报的时候消息发送到消息队列里面时就要先保持有序,之后进行消息处理时才能获取有序的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传在这里插入图片描述

如上图所示,在设备端采集设备的状态,放到MQ进行存储起来,起到缓冲作用,之后再去消费消息。

因此,生产者进行发送消息时,就需要保持消息的有序性。

3. 状态上报的有序性

为保证状态推送的有效性,需要指定消息的关键字,其中设备ID是唯一标识设备,因此同一个设备上报的状态就被同一个队列中,这样就能保持统一设备的状态是有序的。

其代码示例如下:

public class Producer {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("producer_group");// 设置 NameServer 地址producer.setNamesrvAddr("localhost:9876");// 启动生产者实例producer.start();// 设置消息队列选择器producer.setQueueSelector(new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 获取设备 IDString deviceId = (String) arg;// 计算队列索引int index = Math.abs(deviceId.hashCode()) % mqs.size();// 返回对应的消息队列return mqs.get(index);}});// 发送消息,模拟100个设备上报状态for (int i = 0; i < 100; i++) {// 构建消息体Message msg = new Message("device_status_topic", "device_status_tag", ("Device status " + i).getBytes());// 设置设备 ID 作为消息队列选择器的参数String deviceId = "device_" + i % 10;SendResult sendResult = producer.send(msg, deviceId);System.out.println(sendResult);}// 关闭生产者实例producer.shutdown();}
}

如上述的代码所示,首先创建了生产者,定义了MQ服务器地址,启动的生产者实例;接着以设备ID为参数,以取模的方式进行哈希计算,计算出该设备属于哪个队列,之后对应该设备ID的消息都会被发送到相同的队列。接着使用for循环,模拟100个设备上报,首先生成消息,然后使用send方法进行推送到指定的队列。

4. 状态消费的有序性

在消息生产的时候保持了有序性,为了实现消息消费的有序性,消费消息时需要严格按照接收—处理—应答的语义处理消息。

其代码示例如下:

public class Consumer {public static void main(String[] args) throws Exception {// 创建消费者实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 设置 NameServer 地址consumer.setNamesrvAddr("localhost:9876");// 订阅主题和标签consumer.subscribe("device_status_topic", "device_status_tag");// 注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 遍历消息列表for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}// 返回消费状态return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例consumer.start();System.out.println("Consumer started");// 等待一段时间后关闭消费者实例Thread.sleep(60000);consumer.shutdown();}
}

在代码中,创建了消费组实例,设置了MQ服务的地址,订阅了指定的主题和标签。其中较为重要的一步是注册消息监听器,在该方法中,使用了顺序消费,即在队列里面的消息会根据存储的先后顺序推送给消费者,进行消费。如果消费不成功,会重新将该消息放到队列的头部,重新推送。

需要注意的是,在RocketMQ中,同一消息队列上的消息是有序的,但不同消息队列之间的消息是无序的。因此,我们需要通过消息队列选择器来确保同一设备的消息发送到同一消息队列上,从而实现对同一设备的消息顺序消费。

5. 小结

如上介绍了如何实现消息推送的有序性,其核心原则:先发送的先消费、后发送的后消费。生产者通过设置消息队列选择器来实现消息的顺序生产。消费者通过注册消息监听器来实现消息的顺序消费。


http://www.ppmy.cn/news/80756.html

相关文章

CSS布局:浮动与绝对定位的异同点

CSS布局&#xff1a;浮动与绝对定位的异同点_cherry_vincent的博客-CSDN博客 浮动 ( float ) 和绝对定位 ( position:absolute ) 相同点&#xff1a; &#xff08;1&#xff09;都是漂起来( 离开原来的位置 ) &#xff08;2&#xff09;并且都不占着原来的位置 &#xff08;3…

抖音账号矩阵系统源码开发之——视频发布功能开发

视频发布权限在账号矩阵系统研发之初&#xff0c;都是一个备受争议的功能&#xff0c;最早之前我们使用的视频发布权限名字是Video.creat, video.delete权限&#xff0c;但是该权限于2022年10月份做了权限的收回&#xff0c;后又在上架了一个能力叫发布内容至抖音&#xff1a;…

高级第一个月考试题

1.什么是Vue框架&#xff1f; Vue是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是&#xff0c;Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层&#xff0c;并且还便于与第三方库或既有项目整合。另一方面&#xff0c;当与现代化的工具链以及各种支持…

Android 12.0 手动安装Persistent app失败的解决方案

1.概述 在12.0的系统产品开发中,对于一些安装app的失败问题,需要看日志 和抛出异常来判断问题所在,在最近的一些app安装失败抛出了关于Presistent app安装失败的问题,就需要从PMS安装的过程中看异常抛出的原因解决问题所在 2.手动安装Persistent app失败的解决方案的核心类…

Java并发编程中的一些基本概念

进程的基本概念 进程是操作系统级别的概念&#xff0c;操作系统给程序分配的一定资源 线程的概念 线程是cpu调用的基本单位&#xff0c;每一个线程执行某一个进程代码的基本片段 特点 进程是线程的容器&#xff0c;线程利用进程中的一些资源&#xff0c;完成预期指令&…

【Java 28岁了】一个有趣的例子,再推荐一些经典好书(文末惊喜福利)

文章目录 1 写在前面2 C语言与Java语言的互相调用2.1 C语言调用Java语言2.2 Java语言调用C语言 3 友情推荐4 更多分享 1 写在前面 众所周知&#xff0c;C语言和Java语言是两种不同的编程语言&#xff0c;它们的关系可以描述为Java语言是在C语言的基础上发展而来的一种高级编程…

Kuberntes云原生实战09 Kubernetes高可用安装小结

大家好,我是飘渺。 今天咱们继续更新Kubernetes云原生实战系列,本节文章是我们在安装过程中可能会遇到的问题以及解决方法。(都是我踩过的坑,你们大概率也会遇到~) 1. kubesphere平台无法使用kubectl命令行工具 问题现象 如果你给你的KubeSphere在Nginx上配置了域名访…

无穷带来的直觉不靠谱

一、背景 话不多说直接上道具&#xff1b;错了…直接上问题。 请用直觉和数学逻辑告诉我下面三个问题的答案&#xff1f; S11-11-11-11-1… 问&#xff1a;S1等于多少&#xff1f; S21-23-45-67… 问&#xff1a;S2等于多少&#xff1f; S31234567… 问&#xff1a;S3等于多…