【RocketMQ系列十四】RocketMQ中消息堆积如何处理

news/2025/1/6 6:56:00/

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 消息堆积
    • 2. 消息堆积出现的原因
    • 3. 如何解决消息堆积

1. 消息堆积

消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息,主要发生在高并发的场景下,生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。

在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。

消息堆积

这里有个延迟就表示目前堆积的消息数。

2. 消息堆积出现的原因

消息堆积的本质原因还是消费者消费消息的速度赶不上生产者发送消息的速度。可能的情况有:

  1. 第一种情况: 新上线的消费者的消费逻辑存在Bug,导致消息不能被正常消费。这种场景主要存在于代码逻辑不严谨导致某些消息消费失败,或者消费超时,从而导致消息被大量堆积。

  2. 第二种情况:消费者实例宕机或者由于网络的原因不能连上Broker集群。这种情况主要是消费者实例可能是单节点或者机房网络不好的情况。

  3. 第三种情况:生产者短时间内大量发送消息到Broker端,消费者的消费能力不足。消费者消费消息往往是一些比较耗时的IO操作,比如操作数据库,调用其他服务。这导致消费者的消费速率远低于生产者发送速率。这种情况也是消息堆积的常见场景。

3. 如何解决消息堆积

  1. 解决第一种情况:对需要上线的消费者进行严格的测试,确保每种消息的场景都能覆盖到。另外,在上线的时候采用灰度发布,先灰度小范围的用户进行使用,确认没有问题了,在全量放开所有用户使用。

  2. 解决第二种情况:在上线消费者实例时需要,采用多实例,异地多活的方式,确保极端的情况下都能有消费者能够正常消费消息。

  3. 解决第三种情况:这种情况的解决本质上是如何提高消费者的消费速率。主要可以从如下方面解决:

    1. 同一个消费者组下,增加消费者实例。比如Topic中有8个队列,那么可以将消费者数量最多增加到8个。那么有同学会问为啥只增加到8个,我增加到9个,乃至10个行不行?答案是你可以增加10个消费者,但是多余的2个消费者是分不到Queue的。这是因为 在RocketMQ中某个topic下的某个队列只能被同一消费者组中的某个消费者消费。 如果消费者数量少于Queue的数量,那么有可能会出现消费不均的情况。

    2. 提高单个消费者的消费并行线程。RocketMQ 支持批量消费消息,可以通过修改DefaultMQPushConsumer 消费者类的consumeThreadMin(最少消费线程数),以及consumeThreadMax(最大消费线程数)来提高单个消费者的消费能力。

    3. 批量消费消息:

      某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量。建议使用5.x SDK的SimpleConsumer,每次接口调用设置批次大小,一次性拉取消费多条消息。

    下面就让我们来看个例子:

    生产者:使用的是DefaultMQProducer;

    	//4.创建消息StopWatch stopWatch = new StopWatch();stopWatch.start();for (int i = 0; i < 20000; i++) {// 创建消息,指定topic,以及消息体Message message = new Message("heap_topic", ("消息堆积测试" + i).getBytes());//5.发送消息SendResult send = defaultMQProducer.send(message);System.out.println(send);}stopWatch.stop();System.out.println("生产者发送2万条消息用时="+stopWatch.getTotalTimeSeconds()+"秒");
    

    消费者:使用的是DefaultMQPushConsumer;

    	// 4.创建一个回调函数consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.println("本批次收到的消息数="+msgs.size());// 5.处理消息for (MessageExt msg : msgs) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("当前时间="+System.currentTimeMillis()+" 收到的消息内容:" + new String(msg.getBody()));}// 返回消费成功的对象return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
    

    生产者329秒内发送了2万条消息,平均60条,

    image-20231014152350841

    而消费者消费一条消息需要一秒,所以生产者发送完消息之后,两个消费者还在消费。

    image-20231014144541572

image-20231014152042570

这里消费者使用的是DefaultMQPushConsumer消费者 每批次Broker端会向消费者推送32条消息,通过pullBatchSize字段设置,而消费者,每次消费1条消息,通过consumeMessageBatchMaxSize字段设置。

image-20231014153721132

当然,官方推荐使用SimpleConsumer进行批量消费消息。

	//每批次拉取16条消息int maxMessageNum = 16;// Set message invisible duration after it is received.Duration invisibleDuration = Duration.ofSeconds(15);// Receive message, multi-threading is more recommended.do {final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);log.info("Received {} message(s)", messages.size());for (MessageView message : messages) {final MessageId messageId = message.getMessageId();try {consumer.ack(message);log.info("Message is acknowledged successfully, messageId={}", messageId);} catch (Throwable t) {log.error("Message is failed to be acknowledged, messageId={}", messageId, t);}}} while (true);

官方的代码示例


http://www.ppmy.cn/news/1174845.html

相关文章

CPU缓存一致性协议剖析

CPU缓存一致性协议剖析 文章目录 前言缓存一致性经典问题并发读写导致的数据不一致问题需要缓存一致性协议的实例 常见协议java中的影响关于缓存一致性协议总结 前言 首先&#xff0c;我们需要了解什么是缓存一致性。多处理器系统中的每个处理器都有自己的缓存&#xff0c;用…

图像处理中底层、高层特征、上下文信息理解

1.图像的语义信息: 图像的语义分为视觉层、对象层和概念层。 视觉层即通常所理解的底层&#xff0c;即颜色、纹理和形状等等&#xff0c;这些特征都被称为底层特征语义&#xff1b; 对象层即中间层&#xff0c;通常包含了属性特征等&#xff0c;就是某一对象在某一时刻的状态&a…

Java面试题-Java核心基础-第十二天(SPI机制)

目录 一、什么是SPI机制 二、SPI机制的作用 三、SPI的一些应用 四、 例子 一、什么是SPI机制 SPI因为service provider interface 意为&#xff1a;服务提供者的接口 就是为服务提供者提供的接口&#xff0c;就是设计一套接口规范&#xff0c;然后不同的服务提供者去进行相…

Queue 的 poll() 和 remove()

在Queue接口中&#xff0c;poll()和remove()都是用于从队列中移除并返回头部元素的方法&#xff1a; 返回值: poll(): 如果队列为空&#xff0c;poll()方法会返回null&#xff0c;不会抛出异常。remove(): 如果队列为空&#xff0c;remove()方法会抛出NoSuchElementException异…

Java面试题-Java核心基础-第十三天(序列化)

目录 一、Java序列化与反序列化是什么&#xff1f; 二、为什么需要序列化与反序列化&#xff1f; 三、序列化的实现方式有哪些&#xff1f; 四、什么是serialVersionUID? 五、为什么还要显示指定serialVersionUID 六、serialVersionUID什么时候修改&#xff1f; 七、Jav…

分库分表-ShardingSphere 4.x(2)

❤️作者简介&#xff1a;2022新星计划第三季云原生与云计算赛道Top5&#x1f3c5;、华为云享专家&#x1f3c5;、云原生领域潜力新星&#x1f3c5; &#x1f49b;博客首页&#xff1a;C站个人主页&#x1f31e; &#x1f497;作者目的&#xff1a;如有错误请指正&#xff0c;将…

深入探究Selenium定位技巧及最佳实践

在使用Selenium进行Web自动化测试时&#xff0c;准确地定位元素是非常重要的一步。Selenium提供了多种元素定位方法&#xff0c;本文将深入探究这八大元素定位方法&#xff0c;帮助读者更好地理解和应用Selenium的定位技巧。 1. ID定位 ID是元素在HTML中的唯一标识符&#xff…

配置VUE环境过程中 npm报错的处理方案以及VUE环境搭建过程

背景&#xff1a;VUE已经出来很久了&#xff0c;一直想研究这个东西也很久了。由于各种各样的原因&#xff0c;一直没有能处理。最近终于有时间可以研究了。 奈何报错了 嘤嘤嘤~~ 针对报错情况&#xff0c;其实后来没有找到什么好的方案&#xff0c;几经周折&#xff0c;终于搭…