10.事务消息

news/2024/11/8 18:50:30/

4.6 事务消息

4.6.1 流程分析

image.png

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1)事务消息发送及提交

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2)事务补偿

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

3)事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • LocalTransactionState.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • LocalTransactionState.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • LocalTransactionState.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

4.6.1 发送事务消息

1) 创建事务消息生产者:TransactionMQProducer

使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。

java public class Producer {    public static void main(String[] args) throws MQClientException, InterruptedException {        //创建事务监听器        TransactionListener transactionListener = new TransactionListenerImpl();        //创建消息生产者        TransactionMQProducer producer = new TransactionMQProducer("group6");        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");        //生产者设置监听器        producer.setTransactionListener(transactionListener);        //启动消息生产者        producer.start();        String[] tags = new String[]{"TagA", "TagB", "TagC"};        for (int i = 0; i < 3; i++) {            try {                Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                SendResult sendResult = producer.sendMessageInTransaction(msg, null);                System.out.printf("%s%n", sendResult);                TimeUnit.SECONDS.sleep(1);           } catch (MQClientException | UnsupportedEncodingException e) {                e.printStackTrace();           }       }        //producer.shutdown();   } }

2)实现事务的监听接口:TransactionListener

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

java public class TransactionListenerImpl implements TransactionListener {    /**      * 在该方法中执行本地事务      */    @Override    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        System.out.println("执行本地事务");        if (StringUtils.equals("TagA", msg.getTags())) {            return LocalTransactionState.COMMIT_MESSAGE;       } else if (StringUtils.equals("TagB", msg.getTags())) {            return LocalTransactionState.ROLLBACK_MESSAGE;       } else {            return LocalTransactionState.UNKNOW;       } ​   }    /**      * 执行事务回查      */    @Override    public LocalTransactionState checkLocalTransaction(MessageExt msg) {        System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");        return LocalTransactionState.COMMIT_MESSAGE;   } }

4.6.2 使用限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECKIMMUNITYTIMEINSECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。需要做幂等。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

4.6.3 注意事项

image.png


http://www.ppmy.cn/news/543181.html

相关文章

uni-app微信小程序获取手机号授权登录(复制即用,js完成敏感数据对称解密,无需走服务端处理)

目录 一、示例 二、具体实现说明 一、示例 获取到的手机号 二、具体实现说明 属性说明 属性名说明生效时机getphonenumber获取用户手机号回调open-type"getPhoneNumber" 按钮写法 <template><view class"login"><view class"content…

第三章 处理机调度与死锁

目录 一、调度的概念、层次 2.1 调度的基本概念 2.2 调度的三个层次 2.2.1 高级调度 2.2.2 低级调度 2.2.3 中级调度 2.2.3.1 进程的挂起态 2.2.4 三层调度的联系、对比 二、进程调度的时机、切换与过程、方式 2.1 进程调度的时机 2.2 进程调度的方式 2.2.1 非抢占…

Neutrino 北京活动| 1月14日以太坊君士但丁堡升级专场分享会

千呼万唤始出来&#xff0c;以太坊的重大升级&#xff0c;君士坦丁堡硬分叉将于2019年1月16日开启。这次以太坊的升级&#xff0c;将是决定以太坊未来走向的关键一步。在行业寒冬期&#xff0c;此次硬分叉之后&#xff0c;究竟以太坊将走向何处&#xff0c;我们拭目以待。 为此…

access 战地1不加入ea_《战地5》即日起加入EA Access免费阵容

今天(6月7日)EA官方宣布《战地5 Battlefield 5》于即日起正式加入EA Access免费阵容&#xff0c;EA Access用户们可以从现在开始下载体验本作。稍后于8月2日发售的《麦登橄榄球20》也将向EA Access用户开放优先体验。 截至目前EA Access的免费Xbox One游戏包含《战地5》、《NBA…

psp模拟器完美字库_PS3模拟器《但丁地狱》试玩演示!中配电脑即可60帧

关注PS3模拟器的同学们&#xff0c;好消息来啦&#xff01;PS3模拟器“RPCS3”的开发团队昨日确认&#xff0c;目前PS3游戏《但丁地狱》目前已经可以用“RPCS3”完整通关了&#xff01;以下是一段“RPCS3”模拟器的《但丁地狱》的试玩演示&#xff0c;一起来看看吧&#xff01;…

一份给艺术爱好者的书单

此书单包括三份&#xff1a; 一、中国美院彭哲老师给入学新生的推荐书目 二、浙江大学人文学院读书节时杨振宇老师的推荐的100本书目 三、整理后精华版书单 一、中国美院彭哲老师给入学新生的推荐书目 本书单以类别分别分为&#xff1a; A专业技法用书B艺术史C艺术理论D文化形…

用Jsoup抓取长颈鹿但丁图片

(官网似乎已改版&#xff0c;此代码没用了) 1、pom文件配置或者添加jsoup1.6.3jar包&#xff1a; <dependencies><dependency><groupId>org.jsoup</groupId><artifactId>jsoup</artifactId><version>1.6.3</version></depe…

Java之利用FreeMarker引擎实现枚举和脚本自动生成

开心一笑 【天气热了 翻箱倒柜找了半天短袖 结果找出来一看 全是些名牌短袖 感觉穿出去太高调了 比如什么中国电信啊 天翼4G啊 太太乐鸡精啊 莲花味精啊 海天酱油啊。。最珍贵的一件 要属那件史丹利复合肥 跟刘能同款 哎头大了纠结该穿哪个好呢&#xff1f;穿出去不会被人说我…