Springboot RabbitMq 集成分布式事务问题

server/2024/11/17 2:18:24/

话不多说,直接上代码

先整体结构

在这里插入图片描述
pom依赖:

<parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.7.18</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><!--        <version>2.7.18</version>--></dependency><!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version><scope>runtime</scope></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--jdbc--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><!--fastjson--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

代码:
DispatchService:


package com.zy.servce;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.beans.Transient;
import java.io.IOException;/*** @Author: zy* @Date: 2024-11-08-15:35* @Description: 运单系统** 消费者收到消息进行处理,处理成功则发送ACK消息通知MQ清除该条记录,* 否则通知MQ重发或者等待MQ自动重发。** 本地维护一个处理次数,如果多次处理仍然失败,则将该消息丢弃或者加入到死信队列(DLQ)中。死信队列中的数据可以人工干预。*/
@Slf4j
@Service
public class DispatchService {@Autowiredprivate JdbcTemplate jdbcTemplate;@RabbitListener(queues = "orderQueue")public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)throws IOException {try {//MQ里面的数据转换成JSON数据JSONObject orderInfo = JSONObject.parseObject(message);log.warn("收到MQ里面的消息:" + orderInfo.toJSONString());Thread.sleep(1000L);//执行业务操作,同一个数据不能处理两次,根据业务情况去重,保证幂等性String orderid = orderInfo.getString("order_id");//分配快递员配送dispatch(orderid);//ack 通知MQ数据已经收到channel.basicAck(tag, false);} catch (Exception e) {//异常情况,需要根据需求去重发或者丢弃//重发一定次数后丢弃,日志告警(rabbitmq没有设置重发次数功能,重发时需要代码实现,比如使用redis记录重发次数,)channel.basicNack(tag, false, false);//系统关键数据异常,需要人工干预}//如果不给确认回复,就等这个consumer断开连接后,MQ会继续推送}/*** 分配快递员** @param orderId 订单编号*/@Transientprivate void dispatch(String orderId) throws Exception {String sql = "insert into tb_dispatch (order_id,courier,status) values (?,?,?)";int count = jdbcTemplate.update(sql, orderId, "东哥", "配送中");if (count != 1) {throw new Exception("调度数据插入失败,原因[数据库操作]");}}
}

OrderService:

package com.zy.servce;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.beans.Transient;/*** @Author: zy* @Date: 2024-11-08-15:21* @Description: 订单中心*/
@Slf4j
@Service
public class OrderService {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void setup() {//消息发送完成后,则回调此方法,ack代表此方法是否发送成功rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//ack为true,代表MQ已经准确收到消息if (!ack) {return;}try {String sql = "update tb_msgstatus set status = 1 where msg_id = ?";int count = jdbcTemplate.update(sql, correlationData.getId());if (count != 1) {log.warn("本地消息表状态修改失败");}} catch (Exception e) {log.warn("本息消息表状态修改异常", e);}}});}/*** 创建订单信息** @param order 订单信息* @throws Exception*/public void createOrder(JSONObject order) throws Exception {//保存订单信息saveOrder(order);//发送MQ消息,直接发送时不可靠,可能会失败(发送后根据回执修改状态表,定时任务扫表读取失败数据重新发送)sendMsg(order);}/*** 发送订单信息至MQ** @param order 订单信息*/private void sendMsg(JSONObject order) {//发送消息到MQ,CorrelationData作用:当收到消息回执时会带上这个参数rabbitTemplate.convertAndSend("orderExchange", "", order.toJSONString(),new CorrelationData((String) order.get("order_id")));}/*** 保存订单信息** @param order 订单信息* @throws Exception*/@Transientprivate void saveOrder(JSONObject order) throws Exception {String sql = "insert into tb_order (order_id,user_id,goods_id,order_time) values (? , ? , ? , now())";//保存订单信息int count = jdbcTemplate.update(sql, order.get("order_id"), order.get("user_id"), order.get("goods_id"));if (count != 1) {throw new Exception("订单创建失败");}//保存消息发送状态saveLocalMsg(order);}/*** 记录消息发送状态** @param order 订单信息* @throws Exception*/private void saveLocalMsg(JSONObject order) throws Exception {String sql = "insert into tb_msgstatus (msg_id,msg,status,send_time) values (? , ? , 0 , now())";//记录消息发送状态int count = jdbcTemplate.update(sql, order.get("order_id"), order.toJSONString());if (count != 1) {throw new Exception("记录消息发送状态失败");}}
}

RabbitMqDistributedTractionApp:

package com.zy;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author: zy* @Date: 2024-11-08-14:58* @Description: RabbitMQ实现分布式事务,保证数据一致性*/
@SpringBootApplication
public class RabbitMqDistributedTractionApp {public static void main(String[] args) {SpringApplication.run(RabbitMqDistributedTractionApp.class, args);System.out.println("启动成功。。。。");}}

yml文件:

server:port: 8080spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/rabbitmq?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=trueusername: 用户名password: MYSQL密码rabbitmq:host: MQ IPport: 5672username: MQ 用户名password: MQ 密码virtual-host: vHost1#必须配置这个,生产者才会确认回调publisher-confirm-type: correlatedpublisher-returns: true#重要,手动开启消费者ACK,控制消息在MQ中的删除、重发listener:simple:acknowledge-mode: MANUAL

Test类:


package com.zy;import com.alibaba.fastjson.JSONObject;
import com.zy.servce.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;/*** @Author: zy* @Date: 2024-11-08-15:22* @Description:*/
@SpringBootTest
public class Test {@Autowiredprivate OrderService orderService;@org.junit.jupiter.api.Testpublic void orderServiceTest() throws Exception {//生成订单信息JSONObject orderinfo = new JSONObject();orderinfo.put("order_id", UUID.randomUUID().toString());orderinfo.put("user_id",UUID.randomUUID().toString());orderinfo.put("goods_id",UUID.randomUUID().toString());System.out.println("数据:"+orderinfo);orderService.createOrder(orderinfo);}
}

下面是SQL文件:


-- ----------------------------
-- Table structure for tb_dispatch
-- ----------------------------
DROP TABLE IF EXISTS `tb_dispatch`;
CREATE TABLE `tb_dispatch` (`order_id` varchar(200) NOT NULL COMMENT '订单ID',`courier` varchar(200) DEFAULT NULL COMMENT '快递员姓名',`status` varchar(200) DEFAULT NULL COMMENT '配送状态',PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='配送表';-- ----------------------------
-- Table structure for tb_msgstatus
-- ----------------------------
DROP TABLE IF EXISTS `tb_msgstatus`;
CREATE TABLE `tb_msgstatus` (`msg_id` varchar(200) NOT NULL COMMENT '消息ID',`msg` text COMMENT '消息内容',`status` varchar(200) DEFAULT NULL COMMENT '消息状态,发送-0,发送成功-1,发送失败-2',`send_time` datetime DEFAULT NULL COMMENT '发送时间',PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- ----------------------------
-- Table structure for tb_order
-- ----------------------------
DROP TABLE IF EXISTS `tb_order`;
CREATE TABLE `tb_order` (`order_id` varchar(200) NOT NULL COMMENT '订单ID',`user_id` varchar(200) DEFAULT NULL COMMENT '用户ID',`goods_id` varchar(200) DEFAULT NULL COMMENT '商品ID',`order_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;SET FOREIGN_KEY_CHECKS = 1;```

http://www.ppmy.cn/server/142528.html

相关文章

【Python爬虫实战】轻量级爬虫利器:DrissionPage之SessionPage与WebPage模块详解

&#x1f308;个人主页&#xff1a;易辰君-CSDN博客 &#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/2401_86688088/category_12797772.html ​ 目录 前言 一、SessionPage &#xff08;一&#xff09;SessionPage 模块的基本功能 &#xff08;二&#xff09;基本使…

1.两数之和-力扣(LeetCode)

题目&#xff1a; 解题思路&#xff1a; 在解决这个问题之前&#xff0c;首先要明确两个点&#xff1a; 1、参数returnSize的含义是返回答案的大小&#xff08;数目&#xff09;&#xff0c;由于这里的需求是寻找数组中符合条件的两个数&#xff0c;那么当找到这两个数时&#…

Python 正则表达式基础教程:简单匹配

Python 正则表达式基础教程&#xff1a;简单匹配 正则表达式&#xff08;Regular Expression&#xff09;是一种用于匹配字符串模式的强大工具。在 Python 中&#xff0c;正则表达式广泛用于数据处理、文本分析等任务&#xff0c;能够帮助我们快速找到或替换特定的字符或字符串…

什么是 Real-Time Factor (RTF)

在 TTS&#xff08;Text-to-Speech&#xff09; 领域&#xff0c;RTF 通常指的是 Real-Time Factor&#xff0c;即“实时因子”。这是一个衡量 TTS 系统性能的重要指标&#xff0c;用来评估模型在语音生成过程中的效率。 什么是 Real-Time Factor (RTF) RTF 表示生成语音所需…

Git - 命令杂谈

记录一些平时常用的Git命令 clone git clone URL -b BRANCH path--depth 1 日志中只有最近1次提交--bare 以镜像的形式拉取代码--mirror 以镜像的形式拉取代码&#xff0c;与--bare相比&#xff0c;--mirror不仅将源的本地分支映射到目标的本地分支&…

Linux网络编程之UDP编程

UDP编程效率高&#xff0c;不需要差错校验&#xff0c;在视频点播场景应用高 基于UDP协议客户端和服务端的编程模型&#xff0c;和TCP模型有点类似&#xff0c;有些发送接收函数不同,TCP是之间调用I/O函数read0或write()进行读写操作&#xff0c;而UDP是用sendto()和readfrom(…

产品思维如何颠覆我的开发与盈利观-营销自己

之前&#xff0c;我独自一人开发了一个名为“心情追忆”的小程序&#xff0c;旨在帮助用户记录日常的心情变化及重要时刻。从项目的构思、设计、前端&#xff08;小程序&#xff09;开发、后端搭建到最终部署&#xff0c;所有环节都由我一人包办。经过一个月的努力&#xff0c;…

基于图像处理与机器学习的车牌检测识别系统设计与实现

摘要&#xff1a;随着智能交通系统的快速发展&#xff0c;车牌检测识别技术在交通管理、安防监控等领域的应用日益广泛。然而&#xff0c;复杂环境因素如光照变化、遮挡、背景干扰等给车牌检测识别带来诸多挑战。本研究旨在设计并实现一种鲁棒性强、准确率高的车牌检测识别系统…