参考
本文参考https://zhuanlan.zhihu.com/p/650791238从零到一搭建 TCC 分布式事务框架,并在小徐的基础上增加个人见解+笔记。
项目地址:https://github.com/xiaoxuxiansheng/gotcc
图解分布式事务实现原理(一):https://blog.csdn.net/qq_40318498/article/details/134430322?spm=1001.2014.3001.5502
图解分布式事务实现原理(二):https://blog.csdn.net/qq_40318498/article/details/134432420?spm=1001.2014.3001.5502
整体架构
TCC 本质上是一种 2PC(two phase commitment protocal 两阶段提交)的实现:
- 把分布式事务中,负责维护状态数据变更的模块,封装成一个 TCC 组件
- 把数据的变更状态拆分为对应 Try 操作的【冻结】、对应 Confirm 操作的【成功】以及对应 Cancel 操作的【失败回滚】
- 抽出一个统筹全局的事务协调者角色 TXManager. 在执行分布式事务时,分为两个阶段:
- 阶段 I:先对所有组件执行 Try 操作
- 阶段 II:根据上阶段 Try 操作的执行结果,决定本轮执行 Confirm 还是 Cancel 操作
在我们实现 TCC 框架的实战环节中,首先需要明确的事情是:
- 哪部分内容在 TCC 架构中属于通用的流程,这部分内容可以抽取出来放在 sdk 中,以供后续复用
- 哪部分内容需要给使用方预留出足够的自由度,由使用方自行实现,然后和通用 sdk 进行接轨.
这两点也是很好想,最终,这两部分内容明确如下:
- 在 TCC sdk 中实现的通用逻辑包含了和事务协调器 txManager 有关的核心流程
- 事务协调器 TXManager 开启事务以及 try-confirm/cancel 的 2PC 流程串联
- 事务协调器 TXManager 异步轮询任务,用于推进事务从中间态走向终态
- TCC 组件的注册流程
- 需要预定义事务日志存储模块 TXStore 的实现规范(声明 interface)
- 需要预定义 TCC 组件 TCCComponent 的实现规范(声明 interface)
以及使用方
- TCC 组件和 TXStore 两部分内容需要由使用方自行实现:
- 使用方自行实现 TCCComponent 类,包括其 Try、Confirm、Cancel 方法的执行逻辑
- 使用方自行实现具体的 TXStore 日志存储模块. 可以根据实际需要,选型合适的存储组件和存储方式
TCC Component
现在我们来思考一下TCC(Try-Confirm-Cancel)组件的定位和功能要求,以下是对每个要点的解释
-
用户自行实现:TCC组件是由用户自行实现的部分,这意味着开发者需要编写自定义的TCC组件代码来处理事务的Try、Confirm和Cancel操作。用户需要根据业务逻辑和需求来实现这些组件。
-
注册到RegistryCenter:TCC组件需要在TXManager启动时注册到注册中心(RegistryCenter)。这意味着用户需要配置TXManager以连接到注册中心,并将自定义的TCC组件注册到注册中心,以便TXManager能够获取并使用这些组件。
-
TXManager启动时获取组件:当使用方(应用程序)调用TXManager开启事务时,TXManager会通过注册中心获取已注册的TCC组件。这样,TXManager能够识别并使用用户自定义的TCC组件来执行事务的Try、Confirm和Cancel操作。
-
TCC组件的能力:TCC组件需要具备以下能力:
实现Try、Confirm和Cancel操作:TCC组件需要实现事务的Try、Confirm和Cancel三个阶段的操作,以确保事务的一致性。
处理业务逻辑:TCC组件需要能够执行与具体业务相关的操作,包括业务检查、资源预留、业务确认和回滚操作。
与TXManager进行交互:TCC组件需要与TXManager进行通信,以接收来自TXManager的指令,并汇报操作的执行结果(成功或失败)。
那么对应的代码实现可以简单化为:
// tcc 组件
type TCCComponent interface {// 返回组件唯一 idID() string// 执行第一阶段的 try 操作Try(ctx context.Context, req *TCCReq) (*TCCResp, error)// 执行第二阶段的 confirm 操作Confirm(ctx context.Context, txID string) (*TCCResp, error)// 执行第二阶段的 cancel 操作Cancel(ctx context.Context, txID string) (*TCCResp, error)
}
这里我们只是简单定义了TCCComponent 接口。
TX Manager
下面是关于事务协调器 TXManager 的定位.
- TXManager 是整个 TCC 架构中最核心的角色
- TXManager 作为 gotcc 的统一入口,供使用方执行启动事务和注册组件的操作
- TXManager 作为中枢系统分别和 RegisterCenter、TXStore 交互
- TXManager 需要串联起整个 Try-Confirm/Canel 的 2PC 调用流程
- TXManager 需要运行异步轮询任务,推进未完成的事务走向终态
TX Store
TXStore 是用于存储和管理事务日志明细记录的模块:
- 需要支持事务明细数据的 CRUD 能力
- 通常情况下,底层需要应用到实际的存储组件作为支持
- TXStore 在 gotcc 的 sdk 中体现为一个抽象的 interface. 需要由用户完成具体类的实现,并将其注入到 TXManager 当中.
// 事务日志存储模块
type TXStore interface {// 创建一条事务明细记录CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)// 更新事务进度:实际更新的是每个组件的 try 请求响应结果TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error// 提交事务的最终状态, 标识事务执行结果为成功或失败TXSubmit(ctx context.Context, txID string, success bool) error// 获取到所有未完成的事务GetHangingTXs(ctx context.Context) ([]*Transaction, error)// 获取指定的一笔事务GetTX(ctx context.Context, txID string) (*Transaction, error)// 锁住整个 TXStore 模块(要求为分布式锁)Lock(ctx context.Context, expireDuration time.Duration) error// 解锁TXStore 模块Unlock(ctx context.Context) error
}
- CreateTX:创建一条事务明细记录,会在入参中传入本事务涉及的 TCC 组件列表,同时需要在出参中返回全局唯一的事务 id
- TXUpdate:更新一条事务明细记录. 这里指的更新,针对于,事务中某个 TCC 组件 Try 响应状态的更新
- TXSubmit:提交一条事务的执行结果. 要么置为成功,要么置为失败
- GetHangingTXs:获取所有未完成的事务明细记录
- GetTX:根据事务 id,获取指定的一条事务明细记录
- Lock:锁住整个事务日志存储模块(要求为分布式锁)
- Unlock:解锁整个事务日志存储模块
RegistryCenter
最后是 TCC 组件的注册管理中心 RegistryCenter,负责给 txManager 提供注册和查询 TCC 组件的能力.
TXManager 核心源码讲解
理完了基本的流程和概念,下面我们一起开启一线实战环节.
TXManager
下面是关于事务协调器 TXManager 的几个核心字段:
- txStore:内置的事务日志存储模块,需要由使用方实现并完成注入
- registryCenter:TCC 组件的注册管理中心
- opts:内聚了一些 TXManager 的配置项,可以由使用方自定义,并通过 option 注入
- ctx:用于反映 TXManager 运行生命周期的的 context,当 ctx 终止时,异步轮询任务也会随之退出
- stop:用于停止 txManager 的控制器. 当 stop 被调用后,异步轮询任务会被终止
type TXManager struct {ctx context.Contextstop context.CancelFuncopts *OptionstxStore TXStoreregistryCenter *registryCenter
}func NewTXManager(txStore TXStore, opts ...Option) *TXManager {ctx, cancel := context.WithCancel(context.Background())txManager := TXManager{opts: &Options{},txStore: txStore,registryCenter: newRegistryCenter(),ctx: ctx,stop: cancel,}for _, opt := range opts {opt(txManager.opts)}repair(txManager.opts)go txManager.run()return &txManager
}
事务主流程
下面进入最核心的部分,介绍一下整个分布式事务的运行流程.
主流程
用户可以通过 txManager.Transaction 方法,一键启动动一个分布式事务流程,其中包含的几个核心步骤展示如下图:
txManager.Transaction 方法是用户启动分布式事务的入口,需要在入参中声明本次事务涉及到的组件以及需要在 Try 流程中传递给对应组件的请求参数:
type RequestEntity struct {// 组件名称ComponentID string `json:"componentName"`// Try 请求时传递的参数Request map[string]interface{} `json:"request"`
}
比如可以定义如下的参数:
componentAID := "componentA"
RequestEntity{ComponentID: componentAID,Request: map[string]interface{}{"biz_id": componentAID + "_biz", }
}
txManager.Transaction 对应源码如下,核心步骤均给出了注释. 核心的 try-confirm/cancel 流程,会在后续的 txManager.twoPhaseCommit 方法中展开.
// 启动事务
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {// 1 限制分布式事务执行时长tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)defer cancel()// 2 获得所有的涉及使用的 tcc 组件componentEntities, err := t.getComponents(tctx, reqs...)if err != nil {return false, err}// 3 调用 txStore 模块,创建新的事务明细记录,并取得全局唯一的事务 idtxID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)if err != nil {return false, err}// 4. 开启两阶段提交流程:try-confirm/cancelreturn t.twoPhaseCommit(ctx, txID, componentEntities)
}
接下来我们看一下twoPhaseCommit函数。
2PC 串联
func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {// 1 创建子 context 用于管理子 goroutine 生命周期// 手握 cancel 终止器,能保证在需要的时候终止所有子 goroutine 生命周期cctx, cancel := context.WithCancel(ctx)defer cancel()// 2 创建一个 chan,用于接收子 goroutine 传递的错误errCh := make(chan error)// 3 并发启动,批量执行各 tcc 组件的 try 流程go func() {// 通过 waitGroup 进行多个子 goroutine 的汇总var wg sync.WaitGroupfor _, componentEntity := range componentEntities {// shadowcomponentEntity := componentEntitywg.Add(1)// 并发执行各组件的 try 流程go func() {defer wg.Done()resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{ComponentID: componentEntity.Component.ID(),TXID: txID,Data: componentEntity.Request,})// 出现 tcc 组件执行 try 操作失败,则需要对事务明细记录进行更新,同时把错误通过 chan 抛给父 goroutineif err != nil || !resp.ACK {// 对对应的事务进行更新_ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), false)errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())return}// try 请求成功,则对事务明细记录进行更新. 倘若更新失败,也要视为错误,抛给父 goroutineif err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), true); err != nil {errCh <- err}}()}// 等待所有子 goroutine 运行完成wg.Wait()// 关闭 errCh,告知父 goroutine 所有任务已运行完成的信息close(errCh)}()successful := true// 4 通过 chan,阻塞子 goroutine 执行完成// 4.1 但凡出现一个子 goroutine 遇到了错误,则会提前接收到错误,并调用 cancel 方法熔断其他所有子 goroutine 流程// 4.2 倘若所有子 goroutine 都执行成功,则会通过 chan 的关闭事件推进流程,对应 err 为 nilif err := <-errCh; err != nil {// 只要有一笔 try 请求出现问题,其他的都进行终止cancel()successful = false}// 5 异步执行第二阶段的 confirm/cancel 流程// 之所以是异步,是因为实际上在第一阶段 try 的响应结果尘埃落定时,对应事务的成败已经有了定论// 第二阶段能够容忍异步执行的原因在于,执行失败时,还有轮询任务进行兜底go t.advanceProgressByTXID(txID)// 6 响应结果// 6.1 倘若所有 try 请求都成功,则 successful 为 try,事务成功// 6.2 但凡有一个 try 请求处理出现问题,successful 为 false,事务失败return successful, nil
}
该函数的入参是ctx,事务id以及所有的组件。
这段代码实现了一个分布式事务管理器中的两阶段提交(Two-Phase Commit, 2PC)过程。整个过程分为几个主要步骤:
- 创建子 Context 用于管理子 Goroutine 生命周期
使用 context.WithCancel 创建一个可取消的子上下文(cctx)和对应的取消函数(cancel)。这样,在需要的时候可以通过调用 cancel 函数来终止所有子 Goroutine。 - 创建错误通道
初始化一个错误通道(errCh),用于从子 Goroutine 接收错误信息。 - 并发启动,执行各组件的 Try 流程
通过一个匿名 Goroutine 并发地启动各组件的 Try 流程。
使用 sync.WaitGroup 来等待所有子 Goroutine 完成。
在子 Goroutine 中:
对每个组件执行 Try 方法。
如果 Try 方法执行失败,或者响应的 ACK 字段为 false,则更新事务状态并通过错误通道发送错误。
如果 Try 请求成功,更新事务状态。如果更新失败,也通过错误通道发送错误。 - 阻塞等待子 Goroutine 完成
从错误通道接收错误。如果接收到错误,调用 cancel 函数终止所有子 Goroutine,并设置 successful 标记为 false。 - 异步执行第二阶段的 Confirm/Cancel 流程
通过一个新的 Goroutine 调用 advanceProgressByTXID 函数来异步处理事务的 Confirm 或 Cancel 阶段。
第二阶段的执行可以是异步的,因为第一阶段的结果已经确定了事务是否成功。 - 返回结果
返回 successful 标记和 nil(无错误)。如果所有 Try 请求成功,successful 为 true,表示事务成功;如果任一 Try 请求失败,successful 为 false,表示事务失败。
现在我们看一下advanceProgressByTXID函数
事务进度推进
当一笔事务在第一阶段中所有的 Try 请求都有了响应后,就需要根据第一阶段的结果,执行第二阶段的 Confirm 或者 Cancel 操作,并且将事务状态推进为成功或失败的终态:
- 倘若所有组件的 Try 响应都是成功,则需要批量调用组件的 Confirm 接口,并在这之后将事务状态更新为成功
- 倘若存在某个组件 Try 响应为失败,则需要批量调用组件的 Cancel 接口,并在这之后将事务状态更新为失败
- 倘若当前事务已执行超时,同样需要批量调用组件的 Cancel 接口,并在这之后将事务状态更新为失败
// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgressByTXID(txID string) error {// 获取事务日志明细tx, err := t.txStore.GetTX(t.ctx, txID)if err != nil {return err}// 推进进度return t.advanceProgress(tx)
}
// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgress(tx *Transaction) error {// 1 推断出事务当前的状态// 1.1 倘若所有组件 try 都成功,则为 successful// 1.2 倘若存在组件 try 失败,则为 failure// 1.3 倘若事务超时了,则为 failure// 1.4 否则事务状态为 hangingtxStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))// hanging 状态的事务暂时不处理if txStatus == TXHanging {return nil}// 2 根据事务是否成功,定制不同的处理函数success := txStatus == TXSuccessfulvar confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)var txAdvanceProgress func(ctx context.Context) errorif success {// 如果事务成功,则需要对组件进行 confirmconfirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {return component.Confirm(ctx, tx.TXID)}// 如果事务成功,则需要在最后更新事务日志记录的状态为成功txAdvanceProgress = func(ctx context.Context) error {return t.txStore.TXSubmit(ctx, tx.TXID, true)}} else {// 如果事务失败,则需要对组件进行 cancelconfirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) { return component.Cancel(ctx, tx.TXID)}// 如果事务失败,则需要在最后更新事务日志记录的状态为失败txAdvanceProgress = func(ctx context.Context) error { return t.txStore.TXSubmit(ctx, tx.TXID, false)}}// 3 批量调用组件,执行第二阶段的 confirm/cancel 操作for _, component := range tx.Components {// 获取对应的 tcc componentcomponents, err := t.registryCenter.getComponents(component.ComponentID)if err != nil || len(components) == 0 {return errors.New("get tcc component failed")}//components是一个数组,这里取一个元素 resp, err := confirmOrCancel(t.ctx, components[0])if err != nil {return err}if !resp.ACK {return fmt.Errorf("component: %s ack failed", component.ComponentID)}}// 4 二阶段 confirm/cancel 操作都执行完成后,对事务状态进行提交return txAdvanceProgress(t.ctx)
}
这段代码是一个事务管理器 (TXManager) 中的 advanceProgress 函数,它用于处理事务的第二阶段操作(即确认或取消)在分布式事务的两阶段提交协议中。以下是代码的详细解读和步骤概述:
- 判断事务当前状态
- 根据事务的情况推断当前状态。状态判断依据包括:
- 成功(TXSuccessful):如果所有组件的 try 都成功。
- 失败(TXFailure):如果任何组件的 try 失败,或者事务超时。
- 挂起(TXHanging):如果事务状态未明确为成功或失败。
- 如果事务处于挂起状态,函数直接返回,不进行进一步处理。
- 根据事务状态选择相应的操作
- 根据事务的成功或失败状态,选择 confirm 或 cancel 操作:
- 成功时:使用 confirm 函数处理每个组件。
- 失败时:使用 cancel 函数处理每个组件。
- 准备一个函数 txAdvanceProgress 来在所有组件处理完毕后更新事务日志的状态。
- 执行第二阶段的 Confirm/Cancel 操作
- 对于事务中的每个组件:
- 从注册中心获取相应的 TCC 组件。
- 执行 confirmOrCancel 函数(根据事务状态决定是确认还是取消)。
- 检查操作的响应,如果响应不成功,则返回错误。
- 提交事务状态
- 在所有组件的第二阶段操作执行完毕后,使用 txAdvanceProgress 函数提交事务的最终状态。
总结:这个函数实现了分布式事务两阶段提交协议的第二阶段。它首先判断事务的当前状态,然后根据这个状态对事务中的每个组件执行确认(Confirm)或取消(Cancel)操作。最后,它更新事务的最终状态。这是分布式事务管理中非常关键的一部分,确保了事务的一致性和完整性。
异步轮询流程
接下来聊聊 txManager 的异步轮询流程. 这个流程同样非常重要,是支撑 txManager 鲁棒性的重要机制.
倘若存在事务已经完成第一阶段 Try 操作的执行,但是第二阶段没执行成功,则需要由异步轮询流程进行兜底处理,为事务补齐第二阶段的操作,并将事务状态更新为终态。
启动时机
异步轮询任务是在 txManager 的初始化流程中启动的,通过异步 goroutine 持久运行:
go txManager.run()
轮询流程
异步轮询任务运行时,基于 for 循环 + select 多路复用的方式,实现定时任务的执行.
轮询的时间间隔会根据一轮任务处理过程中是否出现错误,而进行动态调整. 这里调整规则指的是:当一次处理流程中发生了错误,就需要调大当前节点轮询的时间间隔,让其他节点的异步轮询任务得到更大的执行机会.
func (t *TXManager) run() {var tick time.Durationvar err error// 1 for 循环自旋式运行任务for {// 如果处理过程中出现了错误,需要增长轮询时间间隔if err == nil {tick = t.opts.MonitorTick} else {tick = t.backOffTick(tick)}// select 多路复用select {// 倘若 txManager.ctx 被终止,则异步轮询任务退出case <-t.ctx.Done():return// 2 等待 tick 对应时长后,开始执行任务case <-time.After(tick):// 对 txStore 加Redis分布式锁,避免分布式服务下多个服务节点的轮询任务重复执行if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {// 取锁失败时(大概率被其他节点占有),不需要增加 tick 时长err = nilcontinue}// 3 获取处于 hanging 状态的事务var txs []*Transactionif txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {_ = t.txStore.Unlock(t.ctx)continue}// 4 批量推进事务进度err = t.batchAdvanceProgress(txs)_ = t.txStore.Unlock(t.ctx)}}
}
这段 Go 语言代码定义了一个名为 TXManager 的结构体的 run 方法。该方法实现了一个异步任务处理流程,主要用于处理分布式事务。我将按照代码中的逻辑分步骤解读:
- 循环运行:
- 方法使用一个 for 循环,这意味着它会不断地运行,直到满足某个退出条件。
- 错误处理和轮询间隔:
- 在每次循环的开始,根据之前的执行是否出错来决定轮询时间间隔 tick。如果没有错误发生,则使用 t.opts.MonitorTick 作为间隔;如果发生错误,则调用 t.backOffTick(tick) 来增加轮询间隔。
- select 语句多路复用:
- 使用 select 语句来同时处理多种情况。
- 第一种情况是 t.ctx.Done() 通道收到消息,这通常意味着 TXManager 的上下文被取消或终止,此时方法返回,结束运行。
- 第二种情况是等待 tick 时间后执行任务。
- 获取和处理分布式事务:
- 首先尝试对 txStore 加分布式锁。如果锁定失败(可能是因为其他节点已经占用锁),则跳过当前循环迭代,继续等待下一个 tick。
- 如果成功加锁,则继续获取处于挂起(hanging)状态的事务列表。
- 如果获取事务列表时发生错误,则释放锁并继续下一个循环迭代。
- 批量处理事务:
- 对获取到的事务列表进行批量处理,推进事务进度。
- 处理完毕后释放之前获取的锁。
- 错误处理和轮询调整:
- 在整个过程中,如果任何步骤出现错误,该错误会被记录并用于调整下一次的轮询间隔。
有关于轮询时间间隔的退避谦让策略为:每次对时间间隔进行翻倍,封顶为初始时长的 8 倍:
func (t *TXManager) backOffTick(tick time.Duration) time.Duration {tick <<= 1if threshold := t.opts.MonitorTick << 3; tick > threshold {return threshold}return tick
}
批量推进事务进度
下面是异步轮询任务批量推进事务第二阶段执行的流程,核心是开启多个 goroutine 并发对多项事务进行处理:
func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {// 1 创建一个 chan,用于接收子 goroutine 传输的 errerrCh := make(chan error)go func() {// 2 通过 waitGroup 聚合多个子 groutinevar wg sync.WaitGroupfor _, tx := range txs {// shadowtx := txwg.Add(1)go func() {defer wg.Done()// 3 推进每笔事务的进度if err := t.advanceProgress(tx); err != nil {// 遇到错误则投递到 errCherrCh <- err}}()}// 4 收口等待所有子 goroutine 执行完成wg.Wait()// 5 所有子 goroutine 执行完成后关闭 chan,唤醒阻塞等待的父 goroutineclose(errCh)}()// 记录遇到的第一个错误var firstErr error// 6 父 goroutine 通过 chan 阻塞在这里,直到所有 goroutine 执行完成,chan 被 close 才能往下for err := range errCh {// 记录遇到的第一个错误if firstErr != nil {continue}firstErr = err}// 7 返回错误,核心是标识执行过程中,是否发生过错误return firstErr
}