消息队列中的事务消息

news/2024/10/30 21:26:56/

大家好,我是易安!今天我们谈一谈消息队列中的事务消息这个话题。

一说起事务,你可能自然会联想到数据库。我们日常使用事务的场景,绝大部分都是在操作数据库的时候。像MySQL、Oracle这些主流的关系型数据库,也都提供了完整的事务实现。那消息队列为什么也需要事务呢?

其实很多场景下,我们“发消息”这个过程,目的往往是通知另外一个系统或者模块去更新数据, 消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。

拿电商产品来举个例子。一般来说,用户在电商APP上购物时,先把商品加到购物车里,然后几件商品一起下单,最后支付,完成购物流程,就可以愉快地等待收货了。

这个过程中有一个需要用到消息队列的步骤,订单系统创建订单后,发消息给购物车系统,将已下单的商品从购物车中删除。因为从购物车删除已下单商品这个步骤,并不是用户下单支付这个主要流程中必需的步骤,使用消息队列来异步清理购物车是更加合理的设计。

alt

对于订单系统来说,它创建订单的过程中实际上执行了2个步骤的操作:

  1. 在订单库中插入一条订单数据,创建订单;
  2. 发消息给消息队列,消息的内容就是刚刚创建的订单。

购物车系统订阅相应的主题,接收订单创建的消息,然后清理购物车,在购物车中删除订单中的商品。

在分布式系统中,上面提到的这些步骤,任何一个步骤都有可能失败,如果不做任何处理,那就有可能出现订单数据与购物车数据不一致的情况,比如说:

  • 创建了订单,没有清理购物车;
  • 订单没创建成功,购物车里面的商品却被清掉了。

那我们需要解决的问题可以总结为:在上述任意步骤都有可能失败的情况下,还要保证订单库和购物车库这两个库的数据一致性。

对于购物车系统收到订单创建成功消息清理购物车这个操作来说,失败的处理比较简单,只要成功执行购物车清理后再提交消费确认即可,如果失败,由于没有提交消费确认,消息队列会自动重试。

问题的关键点集中在订单系统,创建订单和发送消息这两个步骤要么都操作成功,要么都操作失败,不允许一个成功而另一个失败的情况出现。

这就是事务需要解决的问题。

什么是分布式事务?

那什么是事务呢?如果我们需要对若干数据进行更新操作,为了保证这些数据的完整性和一致性,我们希望这些更新操作 要么都成功,要么都失败。 至于更新的数据,不只局限于数据库中的数据,可以是磁盘上的一个文件,也可以是远端的一个服务,或者以其他形式存储的数据。

这就是通常我们理解的事务。其实这段对事务的描述不是太准确也不完整,但是,它更易于理解,大体上也是正确的。所以我还是倾向于这样来讲“事务”这个比较抽象的概念。

一个严格意义的事务实现,应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。

  • 原子性,是指一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情况。

  • 一致性,是指这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据。

  • 隔离性,是指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰,这个有点儿像我们打网游中的副本,我们在副本中打的怪和掉的装备,与其他副本没有任何关联也不会互相影响。

  • 持久性,是指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何影响。

大部分传统的单体关系型数据库都完整的实现了ACID,但是,对于分布式系统来说,严格的实现ACID这四个特性几乎是不可能的,或者说实现的代价太大,大到我们无法接受。

分布式事务就是要在分布式系统中的实现事务。在分布式系统中,在保证可用性和不严重牺牲性能的前提下,光是要实现数据的一致性就已经非常困难了,所以出现了很多变种版的一致性,比如顺序一致性、最终一致性等等。

显然实现严格的分布式事务是更加不可能完成的任务。所以,目前大家所说的分布式事务,更多情况下,是在分布式系统中事务的不完整实现。在不同的应用场景中,有不同的实现,目的都是通过一些妥协来解决实际问题。

在实际应用中,比较常见的分布式事务实现有2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try-Confirm-Cancel)和事务消息。每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。

这里我就重点讲述下2PC的方式

两阶段提交方式解决分布式事务

两阶段提交协议为了保证分布在不同节点上的分布式事务的一致性,我们需要引入一个协调者来管理所有的节点,负责各个本地资源的提交和回滚,并确保这些节点正确提交操作结果,若提交失败则放弃事务。

XA 协议

XA 是一个分布式事务协议,规定了事务管理器和资源管理器接口。因此,XA 协议可以分为两部分,即事务管理器本地资源管理器

  • 事务管理器作为 协调者,负责各个本地资源的提交和回滚;
  • 资源管理器就是分布式 事务的参与者.其中资源管理通常是 数据库

基于 XA 协议的二阶段提交方法中,二阶段提交协议(The two-phase commit protocol,2PC),用于保证分布式系统中事务提交时的数据一致性,是 XA 在全局事务中用于协调多个资源的机制。

什么是二阶段提交

分为投票提交两个阶段。

alt

投票为第一阶段:

  • 1 协调者(Coordinator,即事务管理器)会向事务的参与者(Cohort,即本地资源管理器)发起执行操作的 CanCommit 请求,并等待参与者的响应.
  • 2 参与者接收到请求后,会执行请求中的事务操作,记录日志信息(包含事务执行前的镜像),同时锁定当前记录。参与者执行成功,则向协调者发送“Yes”消息,表示同意操作;若不成功,则发送“No”消息,表示终止操作。
  • 3 当所有的参与者都返回了操作结果(Yes 或 No 消息)后,系统进入了提交阶段。

提为第二阶段

协调者会根据所有参与者返回的信息向参与者发送 DoCommit 或 DoAbort 指令

  • 若协调者收到的都是“Yes”消息,则向参与者发送“DoCommit”消息,参与者会完成剩余的操作并释放资源,然后向协调者返回“HaveCommitted”消息;
  • 如果协调者收到的消息中包含“No”消息,则向所有参与者发送“DoAbort”消息,此时发送“Yes”的参与者则会根据之前执行操作时的回滚日志对操作进行回滚,然后所有参与者会向协调者发送“HaveCommitted”消息;
  • 协调者接收到“HaveCommitted”消息,就意味着整个事务结束了。

2PC问题

同步阻塞问题:二阶段提交算法在执行过程中,所有参与节点都是事务阻塞型的。也就是说,当本地资源管理器占有临界资源时,其他资源管理器如果要访问同一临界资源,会处于阻塞状态。

协调者单点故障导致参与者长期阻塞问题:基于 XA 的二阶段提交算法类似于集中式算法,一旦事务管理器发生故障,整个系统都处于停滞状态。尤其是在提交阶段,一旦事务管理器发生故障,资源管理器会由于等待管理器的消息,而一直锁定事务资源,导致整个系统被阻塞。

数据不一致问题:在提交阶段,当协调者向参与者发送 DoCommit 请求之后,如果发生了局部网络异常,或者在发送提交请求的过程中协调者发生了故障,就会导致只有一部分参与者接收到了提交请求并执行提交操作,但其他未接到提交请求的那部分参与者则无法执行事务提交。于是整个分布式系统便出现了数据不一致的问题。

二阶段无法解决的问题:协调者再发出DoCommit 消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。

3PC

三阶段提交协议(Three-phase commit protocol,3PC),是对二阶段提交(2PC)的改进。为了解决两阶段提交的同步阻塞和数据不一致问题,三阶段提交引入了超时机制准备阶段

超时机制

同时在协调者和参与者中引入超时机制。如果协调者或参与者在规定的时间内没有接收到来自其他节点的响应,就会根据当前的状态选择提交或者终止整个事务。

准备阶段

在第一阶段和第二阶段中间引入了一个准备阶段,也就是在提交阶段之前,加入了一个预提交阶段。在预提交阶段排除一些不一致的情况,保证在最后提交之前各参与节点的状态是一致的。

alt

CanCommit 阶段

协调者向参与者发送请求操作(CanCommit 请求),询问参与者是否可以执行事务提交操作,然后等待参与者的响应;参与者收到 CanCommit 请求之后,回复 Yes,表示可以顺利执行事务;否则回复 No。(我个人理解类似做TCC中Try操作

alt

PreCommit 阶段

协调者根据参与者的回复情况,来决定是否可以进行 PreCommit 操作 或 中断事务。

如果所有参与者回复的都是“Yes”,那么协调者就会执行事务的预执行:

  • 发送预提交请求。协调者向参与者发送 PreCommit 请求,进入预提交阶段。
  • 事务预提交。参与者接收到 PreCommit 请求后执行事务操作,并将 Undo 和 Redo 信息记录到事务日志中,同时锁定当前记录。
  • 响应反馈。如果参与者成功执行了事务操作,则返回 ACK 响应,同时开始等待最终指令

如果任何一个参与者向协调者发送了“No”消息,或者等待超时之后,协调者都没有收到参与者的响应,就执行中断事务的操作:

  • 发送中断请求。协调者向所有参与者发送“Abort”消息。
  • 中断事务。参与者收到“Abort”消息之后,或超时后仍未收到协调者的消息,执行事务的中断操作。
alt

DoCommit 阶段

协调者根据参与者的回复情况,来决定是否可以进行 DoCommit 操作 或 中断事务。

如果所有参与者回复的都是“Yes”,那么协调者就会执行事务的提交:

  • 发送提交请求。协调者接收到所有参与者发送的 Ack 响应,从预提交状态进入到提交状态,并向所有参与者发送 DoCommit 消息。
  • 事务提交。参与者接收到 DoCommit 消息之后,正式提交事务。完成事务提交之后,释放所有锁住的资源。
  • 响应反馈。参与者提交完事务之后,向协调者发送 Ack 响应。
  • 完成事务。协调者接收到所有参与者的 Ack 响应之后,完成事务。

如果任何一个参与者向协调者发送了“No”消息,或者协调者等待超时之后,协调者都没有收到参与者的响应,就执行中断事务的操作:

  • 发送中断请求。协调者向所有参与者发送 Abort 请求。
  • 事务回滚。参与者接收到 Abort 消息之后,利用其在 PreCommit 阶段记录的 Undo 信息执行事务的回滚操作,并释放所有锁住的资源。
  • 反馈结果。参与者完成事务回滚之后,向协调者发送 Ack 消息。
  • 中断事务。协调者接收到参与者反馈的 Ack 消息之后,执行事务的中断,并结束事务。 。

当参与者PreCommit 阶段向协调者发送 Ack 消息后,如果长时间没有得到协调者的响应,在默认情况下,参与者会自动将超时的事务进行提交,不会像两阶段提交那样被阻塞住

alt
  • 如何解决协调者单点故障导致参与者长期阻塞。

由于存在超时机制,即使协调者发生故障,参与者无法及时收到来自协调者的信息之后,他会默认执行commit。避免参与者长期阻塞。

  • 同步阻塞问题

3PC会在2阶段到3阶段间阻塞,2PC会在1阶段到2阶段整个事务过程中阻塞,因而总体来说3PC并不能不阻塞,只是最大限度减少了阻塞的时间。同时安装5.2也能够解决协调者单点故障导致参与者长期阻塞的问题

  • 数据不一致问题

3PC和2PC都无法解决数据一致的问题,不过3PC存在超时会通过超时保证协调者和参与者在提交阶段无法通信过程中最终一致,而不需人工介入。

可以看到不管是2阶段提交还是3阶段提交都是有些问题的,当然我们还有消息队列中的事务消息这种思路。事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。比如我们在开始时提到的那个例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了。

消息队列如何实现分布式事务?

事务消息需要消息队列提供相应的功能才能实现,Kafka和RocketMQ都提供了事务相关功能。

回到订单和购物车这个例子,我们一起来看下如何用消息队列来实现分布式事务,这里以RocketMQ来举例。

alt

首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。

如果你足够细心,可能已经发现了,这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka和RocketMQ给出了2种不同的解决方案。

Kafka的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ则给出了另外一种解决方案。

RocketMQ中的分布式事务实现

在RocketMQ中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果Producer也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ的Broker没有收到提交或者回滚的请求,Broker会定期去Producer上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败。

在我们这个例子中,反查本地事务的逻辑也很简单,我们只要根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ会自动根据事务反查的结果提交或者回滚事务消息。

这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。

综合上面讲的通用事务消息的实现和RocketMQ的事务反查机制,使用RocketMQ事务消息功能实现分布式事务的流程如下图:

alt

总结

本文通过一个订单购物车的例子,学习了事务的ACID四个特性,以及如何使用消息队列来实现分布式事务。然后我给出了现有的几种分布式事务的解决方案,包括事务消息,但是这几种方案都不能解决分布式系统中的所有问题,每一种方案都有局限性和特定的适用场景。

最后,我们一起学习了RocketMQ的事务反查机制,这种机制通过定期反查事务状态,来补偿提交事务消息可能出现的通信失败。在Kafka的事务功能中,并没有类似的反查机制,需要用户自行去解决这个问题。但是,这不代表RocketMQ的事务功能比Kafka更好,只能说在我们这个例子的场景下,更适合使用RocketMQ。Kafka对于事务的定义、实现和适用场景,和RocketMQ有比较大的差异。

本文由 mdnice 多平台发布


http://www.ppmy.cn/news/62073.html

相关文章

K8s基础8——svc基础使用、应用暴露、iptables代理、ipvs代理

文章目录 一、Service基本了解二、Service定义与创建2.1 相关命令2.2 yaml文件参数大全2.3 创建svc2.3.1 两种创建方式类比2.3.2 验证集群内A应用访问B应用2.3.3 将集群外服务定义为K8s的svc2.3.4 分配多个端口 2.4 常用三种类型2.4.1 ClusterIP(集群内部访问&#…

持续测试:DevOps时代质量保证的关键

在 DevOps 时代,持续测试已成为质量保证的一个重要方面。近年来,软件开发方法论发生了快速转变。随着 DevOps 的出现,已经发生了向自动化和持续集成与交付 (CI/CD) 的重大转变。传统的质量保证方法已不足以满足现代软件开发实践的需求。持续测…

基于微信小程序的酒店预定管理系统设计与实现

第1章 绪论 1 1.1开发背景与意义 1 1.2开发方法 1 1.3论文结构 1 2系统开发技术与环境 3 2.1 系统开发语言 3 2.2 系统开发工具 3 2.3 系统页面技术 3 2.4 系统数据库的选择 4 2.5 系统的运行环境 4 2.5.1 硬件环境 4 2.5.2 软件环境 4 3系统分析 5 3.1可行性分析 5 3.1.1 经济…

JUC并发编程与源码分析笔记13-AbstractQueuedSynchronizer之AQS

前置知识 公平锁和非公平锁可重入锁自旋思想LockSupport数据结构之双向链表设计模式之模板设计模式 AQS入门级别理论知识 是什么 AbstractQueuedSynchronizer:抽象的队列同步器。 用来实现锁或其他同步器组件的公共基础部分的抽象实现,是重量级基础框…

IDP中的黄金路径究竟是什么?

在云原生时代,开发人员面临着越来越多的工具、技术、思维方式的选择,给他们带来了极大的认知负担和工作量。为了提高开发人员的开发效率与开发体验,一些头部科技公司开始建立自己的内部开发者平台(IDP)。在之前的文章我…

阿里云CentOS服务器安装Redis教程(一步步操作)

使用阿里云服务器ECS安装Redis数据库流程,操作系统为CentOS 7.6镜像,在CentOS上安装Redis 4.0.14,云服务器选择的是持久内存型re6p实例,新手站长分享阿里云CentOS服务器安装Redis流程方法: 目录 在CentOS系统中部署R…

C++容器适配器stack和queue(含deque,priority_queue)

目录 1.容器适配器 1.1 什么是适配器 1.2 STL标准库中stack和queue底层结构 1.3 deque 1.3.1 deque原理介绍(了解) 1.3.2 deque优点和缺点 1.3.3 为什么选择deque作为stack和queue的底层默认容器 2. stack介绍和使用 2.1 stack介绍 2.2 stack使用 2.3 …

删除lpt1.css.asp或com8.index.asp这类文件的方法_asp木马无法删除解决办法

aux|prn|con|nul|com1|com2|com3|com4|com5|com6|com7|com8|com9|lpt1|lpt2|lpt3|lpt4|lpt5|lpt6|lpt7|lpt8|lpt9 但是可以通过cmd的copy命令实现: D:\wwwroot>copy rootkit.asp \\.\D:\wwwroot\lpt6.80sec.asp 前面必须有 \\.\ 已复制 1 个文件。 D…