分布式系统事务一致性解决方案(基于事务消息)

embedded/2024/9/20 3:56:32/ 标签: 分布式

参考:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/

文章目录

    • 概要
    • 错误的方案
    • 方案一:业务方自己实现
    • 方案二:RocketMQ 事务消息
      • 什么是事务消息
      • 事务消息处理流程
      • 事务消息生命周期
      • 使用限制
      • 使用示例
      • 使用建议


概要

说起分布式事务,就会谈到 Bob 给 Smith 账户转账的问题:2 个账号, 分别处于 A、B 两个 DB,或者说 2 个不同的子系统中,Bob 的账户要扣钱,Smith 的账户要加钱,如何保证原子性?

一般思路是通过消息中间件来实现 “最终一致性”:A系统扣钱,然后发送消息给 MQ,B系统接收此消息,进行加钱。

但这里有个问题:是先update DB后发消息?还是先发消息后update DB呢?

  • 假设先update DB成功发送消息时网络问题,重发又失败,怎么办?
  • 假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都会有问题。

错误的方案

有人可能会想到,可以把 “发送消息” 这个网络调用和 update DB 放在同一个事务中,如果发送消息失败,update DB 自动回滚。这样不就保证原子性了吗?

这个方案看似正确,但其实是错误的,原因:

  1. 把网络调用放在 DB 事务中,可能会因为网络的延时,导致 DB 长事务。严重的还会 block 整个 DB,这个风险很大。
  2. 如果发送消息失败,发送方并不知道是 MQ 没收到消息,还是已收到消息只是返回 response 的时候失败了。如果接收方已经收到消息了,而发送方认为没有收到,执行 update DB 回滚的操作,会导致 A 账户钱没有扣,B 账户的钱却增加了。

方案一:业务方自己实现

假设消息中间件没提供 “事务消息” 功能,比如 Kafka。那该如何解决这个问题?

结局方案如下:

  1. Producer 准备 1 个消息表,把 update DB 和 send message 这 2 个操作,放在一个DB事务中。
  2. 准备一个后台程序,不停地把消息表中的 message 传送给消息中间件。发送失败则不断重试,直到成功。允许消息重复,但消息不会丢。
  3. Consumer 准备 1 个判重表。处理过的消息记录在里面,实现业务的幂等性

通过以上 3 步,就基本解决了原子性问题

但此方案缺点是:需要设计 DB 消息表,同时还需要 1 个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合,额外增加业务方的负担。

方案二:RocketMQ 事务消息

目前各大知名电商平台和互联网公司,几乎都是使用 “事务消息” 这种方案来实现 “最终一致性” 的。这种方式适用的业务场景广泛,且比较靠谱。

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

在这里插入图片描述

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查(服务端回查的间隔时间和最大回查次数,请参见参数限制)。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

在这里插入图片描述

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制

  1. 消息类型一致性
    事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

  2. 消费事务性
    Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

  3. 中间状态可见性
    Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

  4. 事务超时机制
    Apache RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例

创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.type为TRANSACTION的属性用来支持事务消息

以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilderImpl();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。*/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}

使用建议

  1. 避免大量未决事务导致超时

    Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

  2. 正确处理"进行中"的事务

    消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

    • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

    • 程序能正确识别正在进行中的事务。



http://www.ppmy.cn/embedded/22464.html

相关文章

覆盖完整产业链“2024长三角消费电子产业展会”11月在南京召开

2024长三角消费电子产业展览会将与11月份在南京国际博览中心盛大开幕。作为一场集智慧生活、智慧健康、人工智能、雷达技术、智能机器人、5G通信和自动驾驶等众多领域于一体的消费电子产业盛会&#xff0c;本届展会不仅全面覆盖了消费电子产业链的各个环节&#xff0c;更致力于…

实验一: 设备密码配置与远程管理

1.实验环境 用路由器和交换机搭建实验环境 2.需求描述 实现管理员主机对交换机和路由器的远程管理 设备上配置的密码都要被加密 3.推荐步骤 对路由器配置的步骤如下&#xff1a; 实现路由器和PC的连通性配置VTY密码和特权模式密码在PC上Telnet 到路由器。 对交换机配置的…

.net 图片操作

图片操作 bitmap 旋转 bitmap左右镜像 /// <summary>/// bitmap角度旋转/// </summary>/// <param name"image"></param>/// <param name"angle"></param>/// <returns></returns>public static Bitmap R…

0418EmpTomCat项目 初次使用ajax实现局部动态离职

0418EmpTomCat项目包-CSDN博客 数据库字段&#xff1a; 员工部门表 分页查询&#xff1b; 多条件查询&#xff1b; 添加新员工&#xff1b; ajax点击离职操作效果&#xff1a;

PhotosCollage for Mac:优雅且实用的照片拼贴软件

PhotosCollage for Mac是一款优雅且实用的照片拼贴软件&#xff0c;为Mac用户提供了一个便捷、高效的平台&#xff0c;以创建精美、个性化的照片拼贴作品。 PhotosCollage for Mac v1.4.1激活版下载 该软件界面简洁直观&#xff0c;操作便捷。用户只需将想要拼贴的照片拖入“照…

k8s pod 镜像拉取策略

在 Kubernetes (k8s) 中&#xff0c;Pod 容器镜像的拉取策略通过 imagePullPolicy 属性来控制。这一策略决定了 kubelet 如何以及何时从容器镜像仓库中拉取镜像。以下是三种主要的镜像拉取策略及其详细说明&#xff1a; Always: 说明: 这是默认的拉取策略。当设置为 Always 时&…

从NoSQL到NewSQL——10年代大数据浪潮下的技术革新

引言 在数字化浪潮的推动下&#xff0c;数据库技术已成为支撑数字经济的坚实基石。腾讯云 TVP《技术指针》联合《明说三人行》特别策划的直播系列——【中国数据库前世今生】&#xff0c;我们将通过五期直播&#xff0c;带您穿越五个十年&#xff0c;深入探讨每个时代的数据库演…

mysql 开启远程连接

登录到mysql mysql -uroot -p 打开mysql数据库并查询user表 use mysql; select user, host from user;更改需要远程连接数据库为任何ip 可以连接&#xff0c; 并刷新系统权限相关的表 update user set host% where hostlocalhost and userroot; flush privileges;

Apache Flink 流处理-[CentOS|Rocky] 镜像

Flink Docker仓库包含了Dockerfiles用于为Flink构建docker images使用&#xff0c;这些 Dockerfile 由 Apache Flink 社区维护&#xff0c;但 Docker 社区负责在 Docker Hub 上构建和托管映像。目前市面上流行的Flink镜像都是基于Ubuntu镜像构建&#xff0c;由于项目需求变化&a…

3. uniapp开发工具的一些事

前言 新的一天&#xff0c;又要开始卷起来了&#xff0c;开发程序开发当前离不开开发工具&#xff0c;一个好的开发工具办事起来那必然是事倍功半的...本文主要分享了关于uniapp里开发工具的一些事~ 概述 阅读时间&#xff1a;约5&#xff5e;7分钟&#xff1b; 本文重点&am…

020Node.js的FS模块使用fs.mkdir创建目录

Node.js的FS模块使用fs.mkdir创建目录 //fs.mkdir 创建目录 /*path 将创建的目录路径mode 目录权限&#xff08;读写权限&#xff09;&#xff0c;默认777callback 回调&#xff0c;传递异常参数err*/ const fsrequire(fs);fs.mkdir(./css,(err)>{if(err){console.log(err)…

MySql:连接和关闭

c connector 下面是一个示例&#xff0c;世界使用c connector时注意release版本和debug版本要和响应的动态库匹配才可以 #include <mysql_driver.h> #include <mysql_connection.h> #include <cppconn/resultset.h> #include <cppconn/statement.h>i…

UDP和TCP(传输层)

这里写目录标题 UDPUDP的基本特点UDP协议报文格式 TCPTCP协议报文格式TCP特点可靠传输实现机制确认应答超时重传数据丢了应答报文丢了 小结 UDP UDP的基本特点 无连接不可靠传输面向数据报全双工 UDP协议报文格式 2个字节有效范围(无符号): 0 ~ 65535(2^16 - 1). 2个字节有效范…

Pytorch迁移学习训练病变分类模型

划分数据集 1.创建训练集文件夹和测试集文件夹 # 创建 train 文件夹 os.mkdir(os.path.join(dataset_path, train))# 创建 test 文件夹 os.mkdir(os.path.join(dataset_path, val))# 在 train 和 test 文件夹中创建各类别子文件夹 for Retinopathy in classes:os.mkdir(os.pa…

JS常用数据类型的方法函数调用,数组的方法、对象的方法、字符串的方法、数字的方法、集合的方法、映射的方法、栈的方法、队列的方法、链表的封装、树的封装、

JS常用数据类型的方法函数调用&#xff0c;数组的方法、对象的方法、字符串的方法、数字的方法、集合的方法、映射的方法、栈的方法、队列的方法、链表的封装、树的封装 1. 数组&#xff08;Array&#xff09;&#xff1a;数组是一组按顺序存储的元素的集合&#xff1a;2. 对象…

GPT的全面历史和演变:从GPT-1到GPT-4

人工智能新篇章&#xff1a;GPT-4与人类互动的未来&#xff01; 本文探讨了生成式预训练 Transformer (GPT) 的显着演变&#xff0c;提供了从开创性的 GPT-1 到复杂的 GPT-4 的旅程。 每次迭代都标志着重大的技术飞跃&#xff0c;深刻影响人工智能领域以及我们与技术的互动。 我…

feign整合sentinel做降级知识点

1&#xff0c;配置依赖 <!-- Feign远程调用依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency> <!--sentinel--><dependency>…

微软开源了 MS-DOS 4.00

DOS的历史源远流长&#xff0c;很多现在的年轻人不知道DOS了。其实早期的windows可以看做是基于DOS的窗口界面的模拟器&#xff0c;系统的本质其实是DOS。后来DOS的漏洞还是太多了&#xff0c;微软重新写了windows的底层内核。DOS只是一个辅助终端的形式予以保留了。 微软是在…

golang上传文件到ftp服务器

之前有个业务需要把文件上传到ftp服务器&#xff0c;写了一个上传ftp的功能 package ftpimport "context"type Client interface {// UploadFile 上传文件UploadFile(ctx context.Context, opt *UploadFileOpt) error }type UploadFileOpt struct {Data […

操作系统——优先权算法c++实现

变量描述 测试数据 5 A 0 4 4 B 1 3 2 C 2 5 3 D 3 2 5 E 4 4 1 先来先服务算法 简述 该算法实现非常简单就是对到达时间排个序&#xff0c;然后依次进行即可&#xff0c;对结构体的sort进行了重载 代码 void FCFS() {//先来先服务算法std::cout<<"\n\t\t\t\t\…