消息中间件之RocketMQ源码分析(二十五)

news/2025/3/16 6:25:35/

Broker存储事务消息

在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。3个核心的初始化变量
在这里插入图片描述

1.TransactionalMessageService.

事务消息主要用于处理服务,默认实现类是TransactionalMessageServiceImpl.如果想自定义事务消息
处理实现类,需要实现TransactionMessageService接口,然后通过ServiceProvider.loadClass()方法进行加载。
TransactionalMessageService接口的基本操作定义如下
在这里插入图片描述

  • prepareMessage():用于保存Half事务消息,用户可以对其进行Commit或Rollback
  • deletePrepareMessage():用于删除事务消息,一般用于Broker回查失败的Half消息。
  • commitMessage():用于提交事务消息,使消费者可以正常地消费事务消息
  • rollbackMessage():用于回滚事务消息,回滚后消费者将不能够消费该消息。通常用于生产者主动进行Rollback时,以及Broker回查的生产者本地事务失败时
  • open():用于打开事务服务
  • close():用于关闭事务服务

2.transactionMessageCheckListener.

事务消息回查监听器,默认实现类是DefaultTransactionalMessageCheckListener.如果想自定义回查监听处理,需要继承AbstractTransactionalMessageCheckListener接口,然后通过ServiceProvider.loadClass()方法被加载

在这里插入图片描述
在这里插入图片描述

3.transactionalMessageCheckService.

事务消息回查服务是一个线程服务,定时调用transactionalMessageService.check()方法,检查超时的Half消息
是否需要回查
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

额外两个单独处理

上面三个事务处理类完成初始化后,Broker就可以处理事务消息了。
Broker存储事务消息和普通消息都是通过SendMessageProcessor类进行处理的,只是在存储消息时有两处事务消息需要单独处理。

第一个单独处理,sendMessage()

这里获取消息中的扩展字段MessageConst.PROPERTY_TRANSACTION_PREPARED的值,
如果该值为True则返回当前消息是事务消息;再判断当前Broker的配置是否支持事务消息,如果不支持就返回生产者不支持事务消息的信息;如果支持,则调用TransactionalMessageService#prepareMessage()方法保存Half消息
在这里插入图片描述

第二个单独处理:存储前事务消息预处理,处理方法是TransactionalMessageBridge.praseHalfMessageInner()

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
该方法的功能是将原消息的Topic、queueId、susFlg存储在消息的扩展字段中,并且修改Topic的值为RMQ_SYS_TRANS_HALF_TOPIC,修改queueId的值为0,然后,与其他消息一样,调用DefaultMessageStore.putMessage()方法保存到CommitLog中,CommitLog存储成功后,通过CommitLog.DefaultAppendMessageCallback.doAppend()方法单独对事务消息进行处理
在这里插入图片描述
Prepared消息其实就是Half消息,其实现逻辑是,设置当前Half消息的
queueOffset值为0,而不是其真实的位点值。这样该位点就不会建立ConsumeQueue索引,自然也不能被消费者消费


http://www.ppmy.cn/news/1368038.html

相关文章

mysql timestamp转换为datetime

MySQL timestamp转换为datetime的方法 1. 流程概述 在MySQL中,timestamp和datetime是两种不同的数据类型。timestamp存储了日期和时间,并且会自动更新,可以用于记录数据的创建和修改时间。datetime则是一个固定的日期和时间,不会自…

LeetCode215.数组中的第K个最大元素

题目 给定整数数组 nums 和整数 k,请返回数组中第 k 个最大的元素。 请注意,你需要找的是数组排序后的第 k 个最大的元素,而不是第 k 个不同的元素。 你必须设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 输入: [3,2,1,5,6,4], …

[机缘参悟-158] :西游记中的“佛” 、“道”之争

目录 前言 一、西游记中的佛教元素 1.1 佛教元素 1.2 西游记佛教思想 1.3 佛教的三界五行:物质世界 1.4 佛教中不在三界内,不在五行中:精神世界 二、西游记中的道教元素 2.1 主要元素 2.2 道家思想 三、“佛”如何兼容“道” 3.1 …

程序员视角的大语言模型,如何使用大语言模型

从程序员的视角来看,使用大语言模型(LLMs)主要涉及以下几个步骤: 选择合适的模型: 首先,需要确定哪个大语言模型最适合你的需求。不同的模型可能在不同的任务上有不同的表现,比如代码生成、代码…

Vue中<style scoped lang=“scss“>的含义

这段代码中的<style scoped lang"scss">是HTML和Vue框架结合使用时常见的一个模式&#xff0c;具体含义如下&#xff1a; scoped&#xff1a;这是一个Vue.js特有的属性&#xff0c;用来指定样式只应用于当前组件的元素。没有这个属性时&#xff0c;样式会全局应…

抽象类、模板方法模式

抽象类概述 在Java中abstract是抽象的意思&#xff0c;如果一个类中的某个方法的具体实现不能确定&#xff0c;就可以申明成abstract修饰的抽象方法&#xff08;不能写方法体了&#xff09;&#xff0c;这个类必须用abstract修饰&#xff0c;被称为抽象类。 抽象方法定义&…

剑指offer面试题20 顺时针打印矩阵

考察点 二维数组的遍历知识点 题目 分析 本题目要求从外向里顺时针打印每一个数字&#xff0c;这个题目也是二维数组的遍历&#xff0c;只要涉及到遍历就需要知道循环终止的条件是什么&#xff0c;以及每次怎么迭代。从外向里一圈一圈打印&#xff0c;所以通过审题也可以想到…

ssm172旅行社管理系统的设计与实现

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一 、设计说明 1.1 研究…