一文读懂 RocketMQ:从概念到架构与应用原理概述

news/2025/1/22 23:31:49/

文章目录

  • 概述
  • 架构说明
    • 核心组件
    • 核心概念
  • namesvr
  • producer
    • 默认实现
    • producer启动
    • 消息发送
  • broker-mq核心
    • 基本模型
    • 集群模型
    • 内部模型
    • 存储机制
    • 高可用
  • consumer
    • push类型
    • push流程
    • pull类型

概述

随着分布式技术在业内的快速应用,mq(消息队列)做为不可或缺的重要组件发挥着重要作用:削峰,异步,解耦。rocketmq做为热门产品,本文将对该产品实现原理进行一些分析,供大家参考。

架构说明

核心组件

在这里插入图片描述

  • 组成: name servers, brokers, producers, consumers
  • 特性:支持水平扩展,没有单一的故障点

核心概念

在这里插入图片描述

  • topic(主题):简单的理解为消息的类型,相同类型的消息使用相同的topic。例:某日报、期刊
  • message(消息):消息id可用于防重、冥等性,key可用于路由算法topic分片(queue)。例:具体的一期报纸
  • tag(标签):类似子主题,可以在主题下进行拆分子类型, 可用于过滤。例:某日报、期刊的英文版、中文版
  • queue:topic的分片。例:家里7个信箱,报纸可以按所属星期分散存储到7个信箱里
  • producer-group:多个相同的procuder客户端组成。主要用于冗灾。例:邮局里多人小组负责同一区域报纸邮寄。
  • consumer-group:多个相同consumer客户端组成。例:家里由7口人,7个信箱,一个报纸只能由有一个人取到。
  • offset:主要用于consumer消费的下标。例:从信箱取报纸的期数,用户1读到了100期,用户2读取了102期,未完读可重复去信箱根据下标重新复制一份。
  • 消息顺序:全局要求只能有一个消息queue,多个queue存在并发场景不保证顺序。

namesvr

  • namesvr各节点无联系,存储的数据是等价相同的
  • namesvr接收并管理broker上报的各种注册元信息,cluster下,broker同时向所有节点上报,若出现任一节点挂掉,其他节点没影响
  • namesvr为producer、consumer提供发现服务,cluster下,client只会连接一个可用节点,若出现该节点挂掉,client将自动转移至其它可用的节点
  • namesvr将上报的信息都存储在内存中,同时通过心跳机制检测broker的可用性。注:支持持久化, 但一般不启用
  • namesvr存储的数据包括broker基本数据(节点信息等)及元数据(topic及message队列映射表等)
  • 源码底层通过封装netty来实现
  • 为什么不直接用zk?zk完全满足需求,但功能过多;避免过度依赖,保持中间件的轻量级,减少维护成本。

producer

默认实现

官方提供的默认消息发送实现方式涉及3个核心类,如下图
在这里插入图片描述

  • DefaultMQProducer为基础引导类,除了维护部分基础参数(name、重试次数等等)外,行为操作均为委托Impl,类似一个controller的角色
  • DefaultMQProducerImpl为DefaultMQProducer的委托类,是执行具体的业务逻辑实现,消息发送、回调及相关的逻辑分支处理,类似一个service角色
  • MQClientInstance用于client维护,用于和broker、namesvr交互,还包括负载、路由、状态存储管理等重要功能,类似一个dao但不止dao的角色。

官方提供了一个DefaultMQProducer的默认消息发送类,该类持有DefaultMQProducerImpl,该实现中引用一个MQClientFactory,

producer启动

在这里插入图片描述

  • 1:由producer引出,一线实现类为DefaultMQProducerImpl
  • 3:实现类初始化start(true),其中的true表示启用MqClientFactory
  • 4:checkconfig:检测包括namesvr、group等基本配置信息
  • 5:初始化MQClientInstance实例为mQClientFactory,并将该实例加入factoryTable
  • 7:执行starttopic对应的routeInfo
  • 9:执行向所有broker发送heartbeat

消息发送

以一个最低配置的消息发送为例

  • 初始化消息发送机制:消息机制CommunicationMode.SYNC, 消息回调, 发送超时时间
  • 获取topic信息,selector计算topic的message queue,并分配其中1个
  • 根据message queue信息获取broker相关信息,只会获取角色为master的broker
  • 组装其它默认参数,向broker发送消息请求
  • 接收到发送的消息sendResult进行相关校验回调操作
  • 超过发送阀值,消息将被置入“死信队列”

broker-mq核心

基本模型

在这里插入图片描述

  • cluster:集群,由多个节点组成逻辑群组。1个cluster可以拥有多个broker组
  • broker组:由节点组成小群组,有且只有一个master,0或多个slave
  • topic:消息主题,broker与topic为多对多关系
  • queue:message queue,1个topic对应1或多个queue(topic分片)

注:cluster和broker组均为逻辑概念,由各节点的clustername、brokername配置形成

集群模型

在这里插入图片描述

  • 上图为一个2主2从的典型broker模式
  • 1个集群(clustername相同为一个集群),2个组(brokername相同为一个组),4个节点(各组各1个master1个slave)
  • 图中含2个topic,topic1有2个分片(分布于broker1和broker2),topic2有3个分片(分布于broker1和broker2)
  • 同一个broker中的slave为均为master的镜像副本

内部模型

在这里插入图片描述

  • Remoting Module:一个netty服务端,用于接收并处理客户端请求
  • Client Manager:维护接入的producer和consumer的信息,包括各consumer组消费下标都在这里
  • Store Service:用于消息硬盘存储策略及实现
  • HA Service:主从节点的数据同步机制维护
  • Index Service:数据查询策略,索引机制管理及实现

存储机制

在这里插入图片描述

  • 使用mmap + write方式加快存盘及读取速度
  • commitLog:所有消息都存储于commitLog。完全顺序写,随机读(读操作时批量加载,内存缓存)
  • consumequeue:关联consumergroup–messagequeue的offset消费位置信息
  • 持久化:支持异步和同步。异步的意思为写入内存即响应,后台根据机制定期刷盘。同步表示刷盘后响应
  • 主从同步:2m-2s-async(主从异步); 2m-2s-sync(主从同步); 2m-noslave(仅master)

高可用

  • 发送消息:如果一个master挂掉,slave暂不支持自动升级为主(后续版本可能会支持)。此时,若消息被路由到该master,因该master不可用,因此消息分配至可用的master中。
  • 消费消息:master同时支持读写操作,若某master挂掉,系统会自动将从master中读消息切换至salve中,保证高可用。

consumer

push类型

在这里插入图片描述

  • DefaultMQPullConsumer 初始启动消费类
  • AllocateMessageQueueStrategy 指定消息分配策略实现,consumer获取可消费的topic分片(message queue列表
  • DefaultMQPullConsumerImpl 业务逻辑具体实现,同上方producer相关的impl
  • MQClientInstance 同上方producer的MQClientInstance相关说明

push流程

  • 该模式下,broker收到消息后,consumer,自动更新offset,client接收消息后自行处理
  • 类似kafka中的high-level
  • 增加新的consumer客户端后,系统会自动实现均衡,需要注意的是,cluster模式下,当同一个组的consumer客户端个数超过某个topic的message queue个数后,那么多出来的consumer客户端将无消息可用
    在这里插入图片描述
    rocketmq中的push,并非完全意义上的broker收到消息后立即推送给consumer,而是采用一种longpolling方式,pull和push相结合。

consumer根据自身消化消息的情况(未处理的消息个数、消息大小、offset跨度),判断是否需要从broker获取新消息

if 消化良好 then {consumer从broker pull消息if broker有新消息,则直接返回else broker没有新消息,broker隔段时间重试,检测到了就返回,超过一定时间仍然检测不到,则返回空。
}  else 消化不良:consumer继续处理未处理完的消息
  • 整体类似try(pull,timeout)
  • 作用:减少server的推送压力,降低server推送过快consumer消化不了的隐患

pull类型

在这里插入图片描述
注:pull主要类同push类型
pull流程

  • pull类型的特点是需要自行维护offset及异常处理
  • 相比于push,灵活度较高,自由优化空间大,适用于复杂场景下优化
  • 类比kafka中的simpleconsume
  • 主要操作:
  1. 获取message queue并遍历
  2. 维护处理offset
  3. 不同消息状态处理

总结
本文简要阐述了rocketmq浅层面的原理,可供一些常见故障排查定位。希望能对大家有所帮助,感谢!


http://www.ppmy.cn/news/1565333.html

相关文章

QT的TCP通讯

目录 一、引言 二、QT 中与 TCP 通讯相关的类 1.QTcpSocket 类 1.1 常用信号 1.2常用函数 2.QTcpServer类 2.1常用函数 三、QT TCP通信的详细代码实现 1.TCP服务器端实现 2.TCP客户端实现 四、总结 一、引言 在网络编程领域,TCP(Transmission…

自动化爬虫运行过程中,有没有办法提高爬虫的抓取效率?

关于在实际爬虫开发及运行过程中,我们可以深刻研究爬虫机制和网站结构,如何提高爬虫抓取效率和性能是我们运行爬虫的关键所在。关于这一点,将从多个方面展开,包括并发处理、优化网络请求、缓存策略、代理IP池、分布式爬虫等,同时提供详细的代码实现和解释。 爬虫的抓取效…

微服务学习-Nacos 作为注册中心使用

Nacos 实现服务的注册与发现 1. 作用 服务的注册与发现 2. 为什么要引入注册中心 微服务中的订单服务调用商品、库存、账户等服务的 IP 地址和端口都是硬编码,会存在很多问题 如果商品、库存、账户等服务的 IP 地址或端口发生了变化,则订单服务将变得…

SpringBoot快速接入OpenAI大模型(JDK8)

使用AI4J快速接入OpenAI大模型 本博文给大家介绍一下如何使用AI4J快速接入OpenAI大模型,并且如何实现流式与非流式的输出,以及对函数调用的使用。 介绍 由于SpringAI需要使用JDK17和Spring Boot3,但是目前很多应用依旧使用的JDK8版本&…

32单片机从入门到精通之测试与验证——单元测试(十五)

人生苦短,我们都会面临困难和挑战。但是,只要我们保持积极的心态和勇往直前的精神,我们就能战胜一切困难,实现自己的目标。 成功并不是一蹴而就的,它需要我们付出努力和坚持不懈。就像爬山一样,我们可能会遇…

物联网技术正在如何影响我们的生活

物联网改变生活 引言 在今天这个数字化迅猛发展的时代,物联网(IoT)已经悄然渗透了我们生活的每一个角落。想象一下,当你的冰箱能够自动订购牛奶时,你是否会感到惊喜?这就是物联网的魔力,它不仅…

InVideo AI技术浅析(五):生成对抗网络

一、特效生成 1. 工作原理 特效生成是计算机视觉中的高级应用,旨在通过算法生成高质量的视觉特效,如风格迁移、图像到图像的翻译等。InVideo AI 使用生成对抗网络(GAN)来实现这一功能。GAN 通过生成器和判别器两个网络的对抗训练,生成逼真的视觉特效。 2. 关键技术模型…

Oracle 深入学习 Part 14:Managing Password Security and Resources(管理密码安全性和资源)

Profiles Profile 是一个以名称标识的集合,用于管理 密码 和 资源限制。 每个用户都对应一个profiles,可以通过 CREATE USER 或 ALTER USER 命令分配给用户。 Profiles 可以启用或禁用。 Profiles 可以关联到默认的 DEFAULT Profile。 密码管理&…