Springboot整合RocketMQ分布式事务

embedded/2024/10/31 0:18:02/

RocketMQ分布式事务

  • rocketMQ5.0官方文档
  • 案例源码地址
  • 数据库初始化
    • 创建user_order和user_points
  • POM依赖
  • 配置文件
  • 事务消息处理流程
  • RocketMQLocalTransactionListener源码
  • 整体业务逻辑如下
  • 代码如下
    • Producer 发送事务消息
    • MQ Server回应消息发送成功
  • 消息投递
  • 事务回查
    • MQ Server回应消息发送成功和消息回查都是通过实现RocketMQLocalTransactionListener接口进行实现完整代码就是OrderListener
  • 消费者

rocketMQ5.0官方文档

https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/

案例源码地址

源码地址 https://gitee.com/Lin-seven/rocket-shiwu

数据库初始化

创建user_order和user_points

CREATE TABLE `user_order` (`order_id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',`order_code` varchar(50) DEFAULT NULL COMMENT '用户ID',`order_points` bigint DEFAULT NULL COMMENT '订单积分',PRIMARY KEY (`order_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb3;CREATE TABLE `user_points` (`user_id` bigint NOT NULL AUTO_INCREMENT COMMENT 'user_id',`order_id` bigint DEFAULT NULL COMMENT '订单id',`points` bigint DEFAULT NULL COMMENT '积分',PRIMARY KEY (`user_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

POM依赖

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!--mybatis和springboot整合--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.7</version></dependency><!--Mysql数据库驱动8 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency></dependencies>

配置文件


spring:application:name: rocketmq_demodatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/rocketmq_demo?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=trueusername: rootpassword: 123456
# ========================mybatis===================mybatis-plus:configuration:# 指定 MyBatis 使用 SLF4J 日志实现log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
rocketmq:# NameServername-server: 127.0.0.1:9876producer:# 发送消息超时时间,默认3000send-message-timeout: 30000# 生产者组group: groupTest# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2

事务消息处理流程

在这里插入图片描述

Producer 即MQ发送方,本例中是用户服务,负责新增订单。MQ订阅方即消息消费方,本例中是积分服务,负责
新增积分。
1、Producer 发送事务消息

Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注
意此时这条消息消费者(MQ订阅方)是无法消费到的。 本例中,Producer 发送 ”增加积分消息“ 到MQ Server。

2、MQ Server回应消息发送成功

MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

3、Producer 执行本地事务

Producer 端执行业务代码逻辑,通过本地数据库事务控制。 本例中,Producer 执行添加用户操作。

4、消息投递

若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积
分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息; 若Producer
本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删 除”增加积分消息“
。 MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即
程序执行正常则自动回应ack。

5、事务回查

如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer
来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此 只需关注本地事务的执行状态即可。

RocketMQLocalTransactionListener源码

public interface RocketMQLocalTransactionListener {
/**
‐ 发送prepare消息成功此方法被回调,该方法用于执行本地事务
‐ @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
‐ @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
‐ @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}

整体业务逻辑如下

订单创建成功,用户增加积分

代码如下

Producer 发送事务消息

@RestController
@RequestMapping("/order")
public class OrderController2 {@ResourceRocketMQTemplate rocketMqTemplate;@PostMapping("/orderAndPoints")public String orderAndPoints(@RequestBody  UserOrder order){TransactionSendResult transactionSendResult = rocketMqTemplate.sendMessageInTransaction("transaction-topic",MessageBuilder.withPayload(order).build(),null);//消息发送状态
//        SendStatus sendStatus = transactionSendResult.getSendStatus();
//        //本地事务状态
//        String localState = transactionSendResult.getLocalTransactionState().name();
//
//        System.out.println("发送状态:{},本地事务执行状态:{}"+ sendStatus+ "    " + localState);return order.toString();}
}

MQ Server回应消息发送成功

通过实现RocketMQLocalTransactionListener中的executeLocalTransaction 方法 如果在 controllerorderAndPoints rocketMQ的事务消息发送成功,会自动监听executeLocalTransaction方法
在executeLocalTransaction 方法中进行本地事务的执行,例如创建订单等。如果不发生异常自然是进行返回COMMIT
事务状态


@Component
@RocketMQTransactionListener
public class OrderListener implements RocketMQLocalTransactionListener {@ResourceOrderMapper orderMapper;/*** 消息发送成功执行 此方法本地事务* 事务消息发送成功本方法被回调* @param message* @param o* @return*/@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println("事务消息发送成功,开始指定本地事务");UserOrder payload = (UserOrder) message.getPayload();orderMapper.insert(payload);return RocketMQLocalTransactionState.COMMIT;}/*** 回查事务消息* @param message* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {UserOrder payload = (UserOrder) message.getPayload();UserOrder userOrder = orderMapper.selectById(payload.getOrderId());if (userOrder == null){return RocketMQLocalTransactionState.ROLLBACK;}System.out.println("检查本地事务消息"+ payload.toString());return RocketMQLocalTransactionState.COMMIT;}
}

消息投递

如通过executeLocalTransaction 返回的是COMMIT 事务状态,则消息“ 状态标记为可消费。

事务回查

通过RocketMQLocalTransactionListener中的checkLocalTransaction 进行事务回查,比如订单是否被添加到数据库,或者本地消息表是否有记录等,回查状态进行返回事务状态

MQ Server回应消息发送成功和消息回查都是通过实现RocketMQLocalTransactionListener接口进行实现完整代码就是OrderListener

消费者

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction-topic")
public class TransactionMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("事务消息消费成功,消息内容:{}", message);}
}

http://www.ppmy.cn/embedded/133749.html

相关文章

分类与有序回归

分类问题 分类问题&#xff0c;例如分类猫、狗、猪时&#xff0c;使用数字进行表示为1&#xff0c;2&#xff0c;3。而1、2、3之间有大小&#xff0c;分类算法为了平衡标签之间的差异&#xff0c;使得损失公平&#xff0c;会使用one-hot编码。例如&#xff0c;分别使用&#x…

git 修改当前分支的上游分支

在Git中&#xff0c;如果你想要修改当前分支的上游分支&#xff08;即你想要改变当前分支跟踪的远程分支&#xff09;&#xff0c;你可以使用git branch命令的--set-upstream-to选项或者git push命令的-u或--set-upstream选项。 例如&#xff0c;如果你当前在feature-branch分…

Linux相关概念和易错知识点(18)(重定向、语言级缓冲区)

目录 1.重定向 &#xff08;1&#xff09;什么是重定向&#xff1f; &#xff08;2&#xff09;dup2 ①重定向原理 ②重定向方法 &#xff08;3&#xff09;重定向和程序替换的易混点 2.语言级缓冲区 &#xff08;1&#xff09;为什么需要语言级缓冲区 &#xff08;2&am…

uni-app @click.stop @click.stop.native均不生效

原因就是用了nvue导致的 vue等其他环境都可以 解决&#xff1a;e.stopPropagation() click"goExecute($event)" goExecute(e) {e.stopPropagation()}, uniApp官方真的是一坨大翔&#xff0c;不仅社区不维护&#xff0c;文档也写的跟粑粑一样&#xff0c;自创的nv…

Vue语法汇总

一、this.$refs用法 在 Vue.js 中&#xff0c;this.$refs 是一个对象&#xff0c;它用于直接访问 DOM 元素或子组件的实例。当你给元素或组件添加 ref 属性时&#xff0c;Vue 会自动将这些元素或组件的引用添加到 $refs 对象中。 this.$refs[‘DetailForm’] 这种写法的意思是…

2024年优秀的天气预测API

准确、可操作的天气预报对于许多组织的成功至关重要。 事实上&#xff0c;在整个行业中&#xff0c;天气条件会直接影响日常运营&#xff0c;包括航运、按需、能源和供应链&#xff08;仅举几例&#xff09;。 以公用事业为例。根据麦肯锡的数据&#xff0c;在 1.4 年的时间里…

开源FluentFTP实操,操控FTP文件

概述&#xff1a;通过FluentFTP库&#xff0c;轻松在.NET中实现FTP功能。支持判断、创建、删除文件夹&#xff0c;判断文件是否存在&#xff0c;实现上传、下载和删除文件。简便而强大的FTP操作&#xff0c;提升文件传输效率。 在.NET中&#xff0c;使用FluentFTP库可以方便地…

项目话术【【【【【【【【【【

项目背景&#xff1a;本项目是为应健康管理检测机构要求而进行开发&#xff1b; 项目介绍&#xff1a;此项目开发环境前端使用Nodejs&#xff0c;后端使用Maven&#xff0c;TomCat&#xff0c;Git 项目使用ssm框架和springboot&#xff0c;springsecurity以及MyBatis相结合&a…