物联网中RocketMQ的使用

news/2025/1/13 3:33:33/

物联网中RocketMQ的使用

1. 背景

       随着物联网行业的发展、智能设备数量越来越多,很多常见的智能设备都进入了千家万户;随着设备数量的增加,也对后台系统的性能提出新的挑战。

       在日常中,存在一些特定的场景,属于高并发请求,瞬时的压力会达到20w/s。为了保证用户的使用体验,提高服务器的处理能力刻不容缓。那应该如何来提高后台服务处理能力呢?

系统访问的两种常见方式:

  • 同步方式,该方式是常用的一种方式,当请求时,就会阻塞当前线程等待返回结果。该方式开发很简单,但是在高并发的情况下,会导致系统负载剧增,处理能力下降,甚至会导致上游服务等待过长产生熔断。
  • 异步方式,当我们请求时,不需要阻塞当前线程(主线程),当时结果处理完之后,再回调告诉主线程;该方式很适合与高并发的场景,但开发比较困难,会导致失败的情况。

2. 缓冲池

       在讲解RocketMQ之前,首先来介绍缓冲池的概念:

       缓冲池是计算机里的概念,很像生活中的蓄水池:从上游来的水并不是直接到达用户家里,而是先存储到一个水池里面;其优点有:

  • 多余的水可以在水池里面存起来,防止压力直接就传到下游;
  • 控制水池的阀门可以调节出水大小,可以根据实际的使用控制水流。

3. RocketMq的组成

       有了缓存池的概念,就很好理解RocketMQ工作的基本原理,在RocketMQ中主要有三部分组成:

  • 生产者(producer),生产者用户生产消息,就像上游的水。
  • brokerbroker就像蓄水池一样,会将生产的消息存储起来。
  • 消费者(consumer),将broker队列里面的消息拉出来消费。

其工作的原理图如下:

在这里插入图片描述

       通过生产者将消息传递到broker存储起来,然后消费者去消费broker里面的消息,其中broker是路由,具体的存储是采用了队列数据结构。

4. RocketMQ在物联网中的应用

       根据它的组成和原理,其非常合适用于异步解耦。比如在某个时间段,大量的请求来到云服务,如果云服务采用同步的方式去控制设备,势必导致服务延迟严重,出现大批量的失败。

       因此为了达到异步解耦,流量削峰,当大量的请求来到云服务的时候,并不是直接去控制设备,而是先将控制消息存入RocketMQ中,然后通过消费者去控制设备。因此具体的控制流程图如下:

在这里插入图片描述

       如图所示,当控制参数来到服务时,现将控制的参数封装为message,然后通过生产者将消息存放到RocketMQbroker存储起来,在消费者端将启动多线程去消费消费消息—即去控制设备,当设备控制完成后,将控制的结果告诉上游服务。

       通过以上的设计,不管多大的流量来,都先将参数存入rocketMQ中,然后慢慢去消费,有效地保护了云端服务,提供了云端的性能,提高了用户的使用体验。

5. 使用RocketMQ搭建一个简单的生产者和消费者demo

  • 前提—RocketMQ已经搭建好

  • 依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
  • 配置文件
server.port: 8085rocketmq:name-server: 10.16.17.246:9876producer:group: rocket-demo
  • 首先是生产者,这里开发了一个接口:
@RestController()
@RequestMapping("/producer")
public class ProducerController {// 注入@ResourceRocketMQTemplate rocketMQTemplate;@PostMapping("/push")public void producer(){ProducerDto producerDto = new ProducerDto();Random random = new Random();producerDto.setApplianceId(String.valueOf(random.nextInt()));producerDto.setReqId(UUID.randomUUID().toString());// 将producerDto 封装为消息Message<ProducerDto> message = withPayload(producerDto).setHeader(PROPERTY_KEYS, producerDto.getReqId()).build();System.out.println("生产的消息为:" + message);// 将消息塞入指定的TOPICrocketMQTemplate.syncSend(TOPIC_DEMO, message);}
}

其中,ProducerDto消息体有applianceIdreqId两个参数,broker中将消息路由到与topic绑定的队列中。

  • 消费者
@RocketMQMessageListener(consumeThreadNumber = 16,topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt> {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}
}

在消费者类上加上@RocketMQMessageListener、@Component注解即可实现消费。

  • 结果, 实现了消息的生产和消费
生产的消息为:GenericMessage [payload=ProducerDto(reqId=086a5666-922a-4345-b408-956ba6bc18d9, applianceId=2140322229), headers={KEYS=086a5666-922a-4345-b408-956ba6bc18d9, id=12816bba-1
259-469a-d0ff-e42311a27fe7, timestamp=1676713480448}]
消费者消费的消息为: {"reqId":"086a5666-922a-4345-b408-956ba6bc18d9","applianceId":"2140322229"} 

6. 结语

以上是关于RocketMQ的简单介绍,只是皮毛而已,里面还有很多的功能可以挖掘。


http://www.ppmy.cn/news/26027.html

相关文章

蓝桥杯 stm32 PWM 设置占空比

本文代码使用 HAL 库。 文章目录 前言一、创建CubeMX 工程 ,占空比分析:二、相关函数:1. 获取 CNT函数2.设置CNT为 0 函数(计算器清零)3.开启TIM2_CH1的输入捕获中断函数4.TIM 回调函数三、设置上升沿,下降沿四、在lcd上显示 R40 占空比 详细代码五、设置占空比,输出 PW…

Go 数组和切片反思

切片的底层数据结构是数组&#xff0c;所以&#xff0c;切片是基于数组的上层封装&#xff0c;使用数组的场景&#xff0c;也完全可以使用切片。 类型比较 我看到 go 1.17 有对切片和数组转换的优化&#xff0c;禁不住纳闷&#xff0c;有什么场景是必须数组来完成的呢&#x…

超低成本DDoS攻击来袭,看WAF如何绝地防护

一、DDoS攻击&#xff0c;不止于网络传输层 网络世界里为人们所熟知的DDoS攻击&#xff0c;多数是通过对带宽或网络计算资源的持续、大量消耗&#xff0c;最终导致目标网络与业务的瘫痪&#xff1b;这类DDOS攻击&#xff0c; 工作在OSI模型的网络层与传输层&#xff0c;利用协…

Python-TCP网络编程基础以及客户端程序开发

文章目录一. 网络编程基础- 什么是IP地址?- 什么是端口和端口号?- TCP介绍- socket介绍二. TCP客户端程序开发三. 扩展一. 网络编程基础 - 什么是IP地址? IP地址就是标识网络中设备的一个地址 IP地址分为 IPv4 和 IPv6 IPv4使用十进制, IPv6使用十六进制 查看本机IP地址: l…

ChatGPT入门案例|商务智能对话客服(三)

本篇介绍智能客服的基本功能架构和基本概念&#xff0c;并利用对话流技术构建商务智能应用。 01、商务智能客服功能结构 互联网的发展已经深入到社会的各个方面&#xff0c;智能化发展已经成为社会发展的大趋势。在大数据和互联网时代&#xff0c;企业和组织愈加重视客户沟通…

Websocket详细介绍

需求背景 在某个资产平台&#xff0c;在不了解需求的情况下&#xff0c;我突然接到了一个任务&#xff0c;让我做某个页面窗口的即时通讯&#xff0c;想到了用websocket技术&#xff0c;我从来没用过&#xff0c;被迫接受了这个任务&#xff0c;我带着浓烈的兴趣&#xff0c;就…

rt-thread pwm 多通道

一通道pwm参考 https://blog.csdn.net/yangshengwei230612/article/details/128738351?spm1001.2014.3001.5501 以下主要是多通道与一通道的区别 芯片 stm32f407rgt6 1、配置PWM设备驱动相关宏定义 添加PWM宏定义 #define BSP_USING_PWM8 #define BSP_USING_PWM8_CH1 #d…

LeetCode 338. 比特位计数

给你一个整数 n &#xff0c;对于 0 < i < n 中的每个 i &#xff0c;计算其二进制表示中 1 的个数 &#xff0c;返回一个长度为 n 1 的数组 ans 作为答案。 示例 1&#xff1a; 输入&#xff1a;n 2 输出&#xff1a;[0,1,1] 解释&#xff1a; 0 --> 0 1 --> …