分布式事物在RocketMQ中的应用

embedded/2025/3/3 17:19:45/

RocketMQ 4.3 版本之后提供了对分布式事务消息的支持,它采用了一种类似于两阶段提交(2PC)的机制,但又有所不同,可以实现最终一致性的分布式事务。RocketMQ 的事务消息主要用于解决生产者发送消息和本地事务的原子性问题。

应用场景:

典型的场景是电商中的下单流程:

  1. 用户下单,订单服务需要创建订单。
  2. 同时,需要发送一个消息到库存服务,通知其扣减库存。

如果订单创建成功,但消息发送失败,或者消息发送成功,但订单创建失败,都会导致数据不一致。RocketMQ 的事务消息可以解决这个问题。

RocketMQ 事务消息原理:

  1. 发送半消息(Half Message):

    • 生产者首先发送一个半消息(Half Message)到 RocketMQ Broker。
    • 半消息对消费者不可见,它会被存储在一个特殊的内部 topic(RMQ_SYS_TRANS_HALF_TOPIC)中。
    • Broker 会响应生产者,确认半消息已接收。
  2. 执行本地事务:

    • 生产者在收到 Broker 对半消息的确认后,执行本地事务(例如,创建订单)。
    • 本地事务的执行结果可能是提交、回滚或未知。
  3. 提交/回滚半消息:

    • 根据本地事务的执行结果,生产者向 Broker 发送提交(Commit)或回滚(Rollback)请求。
      • 提交: Broker 将半消息标记为可投递,并将其从 RMQ_SYS_TRANS_HALF_TOPIC 转移到真正的目标 topic。消费者现在可以看到并消费这条消息。
      • 回滚: Broker 删除半消息。
      • 未知: 生产者不发送任何请求,Broker 会定期回查生产者(见下文)。
  4. 事务回查 (Transaction Check):

    • 如果 Broker 长时间没有收到生产者对半消息的提交或回滚请求(可能是生产者宕机、网络问题等原因),Broker 会主动向生产者发送事务回查请求。
    • 生产者收到回查请求后,需要检查本地事务的状态(通常通过查询数据库),并再次向 Broker 发送提交或回滚请求。
    • RocketMQ默认回查15次, 可以通过transactionCheckMax参数配置.
  5. 消费者消费消息:

    • 消费者从目标 topic 消费消息,执行相应的业务逻辑(例如,扣减库存)。
    • 消费者的消费逻辑需要保证幂等性,因为在某些情况下(例如,消费者处理消息时宕机),消息可能会被重复消费。

关键概念:

  • 半消息 (Half Message): 一种特殊的消息,对消费者不可见,用于在生产者和 Broker 之间进行事务协调。
  • 事务回查 (Transaction Check): Broker 主动向生产者查询半消息状态的机制,用于处理生产者宕机或网络问题导致的消息状态不一致。
  • 事务消息生产者 (TransactionMQProducer): RocketMQ 提供的特殊生产者,用于发送事务消息。需要配置 TransactionListener
  • TransactionListener: 生产者端的监听器接口, 包含两个方法:
    • executeLocalTransaction: 执行本地事务,并返回本地事务的状态(LocalTransactionState.COMMIT_MESSAGELocalTransactionState.ROLLBACK_MESSAGELocalTransactionState.UNKNOW)。
    • checkLocalTransaction: 用于事务回查,检查本地事务的状态,并返回本地事务的状态。

代码示例 (Java):

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.*;public class TransactionProducer {public static void main(String[] args) throws Exception {// 1. 创建事务消息生产者TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");producer.setNamesrvAddr("localhost:9876"); // 设置 NameServer 地址// 2. 设置事务监听器ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(new TransactionListener() {// 执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println("Executing local transaction...");try {// 执行本地事务 (例如,创建订单)// ...boolean success = true; // 假设本地事务执行成功if (success) {System.out.println("Local transaction succeeded. Commit message.");return LocalTransactionState.COMMIT_MESSAGE; // 提交消息} else {System.out.println("Local transaction failed. Rollback message.");return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息}} catch (Exception e) {System.out.println("Local transaction exception. Unknown state.");return LocalTransactionState.UNKNOW; // 未知状态 (Broker 会回查)}}// 事务回查@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("Checking local transaction state...");// 检查本地事务的状态 (例如,查询订单数据库)// ...boolean exists = true; // 假设订单已创建if (exists) {System.out.println("Local transaction exists. Commit message.");return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.println("Local transaction does not exist. Rollback message.");return LocalTransactionState.ROLLBACK_MESSAGE;}}});// 3. 启动生产者producer.start();// 4. 发送事务消息String[] tags = new String[]{"TagA", "TagB", "TagC"};for (int i = 0; i < 3; i++) {Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送事务消息SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);}// 5. 关闭生产者 (实际应用中,生产者通常不需要立即关闭)// producer.shutdown();}
}

总结:

RocketMQ 的事务消息通过半消息、本地事务执行、提交/回滚/回查机制,以及消费者端的幂等性保证,实现了最终一致性的分布式事务。它适用于需要保证消息发送和本地事务原子性的场景,例如电商中的下单、支付等。 相比于传统的 2PC,RocketMQ 的事务消息性能更高,对业务代码的侵入性更小。

需要注意的点:

  • RocketMQ 的事务消息只保证最终一致性,不保证强一致性(即不保证隔离性)。
  • 消费者的消费逻辑需要保证幂等性。
  • 事务回查可能会导致消息重复发送,因此消费者的业务逻辑需要能够处理重复消息。
  • 事务消息不支持延时消息和批量消息.
  • 为了保证消息不丢失, Half 消息队列和 OP 消息队列也需要配置成主从同步复制模式.

http://www.ppmy.cn/embedded/169666.html

相关文章

2020年蓝桥杯Java B组第二场题目+部分个人解析

#A&#xff1a;门牌制作 624 解一&#xff1a; public static void main(String[] args) {int count0;for(int i1;i<2020;i) {int ni;while(n>0) {if(n%102) {count;}n/10;}}System.out.println(count);} 解二&#xff1a; public static void main(String[] args) {…

在nodejs中使用ElasticSearch(三)通过ES语义检索,实现RAG

RAG&#xff08;Retrieval-Augmented Generation&#xff09;是一种结合了信息检索和生成模型的技术&#xff0c;旨在提高生成模型的知识获取和生成能力。它通过在生成的过程中引入外部知识库或文档&#xff08;如数据库、搜索引擎或文档存储&#xff09;&#xff0c;帮助生成更…

android智能指针android::sp使用介绍

android::sp 是 Android 中的智能指针&#xff08;Smart Pointer&#xff09;的实现&#xff0c;用于管理对象的生命周期&#xff0c;避免手动管理内存泄漏等问题。它是 Android libutils 库中重要的一部分&#xff0c;常用于管理继承自 android::RefBase 的对象。 与标准库中…

北京大学DeepSeek提示词工程与落地场景(PDF无套路免费下载)

近年来&#xff0c;大模型技术飞速发展&#xff0c;但许多用户发现&#xff1a;即使使用同一款 AI 工具&#xff0c;效果也可能天差地别——有人能用 AI 快速生成精准方案&#xff0c;有人却只能得到笼统回答。这背后的关键差异&#xff0c;在于提示词工程的应用能力。 北京大…

用Python3脚本实现Excel数据到TXT文件的智能转换:自动化办公新姿势

文章目录 用Python3实现Excel数据到TXT文件的智能转换&#xff1a;自动化办公新姿势场景应用&#xff1a;为什么需要这种转换&#xff1f;技术解析&#xff1a;代码实现详解核心代码展示改进点说明 实战演练&#xff1a;从Excel到TXT的完整流程准备数据示例&#xff08;data.xl…

SkyWalking

一、APM系统 APM&#xff08;Application Performance Monitoring&#xff09;即应用程序性能监控系统&#xff0c;是对企业系统即时监控以实现对应用程序性能管理和故障管理的系统化的解决方案。应用性能管理&#xff0c;主要指对企业的关键业务应用进行监测、优化&#xff0…

C#委托(delegate)的常用方式

C# 中委托的常用方式&#xff0c;包括委托的定义、实例化、不同的赋值方式以及匿名委托的使用。 委托的定义 // 委托的核心是跟委托的函数结构一样 public delegate string SayHello(string c);public delegate string SayHello(string c);&#xff1a;定义了一个公共委托类型 …

【后端开发面试题】每日 3 题(三)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;https://blog.csdn.net/newin2020/category_12903849.html &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享后端开发面试中常见的面试题给大家~ ❤️如果有收获的话&#x…