一、PUSH模式
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 初始化consumer,并设置consumer group nameDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876");//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息consumer.subscribe("TopicTest", "*");//注册回调接口来处理从Broker中收到的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动Consumerconsumer.start();System.out.printf("Consumer Started.%n");}
}
消费者会定义一个消息监听器,并把这个监听器注册到DefaultMQPushConsumer,同时也会注册到其实现类,当拉取消息的时候,就会使用这个监听器来处理消息,那这个监听器什么时候调用呢?
消费者真正拉取请求的类是DefaultMQPush-ConsumerImpl,这个类的pullMessage方法调用了PullApiWrapper的pullKernelImpl方法,这个方法有一个参数是回调函数Pull-Callback,当PULL状态是PullStatus.FOU-ND代表拉取成功。
PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND://省略部分逻辑DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//省略部分逻辑break;//省略其他casedefault:break;}}}@Overridepublic void onException(Throwable e) {//省略}
};
这个处理逻辑调用了ConsumerMessageService的submitConsumeRequest方法,看一下并发消费的处理逻辑
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} else {//分批处理,跟上面逻辑一致
}
ConsumerRequest是一个线程类,run方法里面调用了消费者定义的消息处理方法
public void run() {//省略逻辑MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;//省略逻辑try {//调用消费方法status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {//省略逻辑}//省略逻辑
}
下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:
1.在MessageListenerConcurrently中定义消费者处理逻辑,在消费者启动的时候注册到DefultMQpushConsumer,和DefaultMQ-PushConsumerImpl
2.消费者启动时,启动消费拉取线程PullMessageService,里面死循环不停的从broker拉取消息,这里调用了DefaultMQPushConsumerImpl类的pullMessage方法
3.DefaultMQPushConsumerImpl类的pullMessage方法调用了PullAPIWrapper的pullKernelImpl方法真正的发送PULL请求,并传入PullCallback的回调函数
4.拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 线程来处理拉取到的消息;
5.ConsumerRequest处理消息时调用了消费端定义的消费逻辑,也就是MessageListerConcurrently的consumeMessage方法。
pull模式
官方代码
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {while (running) {List<MessageExt> messageExts = litePullConsumer.poll();System.out.printf("%s%n", messageExts);}
} finally {litePullConsumer.shutdown();
}
可以我们看到,PULL模式需要在处理逻辑里面不停的去拉取,比如上面代码中写了一个死循环,那PULL模式中poll韩式是怎么实现的呢?
跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {try {if (topic == null || topic.equals("")) {throw new IllegalArgumentException("Topic can not be null or empty.");}setSubscriptionType(SubscriptionType.SUBSCRIBE);SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),topic, subExpression);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);if (serviceState == ServiceState.RUNNING) {this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();updateTopicSubscribeInfoWhenSubscriptionChanged();}} catch (Exception e) {throw new MQClientException("subscribe exception", e);}}
这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。在消费者启动时,会启动 RebalanceService 这个线程,可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();if (messageQueueListener != null) {try {messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);} catch (Throwable e) {log.error("messageQueueChanged exception", e);}}
}
下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:
-
消费者启动,向 DefaultLitePullConsumer 订阅了 Topic,这个订阅过程会向 DefaultLitePullConsumer 注册一个监听器;
-
消费者启动过程中,会启动 Message-Queue 重平衡线程 Rebalance-Service,当重平衡过程发现 ProcessQueueTable 发生变化时,启动消息拉取线程;
-
消息拉取线程拉取到消息后,把消息放到 consumeRequestCache,然后进行下一次拉取;
-
消费者启动后,不停地从 consumeReq-uestCache 拉取消息进行处理。
总结
通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:
-
PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑;
-
PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。