基于本地消息表实现分布式事务

news/2025/1/23 0:57:43/

假设我们有一个电商系统,包含订单服务和库存服务。当用户下单时,需要在订单服务中创建订单,同时在库存服务中扣减库存。这是一个典型的分布式事务场景,我们需要保证这两个操作要么都成功,要么都失败,以保证数据的最终一致性。

项目结构:

  1. 订单服务(Order Service)
  2. 库存服务(Inventory Service)
  3. 本地消息表(Local Message Table)
  4. 消息恢复系统(Message Recovery System)

核心思想:
使用本地消息表来实现分布式事务。在订单服务中,我们将创建订单和发送消息这两个操作放在一个本地事务中。如果本地事务成功,则订单创建成功,消息也被保存到本地消息表中。然后通过定时任务或消息队列来发送消息到库存服务,实现库存扣减。如果在这个过程中出现任何异常,我们可以通过重试机制来保证最终一致性。

下面是详细的代码实现:

订单服务(Order Service)

@Service  
@Transactional  
public class OrderService {  @Autowired  private OrderRepository orderRepository;  @Autowired  private LocalMessageRepository localMessageRepository;  @Autowired  private KafkaTemplate<String, String> kafkaTemplate;  public void createOrder(Order order) {  // 开启本地事务  TransactionStatus txStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());  try {  // 1. 保存订单  orderRepository.save(order);  // 2. 创建本地消息  LocalMessage message = new LocalMessage();  message.setMessageId(UUID.randomUUID().toString());  message.setMessage(JSON.toJSONString(order));  message.setStatus("NEW");  localMessageRepository.save(message);  // 3. 提交事务  transactionManager.commit(txStatus);  // 4. 发送消息到Kafka  kafkaTemplate.send("inventory-topic", message.getMessageId(), message.getMessage());  } catch (Exception e) {  // 回滚事务  transactionManager.rollback(txStatus);  throw new RuntimeException("Create order failed", e);  }  }  
}

库存服务(Inventory Service)

@Service  
public class InventoryService {  @Autowired  private InventoryRepository inventoryRepository;  @KafkaListener(topics = "inventory-topic")  public void handleOrderCreation(ConsumerRecord<String, String> record) {  String messageId = record.key();  Order order = JSON.parseObject(record.value(), Order.class);  try {  // 扣减库存  inventoryRepository.decreaseStock(order.getProductId(), order.getQuantity());  // 确认消息处理成功  kafkaTemplate.send("inventory-result-topic", messageId, "SUCCESS");  } catch (Exception e) {  // 消息处理失败,发送失败消息  kafkaTemplate.send("inventory-result-topic", messageId, "FAILED");  }  }  
}

本地消息表(Local Message Table)

@Entity  
@Table(name = "local_message")  
public class LocalMessage {  @Id  private String messageId;  private String message;  private String status; // NEW, SENT, CONFIRMED  private Date createTime;  private Date updateTime;  // Getters and setters  
}

消息恢复系统(Message Recovery System)

@Component  
public class MessageRecoverySystem {  @Autowired  private LocalMessageRepository localMessageRepository;  @Autowired  private KafkaTemplate<String, String> kafkaTemplate;  @Scheduled(fixedRate = 60000) // 每分钟执行一次  public void recoverFailedMessages() {  List<LocalMessage> failedMessages = localMessageRepository.findByStatusAndCreateTimeBefore("NEW", new Date(System.currentTimeMillis() - 300000)); // 5分钟前的消息  for (LocalMessage message : failedMessages) {  try {  kafkaTemplate.send("inventory-topic", message.getMessageId(), message.getMessage());  message.setStatus("SENT");  localMessageRepository.save(message);  } catch (Exception e) {  // 记录日志,等待下次重试  log.error("Failed to recover message: " + message.getMessageId(), e);  }  }  }  @KafkaListener(topics = "inventory-result-topic")  public void handleInventoryResult(ConsumerRecord<String, String> record) {  String messageId = record.key();  String result = record.value();  LocalMessage message = localMessageRepository.findById(messageId).orElse(null);  if (message != null) {  if ("SUCCESS".equals(result)) {  message.setStatus("CONFIRMED");  } else {  message.setStatus("FAILED");  }  localMessageRepository.save(message);  }  }  
}

代码说明:

  1. 订单服务:
    • 在一个本地事务中完成订单创建和本地消息保存。
    • 事务成功后,立即发送消息到Kafka。
  2. 库存服务:
    • 监听Kafka消息,处理库存扣减。
    • 处理结果(成功或失败)通过Kafka反馈给订单服务。
  3. 本地消息表:
    • 存储待发送的消息,包括消息ID、内容、状态等信息。
  4. 消息恢复系统:
    • 定期检查本地消息表,重新发送失败的消息。
    • 监听库存服务的处理结果,更新本地消息状态。

项目亮点:

  1. 高可用性: 即使在网络故障或服务宕机的情况下,也能保证消息最终被成功处理。
  2. 数据一致性: 通过本地事务保证订单创建和消息发送的原子性,再通过消息重试机制保证最终一致性。
  3. 解耦性: 订单服务和库存服务通过消息进行异步通信,降低了系统耦合度。
  4. 可靠性: 使用本地消息表作为消息队列的可靠存储,避免了消息丢失的风险。
  5. 扩展性: 该方案易于扩展,可以方便地增加新的微服务而不影响现有服务。
  6. 性能: 采用异步处理方式,提高了系统的整体吞吐量。

通过这种方式,我们实现了在分布式系统中保证数据最终一致性的目标,同时保持了系统的高可用性和可扩展性。这种方案特别适用于对实时性要求不是特别高,但对数据一致性有较高要求的业务场景。


系列文章

  1. IT Governance Framework:IT治理框架
  2. 12306亿级流量架构分析(史上最全)
  3. 京东内部Redis性能优化最佳实践
  4. 金融级多数据中心灾备互联
  5. TOGAF业务架构-CSDN博客
  6. 如何建设金融数据中心-CSDN博客

互联网Java架构师-CSDN博客


资料下载和预览地址:

  • 链接: https://pan.baidu.com/s/1LFyFlsIHCv46DBQRfMGP9A 提取码: kx6b 


http://www.ppmy.cn/news/1565349.html

相关文章

Kafka 和 MQ 的区别

1.概述 1.1.MQ简介 消息中间件&#xff0c;其实准确的叫法应该叫消息队列&#xff08;message queue&#xff09;&#xff0c;简称MQ。其本质上是个队列&#xff0c;有FIFO的性质&#xff0c;即first in first out&#xff0c;先入先出。 目前市场上主流的MQ有三款&#xff…

python管理工具:conda部署+使用

python管理工具&#xff1a;conda部署使用 一、安装部署 1、 下载 - 官网下载&#xff1a; https://repo.anaconda.com/archive/index.html - wget方式&#xff1a; wget -c https://repo.anaconda.com/archive/Anaconda3-2023.03-1-Linux-x86_64.sh2、 安装 在conda文件的…

如何将本地电脑上的文件夹设置为和服务器的共享文件夹

将本地电脑上的文件夹设为与服务器共享的文件夹&#xff0c;通常是在本地开启文件共享&#xff0c;并配置相应的权限&#xff0c;使服务器可以访问该文件夹。以下以 Windows 系统为例说明具体操作步骤&#xff1a; 一、在本地电脑上设置共享文件夹 选择文件夹 找到需要共享的文…

基于libuv实现的C++定时器管理器——TimerManager

在多线程编程中&#xff0c;定时器是一个非常重要的功能&#xff0c;它能够让我们在特定的时间点执行特定的任务。本文将介绍一个基于libuv库实现的C定时器管理器——TimerManager&#xff0c;它通过创建多个工作线程&#xff0c;每个线程运行一个uv loop来高效地管理定时器任务…

【大模型】ChatGPT 高效处理图片技巧使用详解

目录 一、前言 二、ChatGPT 4 图片处理介绍 2.1 ChatGPT 4 图片处理概述 2.1.1 图像识别与分类 2.1.2 图像搜索 2.1.3 图像生成 2.1.4 多模态理解 2.1.5 细粒度图像识别 2.1.6 生成式图像任务处理 2.1.7 图像与文本互动 2.2 ChatGPT 4 图片处理应用场景 三、文生图操…

实现宿主机(Windows 10 Docker Desktop)和Linux容器之间的数据挂载的三种方法

在Windows 10上使用Docker Desktop运行Linux容器时&#xff0c;经常需要将宿主机上的文件或目录与容器内的文件或目录进行关联&#xff0c;以便实现数据的共享和持久化。Docker提供了多种方法来实现这一目标&#xff0c;包括使用-v选项挂载宿主机目录、创建Docker数据卷以及使用…

【数据结构篇】顺序表 超详细

目录 一.顺序表的定义 1.顺序表的概念及结构 1.1线性表 2.顺序表的分类 2.1静态顺序表 2.2动态顺序表 二.动态顺序表的实现 1.准备工作和注意事项 2.顺序表的基本接口&#xff1a; 2.0 创建一个顺序表 2.1 顺序表的初始化 2.2 顺序表的销毁 2.3 顺序表的打印 3.顺序…

0基础跟德姆(dom)一起学AI 自然语言处理18-解码器部分实现

1 解码器介绍 解码器部分: 由N个解码器层堆叠而成每个解码器层由三个子层连接结构组成第一个子层连接结构包括一个多头自注意力子层和规范化层以及一个残差连接第二个子层连接结构包括一个多头注意力子层和规范化层以及一个残差连接第三个子层连接结构包括一个前馈全连接子层…