如何顺序处理设备上报的数据

news/2024/11/27 5:36:27/
1. 引言

随着智能技术的发展,市场上出现了很多的智能设备,其具有连接网络的能力。用户可以实现远程控制,并且设备也可上报自己的状态,实现云端对设备的运行情况分析。在某些情况下需要保证设备上报状态的有序性,例如传感器数据采集和处理,其中数据的顺序很重要,因为它们可能表示实时的物理过程或事件。为了能实现消息顺序消费,可以用什么办法实现呢?

2. 消息队列实现消息的顺序消费

顺序消息是RocketMQ提供的一种高级消息类型,消费者按照发送消息的顺序性去处理消息。即在进行设备状态上报的时候消息发送到消息队列里面时就要先保持有序,之后进行消息处理时才能获取有序的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传在这里插入图片描述

如上图所示,在设备端采集设备的状态,放到MQ进行存储起来,起到缓冲作用,之后再去消费消息。

因此,生产者进行发送消息时,就需要保持消息的有序性。

3. 状态上报的有序性

为保证状态推送的有效性,需要指定消息的关键字,其中设备ID是唯一标识设备,因此同一个设备上报的状态就被同一个队列中,这样就能保持统一设备的状态是有序的。

其代码示例如下:

public class Producer {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("producer_group");// 设置 NameServer 地址producer.setNamesrvAddr("localhost:9876");// 启动生产者实例producer.start();// 设置消息队列选择器producer.setQueueSelector(new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 获取设备 IDString deviceId = (String) arg;// 计算队列索引int index = Math.abs(deviceId.hashCode()) % mqs.size();// 返回对应的消息队列return mqs.get(index);}});// 发送消息,模拟100个设备上报状态for (int i = 0; i < 100; i++) {// 构建消息体Message msg = new Message("device_status_topic", "device_status_tag", ("Device status " + i).getBytes());// 设置设备 ID 作为消息队列选择器的参数String deviceId = "device_" + i % 10;SendResult sendResult = producer.send(msg, deviceId);System.out.println(sendResult);}// 关闭生产者实例producer.shutdown();}
}

如上述的代码所示,首先创建了生产者,定义了MQ服务器地址,启动的生产者实例;接着以设备ID为参数,以取模的方式进行哈希计算,计算出该设备属于哪个队列,之后对应该设备ID的消息都会被发送到相同的队列。接着使用for循环,模拟100个设备上报,首先生成消息,然后使用send方法进行推送到指定的队列。

4. 状态消费的有序性

在消息生产的时候保持了有序性,为了实现消息消费的有序性,消费消息时需要严格按照接收—处理—应答的语义处理消息。

其代码示例如下:

public class Consumer {public static void main(String[] args) throws Exception {// 创建消费者实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 设置 NameServer 地址consumer.setNamesrvAddr("localhost:9876");// 订阅主题和标签consumer.subscribe("device_status_topic", "device_status_tag");// 注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 遍历消息列表for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}// 返回消费状态return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例consumer.start();System.out.println("Consumer started");// 等待一段时间后关闭消费者实例Thread.sleep(60000);consumer.shutdown();}
}

在代码中,创建了消费组实例,设置了MQ服务的地址,订阅了指定的主题和标签。其中较为重要的一步是注册消息监听器,在该方法中,使用了顺序消费,即在队列里面的消息会根据存储的先后顺序推送给消费者,进行消费。如果消费不成功,会重新将该消息放到队列的头部,重新推送。

需要注意的是,在RocketMQ中,同一消息队列上的消息是有序的,但不同消息队列之间的消息是无序的。因此,我们需要通过消息队列选择器来确保同一设备的消息发送到同一消息队列上,从而实现对同一设备的消息顺序消费。

5. 小结

如上介绍了如何实现消息推送的有序性,其核心原则:先发送的先消费、后发送的后消费。生产者通过设置消息队列选择器来实现消息的顺序生产。消费者通过注册消息监听器来实现消息的顺序消费。


http://www.ppmy.cn/news/97322.html

相关文章

光力转债上市价格预测

光力转债 基本信息 转债名称&#xff1a;光力转债&#xff0c;评级&#xff1a;A&#xff0c;发行规模&#xff1a;4.0亿元。 正股名称&#xff1a;光力科技&#xff0c;今日收盘价&#xff1a;22.53元&#xff0c;转股价格&#xff1a;21.46元。 当前转股价值 转债面值 / 转股…

Three.js--》实现3d汽车模型展览搭建

目录 项目搭建 初始化three.js基础代码 添加汽车模型展示 动态修改汽车模型 今天简单实现一个three.js的小Demo&#xff0c;加强自己对three知识的掌握与学习&#xff0c;只有在项目中才能灵活将所学知识运用起来&#xff0c;话不多说直接开始。 项目搭建 本案例还是借助…

《低代码指南》——维格云低代码工作流介绍

工作流模块是在 维格云 表格功能的基础上提供对工作流程的管理。通过 维格云 工作流,我们可以对团队中的工作流程建立一个正式的模型,明确一个工作流程包含哪些阶段,每个阶段由什么人负责,需要完成哪些事项。通过定义工作流,我们可以让团队中的协作更明确和有序。 ●工作流…

把Kubernetes用于微服务管理的最佳实践

把Kubernetes用于微服务管理的最佳实践 一、 引言1 什么是 Kubernetes2 Kubernetes 的优势和应用场景3 为什么使用 Kubernetes 部署容器化应用程序 二、 准备工作1 安装 Kubernetes2 准备容器镜像3 准备 Kubernetes 配置文件 三、 部署应用程序1 创建 Kubernetes 命名空间2 创建…

git Husky

虽然我们已经要求项目使用eslint了&#xff0c;但是不能保证组员提交代码之前都将eslint中的问题解决掉了&#xff1a; 也就是我们希望保证代码仓库中的代码都是符合eslint规范的&#xff1b; 那么我们需要在组员执行 git commit 命令的时候对其进行校验&#xff0c;如果不符合…

(转载)基于遗传算法的TSP算法(matlab实现)

1 理论基础 TSP(traveling salesman problem,旅行商问题)是典型的NP完全问题&#xff0c;即其最坏情况下的时间复杂度随着问题规模的增大按指数方式增长&#xff0c;到目前为止还未找到一个多项式时间的有效算法。 TSP问题可描述为&#xff1a;已知n个城市相互之间的距离&…

AirServer电脑通用版下载及使用教程

AirServer 是一款功能十分强大的投屏软件&#xff0c;支持并适用于 Windows和Mac。AirServer 是接收方&#xff0c;而不是发送方。 AirServer 只允许您接收镜像或流媒体内容&#xff0c;反之则不行。AirServer虽然功能十分强大&#xff0c;但是整体操作和使用都十分简单&#x…

Vue--》Vue3打造可扩展的项目管理系统后台的完整指南(一)

今天开始使用 vue3 ts 搭建一个项目管理的后台&#xff0c;因为文章会将项目的每一个地方代码的书写都会讲解到&#xff0c;所以本项目会分成好几篇文章进行讲解&#xff0c;我会在最后一篇文章中会将项目代码开源到我的GithHub上&#xff0c;大家可以自行去进行下载运行&…