Rocketmq面试(四)RocketMQ 的推模式和拉模式有什么区别?

news/2024/11/15 4:54:19/

一、PUSH模式

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 初始化consumer,并设置consumer group nameDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876");//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息consumer.subscribe("TopicTest", "*");//注册回调接口来处理从Broker中收到的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动Consumerconsumer.start();System.out.printf("Consumer Started.%n");}
}

消费者会定义一个消息监听器,并把这个监听器注册到DefaultMQPushConsumer,同时也会注册到其实现类,当拉取消息的时候,就会使用这个监听器来处理消息,那这个监听器什么时候调用呢?

消费者真正拉取请求的类是DefaultMQPush-ConsumerImpl,这个类的pullMessage方法调用了PullApiWrapper的pullKernelImpl方法,这个方法有一个参数是回调函数Pull-Callback,当PULL状态是PullStatus.FOU-ND代表拉取成功。

PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND://省略部分逻辑DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//省略部分逻辑break;//省略其他casedefault:break;}}}@Overridepublic void onException(Throwable e) {//省略}
};

这个处理逻辑调用了ConsumerMessageService的submitConsumeRequest方法,看一下并发消费的处理逻辑

public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} else {//分批处理,跟上面逻辑一致
}

ConsumerRequest是一个线程类,run方法里面调用了消费者定义的消息处理方法

public void run() {//省略逻辑MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;//省略逻辑try {//调用消费方法status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {//省略逻辑}//省略逻辑
}

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

1.在MessageListenerConcurrently中定义消费者处理逻辑,在消费者启动的时候注册到DefultMQpushConsumer,和DefaultMQ-PushConsumerImpl

2.消费者启动时,启动消费拉取线程PullMessageService,里面死循环不停的从broker拉取消息,这里调用了DefaultMQPushConsumerImpl类的pullMessage方法

3.DefaultMQPushConsumerImpl类的pullMessage方法调用了PullAPIWrapper的pullKernelImpl方法真正的发送PULL请求,并传入PullCallback的回调函数

4.拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 线程来处理拉取到的消息;

5.ConsumerRequest处理消息时调用了消费端定义的消费逻辑,也就是MessageListerConcurrently的consumeMessage方法。

pull模式

官方代码

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {while (running) {List<MessageExt> messageExts = litePullConsumer.poll();System.out.printf("%s%n", messageExts);}
} finally {litePullConsumer.shutdown();
}

可以我们看到,PULL模式需要在处理逻辑里面不停的去拉取,比如上面代码中写了一个死循环,那PULL模式中poll韩式是怎么实现的呢?

跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe

    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {try {if (topic == null || topic.equals("")) {throw new IllegalArgumentException("Topic can not be null or empty.");}setSubscriptionType(SubscriptionType.SUBSCRIBE);SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),topic, subExpression);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);if (serviceState == ServiceState.RUNNING) {this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();updateTopicSubscribeInfoWhenSubscriptionChanged();}} catch (Exception e) {throw new MQClientException("subscribe exception", e);}}

这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。在消费者启动时,会启动 RebalanceService 这个线程,可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:

public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();if (messageQueueListener != null) {try {messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);} catch (Throwable e) {log.error("messageQueueChanged exception", e);}}
}

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

  1. 消费者启动,向 DefaultLitePullConsumer 订阅了 Topic,这个订阅过程会向 DefaultLitePullConsumer 注册一个监听器;

  2. 消费者启动过程中,会启动 Message-Queue 重平衡线程 Rebalance-Service,当重平衡过程发现 ProcessQueueTable 发生变化时,启动消息拉取线程;

  3. 消息拉取线程拉取到消息后,把消息放到 consumeRequestCache,然后进行下一次拉取;

  4. 消费者启动后,不停地从 consumeReq-uestCache 拉取消息进行处理。

总结

通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:

  1. PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑

  2. PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。


http://www.ppmy.cn/news/352198.html

相关文章

Python各目录下的__init__.py文件对Python模块化的重要作用

在Python项目中&#xff0c;__init__.py文件是一个特殊的Python文件&#xff0c;它的存在是为了将一个目录视为Python包。 __init__.py文件的主要作用有以下几点&#xff1a; 标识包&#xff1a;在一个目录中添加__init__.py文件&#xff0c;可以将该目录标识为一个Python包。…

优秀的杀毒软件

卡巴斯基&#xff08;kaspersky&#xff09; 小红伞&#xff08;avira&#xff09; 火绒 。。。。 评测结果

Altium Designer二次开发

Altium Designer二次开发就在该软件原有的基础上&#xff0c;自己写代码给它添加新功能&#xff0c;如&#xff1a;一键生成Gerber&#xff0c;计算铺铜面积&#xff0c;PCB走线的寄生参数和延时等等。 Altium Designer二次开发有两种方式&#xff0c;一种是基于Altium Designe…

推荐杀毒软件

卡巴斯基 卡巴斯基&#xff0c;英文名Kaspersky&#xff0c;是一款来源于俄罗斯的为用户度身定制的反病毒软件&#xff0c;查杀病毒性能远高于同类产品。它具有超强的中心管理和杀毒能力&#xff0c;能真正实现带毒杀毒!卡巴斯基软件技术安全且易于使用&#xff0c;能够为用户提…

企业中常见的杀毒软件

提到杀毒软件&#xff0c;很多人可能首先想到的就是360、腾讯安全管家、金山毒霸之类的国产免费杀毒软件。但是在企业环境中&#xff0c;除了一些小型公司外&#xff0c;基本不会选择这些免费的国产杀毒软件的。原因在于&#xff1a; 1、这类免费杀毒软件基本都附带推广信息 2、…

Mybatis《学习笔记(22版尚硅谷)》

Mybatis简介 MyBatis历史 MyBatis最初是Apache的一个开源项目iBatis, 2010年6月这个项目由Apache Software Foundation迁移到了Google Code。随着开发团队转投Google Code旗下&#xff0c;iBatis3.x正式更名为MyBatis。代码于2013年11月迁移到GithubiBatis一词来源于“intern…

vue3的api解读-生命周期钩子

目录 什么是钩子&#xff08;hook&#xff09;&#xff1f; 钩子到生命周期的映射 vue的生命周期钩子示例 onRenderTracked/onRenderTriggered 用钩子还是watch&#xff1f; 什么是钩子&#xff08;hook&#xff09;&#xff1f; 钩子是一种消息机制&#xff1a; 例1&…

正则表达式特殊字符 [:alnum:] 等

如果在"[]"中出现了"^"&#xff0c;代表本表达式不匹配"[]"内出现的字符&#xff0c;比如"/[^a-z]/"不匹配任何小写字母&#xff01;并且正则表达式给出了几种"[]"的默认值&#xff0c;如下&#xff1a; [:alnum:] 匹配任何…