RocketMQ负载均衡机制解析

server/2024/11/30 20:09:04/

消费者在消费消息的时候,需要知道从Broker的哪一个消息队列中去获取消息。

所以,在消费者端必须要做负载均衡,即Broker端中多个消费队列分配给同一个消费者组中的哪些消费者消费。

RocketMQ中,在消费者端有一个:Rebalance负载均衡组件

  • 他负责相对均匀的给消费者分配需要拉取的队列信息。

消费者负载均衡

指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息。

  • 这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不涉及负载均衡。

而集群模式一个消息队列同一时间只能分配给组内的一个消费者进行消费。

RocketMQ5.0以前是按照队列粒度进行负载均衡的,5.0以后提供了按消息粒度进行负载均衡。

队列粒度负载均衡

队列粒度负载均衡策略中,同一消费者组内的多个消费者将按照队列粒度消费消息,每个队列只能被其中一个消费者消费。

队列粒度负载均衡是在每个消费者端进行的,并不是由某个节点统一进行负载均衡之后将分配结果通知到每个消费者。

消费者增加或者减少会影响消息队列的分配,所以Broker需要感知消费者的上下线情况。

消费者在启动时会向所有的Broker发送心跳包进行注册,通知Broker消费者上线,下线的时候也会向Broker发送取消注册的请求。

Broker会维护消费者信息的注册信息,在消费者发生变更时会通知消费者进行负载均衡。

Rebalance触发时机

消费者启动时触发:

消费者在启动时会进行一次负载均衡,为自己分配消息队列。

Broker发现消费组变更时触发:

处于以下两种情况之一时会被判断为消费组发生了变化,需要进行负载均衡:

  • 某个消费组内有新的消费者向Broker进行了注册。

    • 比如某个消费组原来有两个消费者,现在新增了一个消费者,新增的消费者启动时会向Broker发送注册请求。

  • 消费组订阅的主题信息发生了变化。

    • 比如消费组新增订阅了某个主题或者取消某个主题的订阅,会被判断为主题订阅信息发生了变化。

被判定为变化之后,会触发变更事件,向该消费者下的所有消费者发送发送变更请求,通知组下每个消费者进行负载均衡。

Broker收到消费者下线时触发:

如果有消费者向Broker发送UNREGISTER_CLIENT取消注册请求,并且开启了允许通知变更,会触发变更事件。

变更事件同上,Broker会通知该消费者组下的所有消费者进行一次负载均衡。

消费者定时触发:

消费者本身也会定时执行负载均衡,默认是20s执行一次。

消息粒度负载均衡

RocketMQ5.0之后,增加了消息粒度负载均衡策略,默认且仅使用消息粒度负载均衡策略。

消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息。

  • 即同一个队列中的消息,可被平均分配给组内多个消费者共同消费。

消息粒度负载均衡策略保证同一个队列的消息可以被组内多个消费者共同处理。

但是该策略使用的消息分配算法结果是随机的,不能指定消息被哪一个特定的消费者处理。

  • 当消费者获取到某条消息后,服务端会对该消息加锁,保证该消息对其他消费者不可见,直到消息消费成功或者超时。

所以多个消费者同时消费同一个消息队列中的消息,服务端也可以保证消息不会被多个消费者重复消费。

消息粒度负载均衡策略适用于绝大多数在线处理的业务场景,对于流式处理、聚合计算等场景,更适合队列粒度的负载均衡策略。

执行流程

负载均衡服务执行逻辑在doRebalance函数,里面会对每个消费者组执行负载均衡操作。

consumerTable这个map对象里存储了消费者组对应的的消费者实例。

private ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();public void doRebalance() {//每个消费者组都有负载均衡for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}
}

由于每个消费者组可能会消费很多topic,每个topic都有自己的不同队列,最终是按topic的维度进行负载均衡。

public void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {//按topic维度执行负载均衡this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();
}

最终负载均衡逻辑处理的实现在:

  • org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic

其中分为广播消息和集群消息模型两种情况处理。

负载均衡核心功能的主流程,主要做了4件事情:

负载均衡策略原理

负载均衡策略顶层接口:

 
/*** Strategy Algorithm for message allocating between consumers*/
public interface AllocateMessageQueueStrategy {/*** Allocating by consumer id* 给消费者id分配消费队列*/List<MessageQueue> allocate(final String consumerGroup, //消费者组final String currentCID, //当前消费者idfinal List<MessageQueue> mqAll, //所有的队列final List<String> cidAll //所有的消费者);}

他默认共有7种负载均衡策略实现。

最常用的两种平均分配算法:

AllocateMessageQueueAveragely

是用总数除以消费者个数,余数按消费者顺序分配给消费者。

AlocateMessageQueueAveragelyByCircle

轮流一个一个分配。

 


http://www.ppmy.cn/server/146235.html

相关文章

IDEA敲Web前端快捷键

1.html基础格式 英文符号TAB键 <!doctype html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport"content"widthdevice-width, user-scalableno, initial-scale1.0, maximum-scale1.0, mini…

docker查询是否运行

您可以通过运行以下命令来检查Docker是否正在运行&#xff1a; docker info 或者&#xff1a; docker ps 如果Docker正在运行&#xff0c;docker info将显示Docker的详细信息&#xff0c;而docker ps将列出当前运行的容器。如果Docker没有运行&#xff0c;这些命令将会返回错误…

上传镜像docker hub登不上和docker desktop的etx4.vhdx占用空间很大等解决办法

平时使用docker一般都在Linux服务器上&#xff0c;但这次需要将镜像上传到docker hub上&#xff0c;但是服务器上一直无法登录本人的账号&#xff0c;&#xff08;这里的问题应该docker 网络配置中没有开代理的问题&#xff0c;因服务器上有其他用户使用&#xff0c;不可能直接…

鸿蒙生态崛起!!!

方向一&#xff1a;阐述对鸿蒙生态的认知和了解&#xff0c;并对鸿蒙生态的崛起进行简要分析 鸿蒙生态是基于OpenHarmony共建共享的生态&#xff0c;是所有基于OpenHarmony系统&#xff08;开源鸿蒙&#xff09;社区版本开发、并通过开放原子开源基金会的开源鸿蒙认证&#xff…

【连接池】.NET开源 ORM 框架 SqlSugar 系列

.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…

【纯原生js】原生实现h5落地页面中的单选组件按钮及功能

h5端的按钮系统自带的一般都很丑&#xff0c;需要我们进行二次美化&#xff0c;比如单选按钮复选框之类的&#xff0c;那怎么对其进行html和css的改造&#xff1f; 实现效果 实现代码 <section id"tags"><h2>给景区添加标题</h2><label><…

第R4周:LSTM-火灾温度预测(TensorFlow版)

>- **&#x1f368; 本文为[&#x1f517;365天深度学习训练营]中的学习记录博客** >- **&#x1f356; 原作者&#xff1a;[K同学啊]** 往期文章可查阅&#xff1a; 深度学习总结 任务说明&#xff1a;数据集中提供了火灾温度&#xff08;Tem1&#xff09;、一氧化碳浓度…

挂载本地目录到k8s的pod实现持久化存储

本地目录实现持久化存储 容器是无状态的,每次重启都是新的进程,但是我们需要将一些状态数据如配置、用户数据等存到本地来方便新的容器可以拿到历史状态。先创建一个目录来存放数据,并且挂载到minikube虚拟机内(不是pod里面)。注意要新开一个终端来调用,这个命令会阻塞,…