聊聊PushConsumer与SimpleConsumer拉取消息的区别

ops/2024/9/17 19:06:16/ 标签: 开发语言, java, rocketmq

本文主要研究一下rocketmq5的PushConsumer与SimpleConsumer拉取消息的区别

ProcessQueueImpl

org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java

    private void receiveMessageImmediately(String attemptId) {final ClientId clientId = consumer.getClientId();if (!consumer.isRunning()) {log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);return;}try {final Endpoints endpoints = mq.getBroker().getEndpoints();final int batchSize = this.getReceptionBatchSize();final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,longPollingTimeout, attemptId);activityNanoTime = System.nanoTime();// Intercept before message reception.final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);consumer.doBefore(context, Collections.emptyList());final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,longPollingTimeout);Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {@Overridepublic void onSuccess(ReceiveMessageResult result) {// Intercept after message reception.final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream().map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList());final MessageInterceptorContextImpl context0 =new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);consumer.doAfter(context0, generalMessages);try {onReceiveMessageResult(result);} catch (Throwable t) {// Should never reach here.log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "+ "clientId={}", mq, endpoints, clientId, t);onReceiveMessageException(t, attemptId);}}@Overridepublic void onFailure(Throwable t) {String nextAttemptId = null;if (t instanceof StatusRuntimeException) {StatusRuntimeException exception = (StatusRuntimeException) t;if (org.apache.rocketmq.shaded.io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {nextAttemptId = request.getAttemptId();}}// Intercept after message reception.final MessageInterceptorContextImpl context0 =new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);consumer.doAfter(context0, Collections.emptyList());log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +"nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,clientId, t);onReceiveMessageException(t, nextAttemptId);}}, MoreExecutors.directExecutor());receptionTimes.getAndIncrement();consumer.getReceptionTimes().getAndIncrement();} catch (Throwable t) {log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);onReceiveMessageException(t, attemptId);}}

PushConsumer通过ProcessQueueImpl的receiveMessageImmediately拉取消息,其内部是通过consumer.receiveMessage(request, mq, longPollingTimeout)来拉取的,request是通过consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression, longPollingTimeout, attemptId)构建的

SimpleConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java

    public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {if (!this.isRunning()) {log.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",this.state(), clientId);final IllegalStateException e = new IllegalStateException("Simple consumer is not running now");return Futures.immediateFailedFuture(e);}if (maxMessageNum <= 0) {final IllegalArgumentException e = new IllegalArgumentException("maxMessageNum must be greater than 0");return Futures.immediateFailedFuture(e);}final HashMap<String, FilterExpression> copy = new HashMap<>(subscriptionExpressions);final ArrayList<String> topics = new ArrayList<>(copy.keySet());// All topic is subscribed.if (topics.isEmpty()) {final IllegalArgumentException e = new IllegalArgumentException("There is no topic to receive message");return Futures.immediateFailedFuture(e);}final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));final FilterExpression filterExpression = copy.get(topic);final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic);final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {final MessageQueueImpl mq = result.takeMessageQueue();final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,invisibleDuration, awaitDuration);return receiveMessage(request, mq, awaitDuration);}, MoreExecutors.directExecutor());return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),clientCallbackExecutor);}

SimpleConsumerImpl的receive0也是通过ConsumerImpl的receiveMessage(request, mq, awaitDuration)方法来拉取消息的,其request是通过wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration, awaitDuration)来构建的

ConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java

receiveMessage

    protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,MessageQueueImpl mq, Duration awaitDuration) {List<MessageViewImpl> messages = new ArrayList<>();try {final Endpoints endpoints = mq.getBroker().getEndpoints();final Duration tolerance = clientConfiguration.getRequestTimeout();final Duration timeout = awaitDuration.plus(tolerance);final ClientManager clientManager = this.getClientManager();final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> future =clientManager.receiveMessage(endpoints, request, timeout);return Futures.transformAsync(future, responses -> {Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR).setMessage("status was not set by server").build();Long transportDeliveryTimestamp = null;List<Message> messageList = new ArrayList<>();for (ReceiveMessageResponse response : responses) {switch (response.getContentCase()) {case STATUS:status = response.getStatus();break;case MESSAGE:messageList.add(response.getMessage());break;case DELIVERY_TIMESTAMP:final Timestamp deliveryTimestamp = response.getDeliveryTimestamp();transportDeliveryTimestamp = Timestamps.toMillis(deliveryTimestamp);break;default:log.warn("[Bug] Not recognized content for receive message response, mq={}, " +"clientId={}, response={}", mq, clientId, response);}}for (Message message : messageList) {final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, transportDeliveryTimestamp);messages.add(view);}StatusChecker.check(status, future);final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints, messages);return Futures.immediateFuture(receiveMessageResult);}, MoreExecutors.directExecutor());} catch (Throwable t) {// Should never reach here.log.error("[Bug] Exception raised during message receiving, mq={}, clientId={}", mq, clientId, t);return Futures.immediateFailedFuture(t);}}

receiveMessage方法通过clientManager.receiveMessage(endpoints, request, timeout)来拉取消息,之后转换为ReceiveMessageResult

wrapReceiveMessageRequest(ProcessQueueImpl)

    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,FilterExpression filterExpression, Duration longPollingTimeout, String attemptId) {attemptId = null == attemptId ? UUID.randomUUID().toString() : attemptId;return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup()).setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression)).setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos())).setBatchSize(batchSize).setAutoRenew(true).setAttemptId(attemptId).build();}

ProcessQueueImpl调用的wrapReceiveMessageRequest只传递了batchSize、mq、filterExpression, longPollingTimeout, attemptId这几个参数

wrapReceiveMessageRequest(SimpleConsumerImpl)

    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,FilterExpression filterExpression, Duration invisibleDuration, Duration longPollingTimeout) {final org.apache.rocketmq.shaded.com.google.protobuf.Duration duration = Durations.fromNanos(invisibleDuration.toNanos());return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup()).setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression)).setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos())).setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();}

SimpleConsumerImpl调用的wrapReceiveMessageRequest只传递了maxMessageNum(batchSize), mq, filterExpression, invisibleDuration, awaitDuration(longPollingTimeout)这几个参数
区别在于一个是setAutoRenew为true且设置了attemptId,一个是setAutoRenew为false且设置了invisibleDuration

小结

rocketmq5的PushConsumer与SimpleConsumer拉取消息都是通过ConsumerImpl的receiveMessage方法来拉取的,区别在于构建的ReceiveMessageRequest参数不一样,一个是setAutoRenew为true且设置了attemptId,一个是setAutoRenew为false且设置了invisibleDuration。


http://www.ppmy.cn/ops/92460.html

相关文章

第十二章 Spring MVC 框架扩展和SSM框架整合(2023版本IDEA)

学习目标 12.1 Spring MVC 框架处理JSON数据12.1.1 JSON数据的传递处理12.1.2 JSON数据传递过程中的中文乱码和日期问题12.1.3 多视图解析器 12.2 Spring MVC 框架中的数据格式转换12.2.1 Spring MVC 框架数据转换流程12.2.2 编写自定义转换器12.2.3 使用InitBinder装配自定义编…

vue项目打包问题

缓存导致打包后js文件404 修改vue.config.js打包输出文件名为动态&#xff0c;例如取当前时间戳。 在index.html文件添加meta标签设置不缓存。 更新完包&#xff0c;假如用户此刻正访问某一个页面时&#xff0c;访问的包还是原来的情况导致出现bug 解决VUE项目更新后需要客户手…

四种应用层协议——MQTT、CoAP、WebSockets和HTTP——在工业物联网监控系统中的性能比较

目录 摘要(Abstract) 实验设置 实验结果 节选自《A Comparative Analysis of Application Layer Protocols within an Industrial Internet of Things Monitoring System》&#xff0c;作者是 Jurgen Aquilina、Peter Albert Xuereb、Emmanuel Francalanza、Jasmine Mallia …

AIoT新技术融合基础设计课程开发与运营案例分析

本文来自下面的论文的第4部分&#xff1a;《Research on Basic Engineering Design Course Development and Application of New Technology AIoT (Artificial Intelligence of Things) Convergence Education》&#xff0c;作者是Yunja Hwang&#xff0c;来自韩国檀国大学工学…

Redis 分布式锁实现详解

Redis 分布式锁实现详解 在分布式系统中&#xff0c;我们需要解决的一个重要问题是多个服务实例之间如何协调共享资源的访问问题。例如&#xff0c;在电子商务系统中&#xff0c;库存更新需要被多个微服务实例所共享&#xff0c;但为了防止超卖&#xff0c;必须确保库存更新是…

一种JSON多态表示法

介绍 假设现在需要实现一种功能: 从某个远程的组件(消息队列或远程文件)拉取最后几条记录做一个展示. 需要支持如下的组件: Kafka RocketMQ OSS 假设还有很多, 这里不列了 … 显然, 每种组件需要的参数各不一样, 那么此时如何使用一个统一的结构来表达这些组件的参数呢?…

在Spring Boot中处理HTTP请求【后端 5】

在Spring Boot中处理HTTP请求 在Web开发中&#xff0c;处理HTTP请求和响应是构建任何应用程序的基础。Spring Boot以其简化的配置和快速开发的能力&#xff0c;成为构建RESTful Web服务的首选框架之一。本文将围绕Spring Boot中的请求处理&#xff0c;从基础到进阶&#xff0c;…

ue4.27 C++ 解析内容为json的字符串

json字符串为 R"({"x": -1870.0, "y": -11400.0})"&#xff0c;里面内容是个json对象。 const FString& Message R"({"x": -1870.0, "y": -11400.0})"; TSharedRef<TJsonReader<>> Reader TJs…

c++中std::endl 和“\n“ 这两个换行符有什么区别

std::endl 和 "\n" 都用于在C中生成换行符&#xff0c;但它们之间有一些重要的区别 std::endl&#xff1a; 功能&#xff1a;输出一个换行符&#xff0c;并刷新输出流&#xff08;即缓冲区&#xff09;。作用&#xff1a;确保所有数据立即输出到目的地&#xff0c;例…

海量数据处理商用短链接生成器平台 - 13

第三十九章 流量包模块订单支付消息业务逻辑开发实战 第1集 MQ消费者开发-订单支付状态更新模块 简介&#xff1a;MQ消费者开发-订单支付状态更新模块 编码实战 public void handleProductOrderMessage(EventMessage eventMessage) {String messageType eventMessage.getE…

React事件绑定的方式有哪些?区别?

React 中事件绑定的方式主要有以下几种&#xff1a; 直接在 JSX 中绑定事件&#xff1a; <button onClick{handleClick}>Click me</button> 这是最常见和推荐的方式。事件名&#xff08;如 onClick&#xff09;作为 JSX 的属性&#xff0c;值为一个函数&#xff0c…

【Linux】信号

进程的信号是操作系统中一种重要的软件中断机制&#xff0c;用于通知进程系统中发生了某种类型的事件。以下是对进程信号的详细解析&#xff1a; 一、信号的概念 进程信号是一种软件中断&#xff0c;它通知进程发生了某个事件&#xff0c;打断进程当前的操作&#xff0c;去处…

uni-app内置组件(基本内容,表单组件)()二

文章目录 一、 基础内容1.icon 图标2.text3.rich-text4.progress 二、表单组件1.button2.checkbox-group和checkbox3.editor 组件4.form5.input6.label7.picker8.picker-view 和 picker-view-column9.radio-group 和 radio10.slider11.switch12.textarea 一、 基础内容 1.icon…

【计算机网络】详解CA认证,预防中间者攻击!

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

【产业前沿】树莓集团如何以数字媒体产业园为引擎,加速产业升级?

在数字化转型的浪潮中&#xff0c;树莓集团以敏锐的洞察力和前瞻性的战略眼光&#xff0c;将数字媒体产业园打造成为产业升级的强劲引擎。这一创新举措不仅为传统行业插上了数字的翅膀&#xff0c;更为整个产业链注入了新的活力与可能。 树莓集团深知&#xff0c;数字媒体产业园…

合并图片为pdf

1.先使用IDM在网页下载&#xff1a; 2.按文件类型分组&#xff0c;在按名称大小排序&#xff0c;之后使用Acrobat合并文件成一个pdf即可

msys编译redis window版本及下载渠道

一、msys 编译工具 &#xff08;一&#xff09;、下载 ​ https://repo.msys2.org/distrib/ ​ 选择版本为tar.xz结尾的 ​ msys2-x86_64-latest.tar.xz 27-Jul-2024 13:17 47M (二)、镜像设置 ​ 解压msys2-x86_64-latest.tar.xz &#xff0c;打开mingw64.exe sed -i &…

ASP 表单处理入门指南

ASP 表单处理入门指南 简介 ASP&#xff08;Active Server Pages&#xff09;是一种由微软开发的服务器端脚本环境&#xff0c;用于动态生成交互性网页。它允许开发者结合HTML、VBScript或JScript脚本语言来创建和运行动态网页或Web应用程序。本文将重点介绍如何使用ASP来处理…

数据结构——栈的讲解(超详细)

前言&#xff1a; 小编已经在前面讲完了链表和顺序表的内容&#xff0c;下面我们继续乘胜追击&#xff0c;开始另一个数据结构&#xff1a;栈的详解&#xff0c;下面跟上小编的脚步&#xff0c;开启今天的学习之路&#xff01; 目录 1.栈的概念和结构 1.1.栈的概念 1.2.栈的结构…

Linux知识复习第2期

RHCE 远程登录服务-CSDN博客 Linux 用户和组管理_linux用户和组的管理-CSDN博客 Linux 文件权限详解-CSDN博客 目录 1、sshd 免密登录 (1)纯净实验环境 (2)生成密钥 (3)上锁 2、用户管理 (1)添加新用户 (2)删除用户 (3)修改用户信息 (4)为用户账号设…