28.RocketMQ之消费者的负载均衡源码

news/2024/12/5 6:39:35/

highlight: arduino-light

消费者负载均衡流程

当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是在同1个消费组中。此时一个消费组中多个Consumer消费一个Topic,而一个Topic对应多个MessageQueue。

比如TopicA有2个Consumer,6个MessageQueue,那么这6个MessageQueue怎么分配呢?

这就涉及到Consumer的负载均衡了。

首先 Consumer 在启动时,会把自己注册给所有 Broker并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。

然后Consumer在消费时,会随机连接一台Broker ,获取消费组中的所有Consumer。

然后根据要消费的Topic的messageQueue和消费者的数量做负载均衡。

因为是分布式环境下的负载均衡,所以如何让每个消费者都能保证看到的视图是一样呢?

答案是先对消费者和messageQueue排序,然后每个消费者使用相同的负载均衡策略做负载均衡。

这样每个消费者看到的负载均衡分配的messageQueue的视图就是一致的。

负载均衡主要流程如下:

假如topicA 有 6个队列。

消费者1启动订阅topicA。通过findConsumerIdList方法到broker获取到所有的消费者,发现是1个。

对消费者和队列进行排序,然后执行负载均衡策略,获取到当前消费者的分配到的mqSet是6个。

此时消费者2启动,获取所有的消费者,发现是2个。

对消费者和队列进行排序。执行负载均衡策略。获取到当前消费者分配到的mqSet是4 5 6。

此时消费者2就可以对4 5 6这3个队列做消息的拉取了。

过了20秒消费者1的负载均衡任务又触发了,流程同上,消费者1获取到当前消费者分配到的mqSet是1 2 3。

此时消费者1就可以对1 2 3 这3个队列做消息的拉取了。

说明:为了方便,有些地方的messageQueue使用mq代替。

```md /* 循环遍历所有的topic,获取对应的队列集合messageQueueSet。 判断是否是顺序消费,是否顺序消费判断的依据是当前消费者绑定的listener是并发还是顺序

在广播模式下 1.遍历的是所有的mqset

在集群模式下 1.遍历的是所有的allocateResult 即根据负载均衡策略分配的mqSet 2.移除新增的mq 3.移除长时间未拉取的mq 不会移除顺序消费且是集群模式的mq,也就是说不会参与负载均衡?? 4.遍历移除了不重要的(新增||长时间未拉取)的mq的mqSet 5.如果消费者是顺序消费,尝试加锁该MessageQueue(远程向服务器端请求加锁,并设置本地mq加锁状态),   加锁失败就跳过该mq 不会构建该mq的pullRequest。否则也会构建该mq的pullRequest。 6.清除本地该mq的消费offset,从服务器拉取更新本地mq的消费offset 7.构建pullRequest准备拉取消息

在 rebalance 时,需要对 队列,还有消费者客户端 ID 进行排序,以确保同一个消费组下的视图是一致的。 */ ```

ServiceThread#start

java public void start() { log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(false, true)) { return; } stopped = false; //this 指的是rebalanceService this.thread = new Thread(this, getServiceName()); this.thread.setDaemon(isDaemon); this.thread.start(); }

RebalanceService#run

java @Override public void run() { log.info(this.getServiceName() + " service started"); // while (!this.isStopped()) { //waitInterval 20秒 也就是每20秒需要负载均衡一次 this.waitForRunning(waitInterval); //做负载均衡操作 this.mqClientFactory.doRebalance(); } ​ log.info(this.getServiceName() + " service end"); }

MQClientInstance#doRebalance

每个消费者都需要做负载均衡

public void doRebalance() { //MQClientInstance遍历已注册的消费者,对消费者执行doRebalance()方法 for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }

MQConsumerInner有2个实现,DefaultMQPullConsumerImpl和DefaultMQPushConsumerImpl。

分别是对应拉和推模式。默认DefaultMQPushConsumerImpl即推。

```java //DefaultMQPullConsumerImpl //pull this.rebalanceImpl.doRebalance(false);

//DefaultMQPushConsumerImpl   //push this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); ```

重点看推模式下的负载均衡。

DefaultMQPushConsumerImpl#doRebalance

假如topicA 有 6个队列。

消费者1启动订阅topicA。通过findConsumerIdList方法到broker获取到所有的消费者,发现是1个。

对消费者和队列进行排序,然后执行负载均衡策略,获取到当前消费者的分配到的mqSet是6个。

此时消费者2启动,获取所有的消费者,发现是2个。

对消费者和队列进行排序。执行负载均衡策略。获取到当前消费者分配到的mqSet是4 5 6。

此时消费者2就可以对4 5 6这3个队列做消息的拉取了。

过了20秒消费者1的负载均衡任务又触发了,流程同上,消费者1获取到当前消费者分配到的mqSet是1 2 3。

此时消费者1就可以对1 2 3 这3个队列做消息的拉取了。


参考链接:https://blog.csdn.net/HoneyYHQ9988/article/details/105941328

参考链接:https://blog.csdn.net/HoneyYHQ9988/article/details/105941328

```java /* consumeOrderly:是否是顺序消费,consumeOrderly由监听器的类型决定 如果监听器是MessageListenerOrderly consumeOrderly 是true 如果监听器是MessageListenerConcurrently consumeOrderly 是false if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService (this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService (this, (MessageListenerConcurrently) this.getMessageListenerInner()); } */ @Override public void doRebalance() { if (!this.pause) {

this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}

} ``` 注意doRebalance的参数consumeOrderly。

consumeOrderly:是否是顺序消费,consumeOrderly由监听器的类型决定。

如果监听器是MessageListenerOrderly consumeOrderly是true。

如果监听器是MessageListenerConcurrently consumeOrderly是false。

RebalanceImpl#doRebalance

```java //isOrder是否需要顺序消费消息 public void doRebalance(final boolean isOrder) { //遍历topic 对每个topic订阅的队列进行重新负载 // ConcurrentMap subscriptionInner //subscriptionInner的结构是1个Map,key是Topic,value是订阅信息 Map subTable = this.getSubscriptionInner(); if (subTable != null) { //注意在这里是for循环每个topic for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { //根据topic负载均衡 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY GROUPTOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } }

this.truncateMessageQueueNotMyTopic();

} ```

这里有个点:是for循环遍历topic,难道每个消费者组中的消费者还能订阅多个topic?

答案是肯定的!

有兴趣的同学可参考:https://blog.csdn.net/u011385940/article/details/130511718

接着往下看

RebalanceImpl#rebalanceByTopic

``` //topic 消息主题 //isOrder 是否顺序消费 private void rebalanceByTopic(final String topic, final boolean isOrder) {

switch (messageModel) {//广播模式每个消费者都需要消费所有的消息//所以在updateProcessQueueTableInRebalance中传入的是该topic下的所有的mqSetcase BROADCASTING: {//获取每个topic的所有队列集合Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);}} else {//不存在MessageQueue}break;}//集群模式case CLUSTERING: {//1.获取topic的所有队列集合Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//2.从broker端获取topic的消费者集合,消费者集合种存放的是所有消费者的clientId//先根据topic获取topic所在的所有的broker的地址//随机选择1个broker地址拉取消费者列表//cidAll中的clientId长这样:172.18.120.141@21908List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {//不存在MessageQueue}}if (null == cidAll) {}//给消费者分配队列if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);//将cid和mq做排序 保证每个消费者的视图一致Collections.sort(mqAll);Collections.sort(cidAll);//默认是AllocateMessageQueueAveragelyAllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {//AllocateMessageQueueAveragely的allocate//mqAll 队列集合//cidAll 消费者集合//根据队列集合和消费者集合进行重新负载均衡allocateResult = strategy.allocate(this.consumerGroup,//本机的clientIdthis.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception}");return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//集群模式  //集群模式每个消费者组的一个实例对个一个//所以在updateProcessQueueTableInRebalance中传入的是allocateResultSetboolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed");this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}
}

```

广播模式

广播模式首先获取mqset,mqSet是该topic下面的所有的messageQueue。

然后调用updateProcessQueueTableInRebalance方法,根据返回值判断是否调用messageQueueChanged方法。

集群模式

1.获取topic下的所有队列的集合记作mqSet。

2.根据topic获取topic所在的所有的broker的地址,然后随机选择1个broker地址拉取消费者列表,

3.消费者集合中存放的是所有消费者的clientId记作cidAll。cidAll中的clientId格式:172.18.120.141@21908

4.将cid和mq做排序 保证每个消费者的视图一致

5.获取负载均衡策略,默认是AllocateMessageQueueAveragely

6.调用负载均衡策略的allocate方法,需要传入mqSet、cidAll、消费者的clientId、消费者组,allocate方法返回结果记作allocateResult。

7.将分配给自己的mq集合allocateResult加入allocateResultSet。

然后调用updateProcessQueueTableInRebalance方法,根据返回值判断是否调用messageQueueChanged方法。

虽然广播和集群都调用了updateProcessQueueTableInRebalance方法但是传入的参数mqSet不同。

广播模式传入的是topic下面的所有的messageQueue。

集群模式传入的allocateResult是该消费者在该topic分配到的队列。

其实不管是集群还是广播,传入的参数mqSet都是分配给消费者的队列。这么理解也没毛病!

RebalanceImpl#updateProcessQueueTableInRebalance

```java //mqSet消费者分配到的消息队列的集合 //isOrder是否是顺序消息 private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,final boolean isOrder) { boolean changed = false;

//it中存放的是Entry<MessageQueue,ProcessQueue>
Iterator<Entry<MessageQueue,ProcessQueue>> it = this.processQueueTable.entrySet().iterator();

//遍历Entry while (it.hasNext()) {

Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();if (mq.getTopic().equals(topic)) {//假如所有的是123456,当前消费者分配的是456//那么遍历123456 判断当前消费者分配的456是否包含123456//这样做的目的是防止原来分配的是123 现在分配的是456//如果不丢弃123 会造成123的重复消费//mqSet是分配给当前消费者的队列集合//如果这个队列没有分配给当前消费者 //那么需要丢弃该队列丢弃该ProcessQueue//那么在拉取消息的时候就不会对该ProcessQueue进行拉取if (!mqSet.contains(mq)) {//设置丢弃标识pq.setDropped(true);//广播模式直接返回true//如果是集群模式且顺序消费 返回false//把该mq的客户端消费offset更新到broker保存,移除客户端该mq的消费offset记录。//移除已经下线的mqif (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, remove unnecessary mq");}//对于原来分配给当前消费者现在又分配给当前消费者的队列需要做失效判断//比如原来分配的是123 现在分配的是34 那么队列3就是符合这种条件的。//PULL_MAX_IDLE_TIME 默认是2分钟,参数rocketmq.client.pull.pullMaxIdleTime,12000秒//当前时间-拉取开始时间大于2分钟    //lastPullTimestamp + 2分钟 < 当前时间  认为该mq失效 需要丢弃} else if (pq.isPullExpired()) { switch (this.consumeType()) {//PULLcase CONSUME_ACTIVELY:break;//PUSHcase CONSUME_PASSIVELY://push走这里//把ProcessQueue置为失效//这样在PullService线程拉取的时候该对象是失效状态,就不再拉取该对象pq.setDropped(true);//广播模式直接返回true//如果是集群模式且顺序消费 返回false//把该mq的客户端消费offset更新到broker保存//移除客户端该mq的消费offset记录。//从集合移除长时间没有拉取消息的mqif (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;}break;default:break;}}}
}//end whileList<PullRequest> pullRequestList = new ArrayList<PullRequest>();//mqSet是清理了不必要的mq的mqSet
for (MessageQueue mq : mqSet) {//说明该mq是新增的 旧的不需要分配 已经分配过了if (!this.processQueueTable.containsKey(mq)) {//如果消费者是顺序消费//尝试加锁该MessageQueue//加锁失败就跳过该mq 不会拉去该mq的消息//顺序消费并且加锁成功 或者 是并发消费 才会继续往下执行if (isOrder && !this.lock(mq)) {log.warn("doRebalance, add a new mq failed, because lock failed");continue;}//消费客户端移除该mq的消费offset//通过LocalFileOffsetStore或RemoteBrokerOffsetStore移除该消息队列的消费offsetthis.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();//向broker发送命令QUERY_CONSUMER_OFFSET获取broker端记录的该mq的消费offset//默认是CONSUME_FROM_LAST_OFFSET//使用offsetStore读取消费进度 //广播是本地消费进度 集群是远程消费进度long nextOffset = this.computePullFromWhere(mq);if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {//说明当前消息队列之前已经被分配给当前消费者//已经存在的ProcessQueue拉取offset,在每次拉取到消息以后会重新设置log.info("doRebalance, {}, mq already exists");} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);//构建拉取PullRequestPullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);//设置拉取消息的偏移量pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);//将新加入的放入pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}
}
//遍历pullRequestList集合
//把pullRequest对象添加到PullMessageService服务线程的阻塞队列内供PullMessageService拉取执行
this.dispatchPullRequest(pullRequestList);
return changed;

} ```

updateProcessQueueTableInRebalance是消费端重新负载的核心方法。

顾名思义功能就是更新处理器队列集合RebalanceImpl.processQueueTable。

那么什么是processQueue,它的作用是什么呢?

它是一个队列消费快照,消息拉取的时候,会将实际消息体、拉取相关的操作存放在其中。比如消费进度,消费等等功能的底层核心数据保存都是有ProcessQueue提供。

另外在拉取消息的时候使用的是PullRequest去请求,PullRequest结构如下:

java public class PullRequest { private String consumerGroup; private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; private boolean previouslyLocked = false; }

可以发现1个ProcessQueue和1个MessageQueue是一一对应的,MessageQueue用来表示拉取回来的消息元数据信息。

具体可参考:https://blog.csdn.net/Saintmm/article/details/120723628

假设有6个队列,有2个消费者分别是消费者A和消费者B。

————————————————消费者B第一轮负载均衡——————————————————

消费者B第一次做负载均衡分到的队列是123

1.先从processQueueTable获取Entry 集合it

2.由于是第一次做负载均衡,所以it为空,此时跳过while循环

3.遍历本次分到的mqSet,判断processQueueTable中是否存在对应的mq,如果不存在说明是新分配的mq。

4.如果是新分配给当前消费者的mq&&当前消费者是顺序消费&&加锁失败啥也不做。

为什么要加锁?

因为顺序消费需要加分布式锁+本地锁,此处是在broker加分布式锁。

为什么加锁失败啥也不做?

因为加锁失败,说明是其它的消费者在broker加的分布式锁还没释放,那么等下次Rebalance的时候再尝试加锁即可。

5如果是新分配给当前消费者的mq&&当前消费者不是顺序消费&&加锁成功。

5.1调用removeDirtyOffset清除本地的脏消费偏移量

5.2重新计算队列拉取消息的偏移量。

5.3构建拉取消息的PullRequest

6.分发PullRequest拉取消息

————————————————消费者B第二轮负载均衡——————————————————

消费者B第二次做负载均衡分到的队列mqSet是34

1.先从processQueueTable获取Entry 集合it,集合it中存放的是123

2.由于是第二次做负载均衡,所以it不为空,此时进入while循环

3.遍历it集合,集合it中存放的是123

4.先判断分配的mqSet是否包含1、2、3,mqSet是3、4,所以队列1、2会被丢弃

5.队列3显然是包含在mqSet中的,对于队列3会判断拉取是否过期,如果拉取过期那么要丢弃队列3,在removeUnnecessaryMessageQueue方法中如果消费者B是顺序消费还需要释放broker端的分布式锁。

6.遍历本次分到的mqSet,判断processQueueTable中是否存在对应的mq,如果不存在说明是新分配的mq。

7.如果是新分配给当前消费者的mq&&当前消费者是顺序消费&&加锁失败啥也不做。

为什么要加锁?

因为顺序消费需要加分布式锁+本地锁,此处是在broker加分布式锁。

为什么加锁失败啥也不做?

因为加锁失败,说明是其它的消费者在broker加的分布式锁还没释放,那么等下次Rebalance的时候再尝试加锁即可。

8.如果是新分配给当前消费者的mq&&当前消费者不是顺序消费&&加锁成功。

8.1调用removeDirtyOffset清除本地的脏消费偏移量

8.2重新计算队列拉取消息的偏移量。

8.3构建拉取消息的PullRequest

9.分发PullRequest拉取消息

RebalancePushImpl#removeUnnecessaryMessageQueue

``` @Override public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { //持久化当前消费进度到broker this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); //移除本地消费进度 this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); //集群模式且顺序消费 if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals (this.defaultMQPushConsumerImpl.messageModel())) { try { if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { try { return this.unlockDelay(mq, pq); } finally { pq.getLockConsume().unlock(); } } else { log.warn("mq is consuming, so can not unlock it, {}. maybe hanged for a while"); //增加tryUnlockTimes pq.incTryUnlockTimes(); } } catch (Exception e) { log.error("removeUnnecessaryMessageQueue Exception", e); } return false; }

//否则返回true
return true;

} ```

his.unlockDelay(mq, pq);中调用RebalancePushImpl.this.unlock(mq, true);解除分布式锁

RebalancePushImpl#computePullFromWhere

``` @Override public long computePullFromWhere(MessageQueue mq) { long result = -1; //默认是从上一个OFFSET消费 默认策略,从该队列最尾开始消费,即跳过历史消息 //private ConsumeFromWhere consumeFromWhere = // ConsumeFromWhere.CONSUMEFROMLASTOFFSET; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUMEFROMLASTOFFSETANDFROMMINWHENBOOTFIRST: case CONSUMEFROMMINOFFSET: case CONSUMEFROMMAXOFFSET: //默认是从上次消费的地方拉取 case CONSUMEFROMLASTOFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { //返回本地消费的偏移量 result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRYGROUPTOPICPREFIX)) { result = 0L; } else { try { //从broker服务器拉取 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } case CONSUMEFROMFIRSTOFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } case CONSUMEFROMTIMESTAMP: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRYGROUPTOPICPREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; }

default:break;}return result;
}

``` 默认是CONSUMEFROMLAST_OFFSET,从上次消费的地方拉取。

先从offsetStore获取拉取消息的偏移量,如果从offsetStore获取不到。

那么从broker端获取队列拉取消息的偏移量。

RebalancePushImpl#dispatchPullRequest

java @Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { //遍历所有的pullRequest for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl .executePullRequestImmediately(pullRequest); } }

分发PullRequest,此时会立刻执行1个拉取消息的动作,至于消息是怎么拉取的,下篇文章见。

我们继续看DefaultMQPullConsumerImpl负载均衡。

DefaultMQPullConsumerImpl负载均衡

MQClientInstance#start

``` public void start() throws MQClientException {

synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// 启动负载均衡服务 这里是一个Thread 所以看run方法this.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object has been created before, and failed.", null);default:break;}}
}

```

ServiceThread#start

public void start() { log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(false, true)) { return; } stopped = false; //this 指的是rebalanceService this.thread = new Thread(this, getServiceName()); this.thread.setDaemon(isDaemon); this.thread.start(); }

RebalanceService#run

``` @Override public void run() { log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {this.waitForRunning(waitInterval);//做负载均衡操作this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");
}

```

MQClientInstance#doRebalance

每个消费者都需要做负载均衡

 public void doRebalance() {     //MQClientInstance遍历已注册的消费者,对消费者执行doRebalance()方法        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {            MQConsumerInner impl = entry.getValue();            if (impl != null) {                try {                    //MQConsumerInner impl有2个实现                                        //DefaultMQPullConsumerImpl                    //pull this.rebalanceImpl.doRebalance(false);                                        //DefaultMQPushConsumerImpl                      //push this.rebalanceImpl.doRebalance(this.isConsumeOrderly());                    impl.doRebalance();               } catch (Throwable e) {                    log.error("doRebalance exception", e);               }           }       }   }

DefaultMQPullConsumerImpl#doRebalance

@Override    public void doRebalance() {        if (this.rebalanceImpl != null) {            //注意这里传的一直是false            this.rebalanceImpl.doRebalance(false);       }   }

DefaultMQPushConsumerImpl做负载均衡是否是顺序消费是根据消费者关联的监听器确定。

DefaultMQPullConsumerImpl做负载均衡是否是顺序消费传的是false。

DefaultMQPushConsumerImpl剩下的流程和DefaultMQPushConsumerImpl一模一样


http://www.ppmy.cn/news/758522.html

相关文章

panabit

管理口ipputty登录rootpanaos 命令echo "nameserver 114.114.114.114" >> /etc/resolv.conf产看是否写入成功cat /etc/resolv.conf 通过这个命令查 logeye user list 默认密码 rootroot 改密码 passwd 转载于:https://www.cnblogs.com/nxiao/articles/64088…

panabit之Web认证

panabit之Web认证 要求用户通过 WEB 认证实现一机一账号上网。 1、开启 WEB 认证 2、编写 WEB 账号密码

panabit之MAC管控

panabit之MAC管控 原理 对网络进行 MAC 和 IP 地址绑定。 需求 学校要求对违规 IP 地址的流量进行丢弃。 流程 1、开启 MAC 绑定功能 2、MAC 地址绑定 3、修改 IP 地址验证 注意&#xff0c;MAC管控只能用在交换机连接主机的环境中&#xff0c;若是多层的网络拓扑&…

panabit之HTTP管控

panabit之HTTP管控 原理 对网络进行网址过滤。 需求 例如&#xff0c;学校要求教师上班不能访问购物网址&#xff0c;访问则被禁止并提示“上班时间&#xff0c;不得购物”&#xff0c;学生不得访问视频网址&#xff0c;访问则被禁止并提示“好好学习&#xff0c;天天向上”…

panabit部署模式

panabit 部署模式 路由模式 panabit 此时外接互联网&#xff0c;内接内网&#xff0c;相当于一个路由器的功能&#xff0c;除了转发数据流外&#xff0c;还可以部署 NAT。因为内网的流量需要经过 panabit&#xff0c;所以可以起到监管流量的作用。 网桥模式 相较上一幅拓扑多…

python-ap-imc-panabit-mysql

利用Python抓取ap的登录ip去imc拿到用户信息添加到panabit白名单后记录到数据库 from pysnmp.entity.rfc3413.oneliner import cmdgen import requests import urllib.request import json import time import pymysql add_ipaddress_groups [] oid_number { ap1:1.3.6.1.…

panabit虚拟机安装以及实现的细节研究

panabit是国内协议识别性能最高的厂商&#xff0c;作为一个安全开发者&#xff0c;确实很有理由去研究下。 panabit一般运行在freeBSD系统下&#xff0c;但是最近的版本都有liveCD这也省去了我很多下在系统的时间。直接用vmware新建一个虚拟机&#xff0c;如果需要功能正常的话…

Panabit 安装笔记之Panabit的安装和配置

PanaBit 2008 的安装和配置 安装前的准备: 1. 安装好 FreeBsd6.2 ,并确认网卡为 2 块网卡,在此 Panabit 工作在旁听模式下,如果要工作