话不多说,直接上代码
先整体结构
pom依赖:
<parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.7.18</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><!-- <version>2.7.18</version>--></dependency><!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version><scope>runtime</scope></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--jdbc--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><!--fastjson--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
代码:
DispatchService:
package com.zy.servce;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.beans.Transient;
import java.io.IOException;/*** @Author: zy* @Date: 2024-11-08-15:35* @Description: 运单系统** 消费者收到消息进行处理,处理成功则发送ACK消息通知MQ清除该条记录,* 否则通知MQ重发或者等待MQ自动重发。** 本地维护一个处理次数,如果多次处理仍然失败,则将该消息丢弃或者加入到死信队列(DLQ)中。死信队列中的数据可以人工干预。*/
@Slf4j
@Service
public class DispatchService {@Autowiredprivate JdbcTemplate jdbcTemplate;@RabbitListener(queues = "orderQueue")public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)throws IOException {try {//MQ里面的数据转换成JSON数据JSONObject orderInfo = JSONObject.parseObject(message);log.warn("收到MQ里面的消息:" + orderInfo.toJSONString());Thread.sleep(1000L);//执行业务操作,同一个数据不能处理两次,根据业务情况去重,保证幂等性String orderid = orderInfo.getString("order_id");//分配快递员配送dispatch(orderid);//ack 通知MQ数据已经收到channel.basicAck(tag, false);} catch (Exception e) {//异常情况,需要根据需求去重发或者丢弃//重发一定次数后丢弃,日志告警(rabbitmq没有设置重发次数功能,重发时需要代码实现,比如使用redis记录重发次数,)channel.basicNack(tag, false, false);//系统关键数据异常,需要人工干预}//如果不给确认回复,就等这个consumer断开连接后,MQ会继续推送}/*** 分配快递员** @param orderId 订单编号*/@Transientprivate void dispatch(String orderId) throws Exception {String sql = "insert into tb_dispatch (order_id,courier,status) values (?,?,?)";int count = jdbcTemplate.update(sql, orderId, "东哥", "配送中");if (count != 1) {throw new Exception("调度数据插入失败,原因[数据库操作]");}}
}
OrderService:
package com.zy.servce;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.beans.Transient;/*** @Author: zy* @Date: 2024-11-08-15:21* @Description: 订单中心*/
@Slf4j
@Service
public class OrderService {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void setup() {//消息发送完成后,则回调此方法,ack代表此方法是否发送成功rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//ack为true,代表MQ已经准确收到消息if (!ack) {return;}try {String sql = "update tb_msgstatus set status = 1 where msg_id = ?";int count = jdbcTemplate.update(sql, correlationData.getId());if (count != 1) {log.warn("本地消息表状态修改失败");}} catch (Exception e) {log.warn("本息消息表状态修改异常", e);}}});}/*** 创建订单信息** @param order 订单信息* @throws Exception*/public void createOrder(JSONObject order) throws Exception {//保存订单信息saveOrder(order);//发送MQ消息,直接发送时不可靠,可能会失败(发送后根据回执修改状态表,定时任务扫表读取失败数据重新发送)sendMsg(order);}/*** 发送订单信息至MQ** @param order 订单信息*/private void sendMsg(JSONObject order) {//发送消息到MQ,CorrelationData作用:当收到消息回执时会带上这个参数rabbitTemplate.convertAndSend("orderExchange", "", order.toJSONString(),new CorrelationData((String) order.get("order_id")));}/*** 保存订单信息** @param order 订单信息* @throws Exception*/@Transientprivate void saveOrder(JSONObject order) throws Exception {String sql = "insert into tb_order (order_id,user_id,goods_id,order_time) values (? , ? , ? , now())";//保存订单信息int count = jdbcTemplate.update(sql, order.get("order_id"), order.get("user_id"), order.get("goods_id"));if (count != 1) {throw new Exception("订单创建失败");}//保存消息发送状态saveLocalMsg(order);}/*** 记录消息发送状态** @param order 订单信息* @throws Exception*/private void saveLocalMsg(JSONObject order) throws Exception {String sql = "insert into tb_msgstatus (msg_id,msg,status,send_time) values (? , ? , 0 , now())";//记录消息发送状态int count = jdbcTemplate.update(sql, order.get("order_id"), order.toJSONString());if (count != 1) {throw new Exception("记录消息发送状态失败");}}
}
RabbitMqDistributedTractionApp:
package com.zy;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author: zy* @Date: 2024-11-08-14:58* @Description: RabbitMQ实现分布式事务,保证数据一致性*/
@SpringBootApplication
public class RabbitMqDistributedTractionApp {public static void main(String[] args) {SpringApplication.run(RabbitMqDistributedTractionApp.class, args);System.out.println("启动成功。。。。");}}
yml文件:
server:port: 8080spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/rabbitmq?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=trueusername: 用户名password: MYSQL密码rabbitmq:host: MQ IPport: 5672username: MQ 用户名password: MQ 密码virtual-host: vHost1#必须配置这个,生产者才会确认回调publisher-confirm-type: correlatedpublisher-returns: true#重要,手动开启消费者ACK,控制消息在MQ中的删除、重发listener:simple:acknowledge-mode: MANUAL
Test类:
package com.zy;import com.alibaba.fastjson.JSONObject;
import com.zy.servce.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;/*** @Author: zy* @Date: 2024-11-08-15:22* @Description:*/
@SpringBootTest
public class Test {@Autowiredprivate OrderService orderService;@org.junit.jupiter.api.Testpublic void orderServiceTest() throws Exception {//生成订单信息JSONObject orderinfo = new JSONObject();orderinfo.put("order_id", UUID.randomUUID().toString());orderinfo.put("user_id",UUID.randomUUID().toString());orderinfo.put("goods_id",UUID.randomUUID().toString());System.out.println("数据:"+orderinfo);orderService.createOrder(orderinfo);}
}
下面是SQL文件:
-- ----------------------------
-- Table structure for tb_dispatch
-- ----------------------------
DROP TABLE IF EXISTS `tb_dispatch`;
CREATE TABLE `tb_dispatch` (`order_id` varchar(200) NOT NULL COMMENT '订单ID',`courier` varchar(200) DEFAULT NULL COMMENT '快递员姓名',`status` varchar(200) DEFAULT NULL COMMENT '配送状态',PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='配送表';-- ----------------------------
-- Table structure for tb_msgstatus
-- ----------------------------
DROP TABLE IF EXISTS `tb_msgstatus`;
CREATE TABLE `tb_msgstatus` (`msg_id` varchar(200) NOT NULL COMMENT '消息ID',`msg` text COMMENT '消息内容',`status` varchar(200) DEFAULT NULL COMMENT '消息状态,发送-0,发送成功-1,发送失败-2',`send_time` datetime DEFAULT NULL COMMENT '发送时间',PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- ----------------------------
-- Table structure for tb_order
-- ----------------------------
DROP TABLE IF EXISTS `tb_order`;
CREATE TABLE `tb_order` (`order_id` varchar(200) NOT NULL COMMENT '订单ID',`user_id` varchar(200) DEFAULT NULL COMMENT '用户ID',`goods_id` varchar(200) DEFAULT NULL COMMENT '商品ID',`order_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;SET FOREIGN_KEY_CHECKS = 1;```