文章目录
- 概述
- 架构说明
- 核心组件
- 核心概念
- namesvr
- producer
- 默认实现
- producer启动
- 消息发送
- broker-mq核心
- 基本模型
- 集群模型
- 内部模型
- 存储机制
- 高可用
- consumer
- push类型
- push流程
- pull类型
概述
随着分布式技术在业内的快速应用,mq(消息队列)做为不可或缺的重要组件发挥着重要作用:削峰,异步,解耦。rocketmq做为热门产品,本文将对该产品实现原理进行一些分析,供大家参考。
架构说明
核心组件
- 组成: name servers, brokers, producers, consumers
- 特性:支持水平扩展,没有单一的故障点
核心概念
- topic(主题):简单的理解为消息的类型,相同类型的消息使用相同的topic。例:某日报、期刊
- message(消息):消息id可用于防重、冥等性,key可用于路由算法topic分片(queue)。例:具体的一期报纸
- tag(标签):类似子主题,可以在主题下进行拆分子类型, 可用于过滤。例:某日报、期刊的英文版、中文版
- queue:topic的分片。例:家里7个信箱,报纸可以按所属星期分散存储到7个信箱里
- producer-group:多个相同的procuder客户端组成。主要用于冗灾。例:邮局里多人小组负责同一区域报纸邮寄。
- consumer-group:多个相同consumer客户端组成。例:家里由7口人,7个信箱,一个报纸只能由有一个人取到。
- offset:主要用于consumer消费的下标。例:从信箱取报纸的期数,用户1读到了100期,用户2读取了102期,未完读可重复去信箱根据下标重新复制一份。
- 消息顺序:全局要求只能有一个消息queue,多个queue存在并发场景不保证顺序。
namesvr
- namesvr各节点无联系,存储的数据是等价相同的
- namesvr接收并管理broker上报的各种注册元信息,cluster下,broker同时向所有节点上报,若出现任一节点挂掉,其他节点没影响
- namesvr为producer、consumer提供发现服务,cluster下,client只会连接一个可用节点,若出现该节点挂掉,client将自动转移至其它可用的节点
- namesvr将上报的信息都存储在内存中,同时通过心跳机制检测broker的可用性。注:支持持久化, 但一般不启用
- namesvr存储的数据包括broker基本数据(节点信息等)及元数据(topic及message队列映射表等)
- 源码底层通过封装netty来实现
- 为什么不直接用zk?zk完全满足需求,但功能过多;避免过度依赖,保持中间件的轻量级,减少维护成本。
producer
默认实现
官方提供的默认消息发送实现方式涉及3个核心类,如下图
- DefaultMQProducer为基础引导类,除了维护部分基础参数(name、重试次数等等)外,行为操作均为委托Impl,类似一个controller的角色
- DefaultMQProducerImpl为DefaultMQProducer的委托类,是执行具体的业务逻辑实现,消息发送、回调及相关的逻辑分支处理,类似一个service角色
- MQClientInstance用于client维护,用于和broker、namesvr交互,还包括负载、路由、状态存储管理等重要功能,类似一个dao但不止dao的角色。
官方提供了一个DefaultMQProducer的默认消息发送类,该类持有DefaultMQProducerImpl,该实现中引用一个MQClientFactory,
producer启动
- 1:由producer引出,一线实现类为DefaultMQProducerImpl
- 3:实现类初始化start(true),其中的true表示启用MqClientFactory
- 4:checkconfig:检测包括namesvr、group等基本配置信息
- 5:初始化MQClientInstance实例为mQClientFactory,并将该实例加入factoryTable
- 7:执行starttopic对应的routeInfo
- 9:执行向所有broker发送heartbeat
消息发送
以一个最低配置的消息发送为例
- 初始化消息发送机制:消息机制CommunicationMode.SYNC, 消息回调, 发送超时时间
- 获取topic信息,selector计算topic的message queue,并分配其中1个
- 根据message queue信息获取broker相关信息,只会获取角色为master的broker
- 组装其它默认参数,向broker发送消息请求
- 接收到发送的消息sendResult进行相关校验回调操作
- 超过发送阀值,消息将被置入“死信队列”
broker-mq核心
基本模型
- cluster:集群,由多个节点组成逻辑群组。1个cluster可以拥有多个broker组
- broker组:由节点组成小群组,有且只有一个master,0或多个slave
- topic:消息主题,broker与topic为多对多关系
- queue:message queue,1个topic对应1或多个queue(topic分片)
注:cluster和broker组均为逻辑概念,由各节点的clustername、brokername配置形成
集群模型
- 上图为一个2主2从的典型broker模式
- 1个集群(clustername相同为一个集群),2个组(brokername相同为一个组),4个节点(各组各1个master1个slave)
- 图中含2个topic,topic1有2个分片(分布于broker1和broker2),topic2有3个分片(分布于broker1和broker2)
- 同一个broker中的slave为均为master的镜像副本
内部模型
- Remoting Module:一个netty服务端,用于接收并处理客户端请求
- Client Manager:维护接入的producer和consumer的信息,包括各consumer组消费下标都在这里
- Store Service:用于消息硬盘存储策略及实现
- HA Service:主从节点的数据同步机制维护
- Index Service:数据查询策略,索引机制管理及实现
存储机制
- 使用mmap + write方式加快存盘及读取速度
- commitLog:所有消息都存储于commitLog。完全顺序写,随机读(读操作时批量加载,内存缓存)
- consumequeue:关联consumergroup–messagequeue的offset消费位置信息
- 持久化:支持异步和同步。异步的意思为写入内存即响应,后台根据机制定期刷盘。同步表示刷盘后响应
- 主从同步:2m-2s-async(主从异步); 2m-2s-sync(主从同步); 2m-noslave(仅master)
高可用
- 发送消息:如果一个master挂掉,slave暂不支持自动升级为主(后续版本可能会支持)。此时,若消息被路由到该master,因该master不可用,因此消息分配至可用的master中。
- 消费消息:master同时支持读写操作,若某master挂掉,系统会自动将从master中读消息切换至salve中,保证高可用。
consumer
push类型
- DefaultMQPullConsumer 初始启动消费类
- AllocateMessageQueueStrategy 指定消息分配策略实现,consumer获取可消费的topic分片(message queue列表
- DefaultMQPullConsumerImpl 业务逻辑具体实现,同上方producer相关的impl
- MQClientInstance 同上方producer的MQClientInstance相关说明
push流程
- 该模式下,broker收到消息后,consumer,自动更新offset,client接收消息后自行处理
- 类似kafka中的high-level
- 增加新的consumer客户端后,系统会自动实现均衡,需要注意的是,cluster模式下,当同一个组的consumer客户端个数超过某个topic的message queue个数后,那么多出来的consumer客户端将无消息可用
rocketmq中的push,并非完全意义上的broker收到消息后立即推送给consumer,而是采用一种longpolling方式,pull和push相结合。
consumer根据自身消化消息的情况(未处理的消息个数、消息大小、offset跨度),判断是否需要从broker获取新消息
if 消化良好 then {consumer从broker pull消息if broker有新消息,则直接返回else broker没有新消息,broker隔段时间重试,检测到了就返回,超过一定时间仍然检测不到,则返回空。
} else 消化不良:consumer继续处理未处理完的消息
- 整体类似try(pull,timeout)
- 作用:减少server的推送压力,降低server推送过快consumer消化不了的隐患
pull类型
注:pull主要类同push类型
pull流程
- pull类型的特点是需要自行维护offset及异常处理
- 相比于push,灵活度较高,自由优化空间大,适用于复杂场景下优化
- 类比kafka中的simpleconsume
- 主要操作:
- 获取message queue并遍历
- 维护处理offset
- 不同消息状态处理
总结
本文简要阐述了rocketmq浅层面的原理,可供一些常见故障排查定位。希望能对大家有所帮助,感谢!