9.延迟消息发送与延迟消息实现探究

news/2024/11/7 14:27:10/

highlight: arduino-light

4.3 延时消息

延迟消息对应的Topic是SCHEDULETOPICXXXX,注意就是SCHEDULETOPICXXXX,XXXX不是某某某的意思。

image.png

SCHEDULETOPICXXXX的队列名称是从2开始到17,对应的delayLevel为3到18,3对应10s,18对应2h,在类MessageStoreConfig中这样定义延时时间:

String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。

SCHEDULETOPICXXXX这个topic只对内部使用,对于consumer只能消费到自己所在的消费者组的重试topic的数据。

比如A是OrderConsumer组的一个消费者,OrderConsumer消费的主题Topic是Order。

那么消费者A在启动的时候,会订阅Order主题的同时,还会订阅%RETRY%_OrderConsumer。

即订阅Order主题的同时,还会订阅%RETRY%+消费者组的主题。

consumer消费失败的消息发回broker后总是先写到SCHEDULETOPICXXXX里面,然后schedule service在延迟时间到了以后会读取SCHEDULETOPICXXXX里面的数据然后重新发回到重试主题,consumer订阅了重试主题,所以会重新消费失败的数据,这样就完成了一个循环。

md 发送到重试消费Topic 是%RETRY% + 消费组名 注意是消费组名 我们思考一下为什么是消费者组名? A消费者组消费成功 B消费者组消费失败 如果发回原topic就有问题了,A又会消费一次

rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。

从这个过程也能看到,一个消费失败的消息体每次发回broker需要在commitLog里面存储两份。

topic为SCHEDULETOPICXXXX的一份这个主要是为schedule service控制延时用的。

topic为%RETRY%groupName的一份。

通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。

另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。

虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。

阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源 发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。

延迟消息是根据延迟队列的 level 来的,延迟队列默认是 msg.setDelayTimeLevel(5) 代表延迟一分钟 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" 是这 18 个等级(秒(s)、分(m)、小时(h)),level 为 1,表示延迟 1 秒后消费,level 为 5 表示延迟 1 分钟后消费,level 为 18 表示延迟 2 个 小时消费。

生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。

固定Level的含义是延迟是特定级别的,比如支持3秒、5秒的Level,那么用户只能发送3秒延迟或者5秒延迟,不能发送8秒延迟的消息。

消息队列RocketMQ的阿里云版本(收费版本)才支持到精确到秒级别的延迟消息(没有特定Level的限制)。

开源版本没有支持任意延迟的消息,我想可能有以下几个原因:

  1. 任意延迟的消息的需求不强烈
  2. 可能是一个比较有技术含量的点,不愿意开源

需求不强

对支持任意延迟的需求确实不强,因为:

  1. 延迟并不是MQ场景的核心功能,业务单独做一个替代方案的成本不大
  2. 业务上一般对延迟的需求都是固定的,比如下单后半小时check是否付款,发货后7天check是否收货

在我司,MQ上线一年多后才有业务方希望我能支持延迟消息,且不要求任意延迟,只要求和RocketMQ开源版本一致,支持一些业务上的级别即可。

不愿意开源

为了差异化(好云上卖钱),只能将开源版本的功能进行阉割,所以开源版本的RocketMQ变成了只支持特定Level的延迟。

既然业务有需求,我们肯定也要去支持。

任意延迟的消息难点在哪里?

首先,我们先划清楚定义和边界:在我们的系统范围内,支持任意延迟的消息指的是:

1 精度支持到秒级别 2 最大30天的延迟

本着对自己的高要求,我们并不满足于开源RocketMQ的18个Level的方案。那么,如果我们自己要去实现一个支持任意延迟的消息队列,难点在排序和消息存储。

消息要在服务端排序

任意延迟意味消息要在服务端排序

比如用户先发了一条延迟1分钟的消息,一秒后发了一条延迟3秒的消息,显然延迟3秒的消息需要先被投递出去。

那么服务端在收到消息后需要对消息进行排序后再投递出去。在MQ中,为了保证可靠性,消息是需要落盘的,且对性能和延

迟的要求,决定了在服务端对消息进行排序是完全不可接受的。

消息存储量太大

其次,目前MQ的方案中都是基于Write Ahead Log的方式实现的(RocketMQ、Kafka),日志文件会被过期删除,一般会保留最近一段时间的数据。

支持任意级别的延迟,那么需要保存最近30天的消息。阿里内部 1000+ 核心应用使用,每天流转几千亿条消息,经过双11交易、商品等核心链路真实场景的验证,稳定可靠。

考虑一下一天几千亿的消息,保存30天的话需要堆多少服务器,显然是无法做到的。

开源版本延迟消息如何做的?

虽然决定自己做,但是依旧需要先了解开源的实现,那么就只能看看RocketMQ开源版本中,支持18个Level是怎么实现的,希望能从中得到一些灵感。

上图是通过RocketMQ源码分析后简化一个实现原理方案示意图。

image.png

消息写入

  1. 在写入CommitLog之前,如果是延迟消息,替换掉消息的Topic和queueId(被替换为延迟消息特定的Topic,queueId则为延迟级别对应的id)

  2. 消息写入CommitLog之后,提交dispatchRequest到DispatchService

  3. 因为在第①步中Topic和QueueId被替换了,所以写入的ConsumeQueue实际上非真正消息应该所属的ConsumeQueue,而是写入到ScheduledConsumeQueue中(这个特定的Queue存放不会被消费)

Schedule过程中:

private String levelString = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) {                this.delayLevelTable.put(level, delayTimeMillis); }

每个level分配1个定时器,扫描所有延迟级别里面的延迟消息message。

如果存在延时消息。

如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。countdown是消息消费的时间-当前时间now。

如果当前消息到消费的时间,根据消息的物理偏移量和大小获取消息。把延迟消息message,发送到真正的topic对应的某个队列。

RocketMQ延迟消息的代码实战及原理分析 - 万猫学社 - 博客园 (cnblogs.com)

回顾一下这个方案,最大的优点就是没有了排序:

  • 先发一条level是5s的消息,再发一条level是3s的消息,因为他们会属于不同的ScheduleQueue所以投递顺序能保持正确
  • 如果先后发两条level相同的消息,那么他们的处于同一个ConsumeQueue且保持发送顺序
  • 因为level数固定,每个level的有自己独立的定时器,开销也不会很大
  • ScheduledConsumeQueue其实是一个普通的ConsumeQueue,所以可靠性等都可以按照原系统的M-S结构等得到保障

但是这个方案也有一些问题:

  • 固定了Level,不够灵活,最多只能支持18个Level
  • 业务是会变的,但是Level需要提前划分,不支持修改
  • 如果要支持30天的延迟,CommitLog的量会很大,这块怎么处理没有看到

时间轮:TimeWheel

总结RocketMQ的方案,通过划分Level的方式,将排序操作转换为了O(1)的ConsumeQueue 的append操作。

我们去支持任意延迟的消息,必然也需要通过类似的方式避免掉排序。

此时我们想到了TimeWheel:Netty中也是用TimeWheel来优化I/O超时的操作。

4.3.1 启动消息消费者

java public class ScheduledMessageConsumer {   public static void main(String[] args) throws Exception {      // 实例化消费者      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");      // 订阅Topics      consumer.subscribe("TestTopic", "*");      // 注册消息监听者      consumer.registerMessageListener(new MessageListenerConcurrently() {          @Override          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {              for (MessageExt message : messages) {                  // Print approximate delay time period                  System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");             }              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;         }     });      // 启动消费者      consumer.start(); } }

4.3.2 发送延时消息

java public class ScheduledMessageProducer {   public static void main(String[] args) throws Exception {      // 实例化一个生产者来产生延时消息      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");      // 启动生产者      producer.start();      int totalMessagesToSend = 100;      for (int i = 0; i < totalMessagesToSend; i++) {          Message message = new Message             ("TestTopic", ("Hello scheduled message " + i).getBytes());          // delayTimeLevel="1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";          // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间)          message.setDelayTimeLevel(3);          // 发送消息          producer.send(message);     }       // 关闭生产者      producer.shutdown(); } }

4.3.3 验证

您将会看到消息的消费比存储时间晚10秒

4.3.4 使用限制:1s-2h

md // org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。

RocketMQ延时消息实现原理探究


http://www.ppmy.cn/news/547700.html

相关文章

碰撞检测算法详述

算法的分类 目录 一、基于空间域的碰撞检测算法分类 1. 基于图像空间的碰撞算法 2.基于几何空间的碰撞检测算法 &#xff08;1&#xff09;基于空间剖分算法 &#xff08;2&#xff09;裁剪扫掠法 &#xff08;3&#xff09;基于距离场的算法 &#xff08;4&#xff09;…

C++面向对象 this指针 构造函数 析构函数 拷贝构造 友元

C面向对象 面向对象概念类与对象的区别 C中类的设计设计实例实例解释共有和私有类的认识 函数定义函数在类里定义和类外定义区别函数定义实例 C对象模型方案一:各对象完全独立地安排内存的方案方案二:各对象的代码区共用的方案: this指针this指针特点程序编译面向对象程序的过程…

谷歌查看请求选择只看接口

打开F12 调试工具 选择Network 这是我们会发现 什么图片 文件 接口的请求乱七八糟的 正常情况我们需要的都是只看接口 先点击这里这个 过滤 我们只需要点击 Fetch/XHR 即可过滤掉其他请求信息的展示

谷歌浏览器console打印不出信息,Default levels无法选择解决办法

如题&#xff0c;在网上搜了各种什么Default levels将相应的选上&#xff0c;可是我发现我那里灰了点不了&#xff0c;在试了各种方法无效之后&#xff0c;重启了浏览器就好了&#xff01;&#xff01;&#xff01;

谷歌如何调试css3动画

在浏览器值&#xff0c;右键随便检查元素 如何 crtlshfitp 出现输入框&#xff08;这时中文翻译了&#xff09; 寻常输入 show animation

Chrome开发者工具对Vue应用的支持

https://blog.51cto.com/jerrywangsap/5137803 Chrome对于vue这么一个流行的前端框架的支持当然是少不了的。 在Chrome web store里安装Chrome开发者工具的Vue扩展&#xff1a; 安装完毕之后&#xff0c;在Chrome的工具栏里会出现一个象征Vue的小图标&#xff1a; Chrome开发…

googe.fun问题汇总

2022/11/05&#xff1a;今天发现镜像站出了点问题&#xff0c;来做个问题汇总。如果发现镜像站出现问题&#xff0c;请大家来问题汇总查看是否被发现以及修复进度。 今天的问题&#xff1a; 问题进度&#xff1a;原因已找到&#xff0c;需要更换服务器&#xff0c;请耐心等待 …