Seata介绍

news/2024/10/28 0:28:03/

  Seata是一款开源的分布式事务解决方案

运行机制

Seata通过TC(事务协调者)、TM(事务管理器)、RM(资源管理器)来实现分布式事务。

【以下分析基于AT模式】

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TC由seata-server担当。

TC对全局事务的协调,通过对表global_table, branch_table的操作来实现。

当TM发起开启全局事务请求时,TC向global_table插入一条数据,并返回xid给TM;

然后TM通过rpc调用需要的RM,并传递xid给RM,RM向TC查询xid对应的全局事务

是否存在,存在则向TC注册分支,注册时TC向branch_table插入一条数据。

RM执行完之后,会从branch_table中删除对应的分支数据。

全局事务执行过程中,会更新global_table中数据的状态:

update global_table   set status = ?,       gmt_modified = now() where xid = ?

全局事务执行完成后,会从global_table中删除已经完成的全局事务。TC中有job不停地根据

状态扫描global_table中的数据。TC的job在io.seata.server.coordinator.DefaultCoordinator中定义。

以下是使用seata官网的示例在本地运行时,打印的sql debug日志:

	15:08:28.500 DEBUG --- [rverHandlerThread_1_5_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : insert into global_table(xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())15:08:28.778  INFO --- [rverHandlerThread_1_5_500] i.s.s.coordinator.DefaultCoordinator     : Begin new global transaction applicationId: business-seata-example,transactionServiceGroup: my_test_tx_group, transactionName: dubbo-gts-seata-example,timeout:300000,xid:192.168.101.71:8091:486419889390418739315:08:28.814  INFO --- [     batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata,lockKey=t_stock:2,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.820 DEBUG --- [rverHandlerThread_1_6_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified  from global_table where xid = ?15:08:28.877 DEBUG --- [rverHandlerThread_1_6_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) : insert into branch_table(xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(6), now(6))15:08:28.881  INFO --- [rverHandlerThread_1_6_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.101.71:8091:4864198893904187393, branchId = 4864198893904187396, resourceId = jdbc:mysql://127.0.0.1:3306/seata ,lockKeys = t_stock:215:08:28.913  INFO --- [     batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata,lockKey=t_account:2,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.913 DEBUG --- [rverHandlerThread_1_7_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified  from global_table where xid = ?15:08:28.920 DEBUG --- [rverHandlerThread_1_7_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) : insert into branch_table(xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(6), now(6))15:08:28.924  INFO --- [rverHandlerThread_1_7_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.101.71:8091:4864198893904187393, branchId = 4864198893904187398, resourceId = jdbc:mysql://127.0.0.1:3306/seata ,lockKeys = t_account:215:08:28.940  INFO --- [     batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata,lockKey=t_order:104,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.940 DEBUG --- [rverHandlerThread_1_8_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified  from global_table where xid = ?15:08:28.948 DEBUG --- [rverHandlerThread_1_8_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) : insert into branch_table(xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(6), now(6))15:08:28.950  INFO --- [rverHandlerThread_1_8_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.101.71:8091:4864198893904187393, branchId = 4864198893904187400, resourceId = jdbc:mysql://127.0.0.1:3306/seata ,lockKeys = t_order:10415:08:28.970  INFO --- [     batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,extraData=null,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.974 DEBUG --- [rverHandlerThread_1_9_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified  from global_table where xid = ?15:08:28.978 DEBUG --- [rverHandlerThread_1_9_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : queryBranchTransactionDO(String xid) : select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified  from branch_table where xid = ? order by gmt_create asc15:08:28.985 DEBUG --- [rverHandlerThread_1_9_500] i.s.s.s.db.store.LogStoreDataBaseDAO     : updateGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : update global_table   set status = ?,       gmt_modified = now() where xid = ?15:08:29.808 DEBUG --- [      AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO     : queryBranchTransactionDO(List<String> xids) : select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified  from branch_table where xid in (?) order by gmt_create asc15:08:29.841 DEBUG --- [      AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO     : deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) : delete from branch_table where xid = ?   and branch_id = ?15:08:29.857 DEBUG --- [      AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO     : deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) : delete from branch_table where xid = ?   and branch_id = ?15:08:29.866 DEBUG --- [      AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO     : deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) : delete from branch_table where xid = ?   and branch_id = ?15:08:29.869 DEBUG --- [      AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO     : updateGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : update global_table   set status = ?,       gmt_modified = now() where xid = ?15:08:29.874 DEBUG --- [      AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO     : deleteGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : delete from global_table where xid = ?

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

全局事务由@GlobalTransactional开启。标注有@GlobalTransactional的方法,会被

GlobalTransactionalInterceptor拦截处理。当全局事务为null时,创建一个全局事务,

初始化的全局事务的role=GlobalTransactionalRole.Launcher, status=GlobalStatus.Unknown.

只有TM才能开启全局事务。一个服务既可以充当TM,也可以充当RM。

开启事务时,会从TC(即seata-server)获取一个全局事务id, xid,并将该xid绑定到

RootContext中,即当前线程的上下文中。

( 问题1:TC端把这个xid保存在哪?以存储模式db为例  答:保存在global_table中)

以下是TM开启全局事务的源码:

DefaultGlobalTransaction.javapublic void begin(int timeout, String name) throws TransactionException {if (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();...return;}assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}xid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;RootContext.bind(xid);...}

以下是TM开启全局事务-执行业务逻辑-提交|回滚全局事务的源码:

TransactionalTemplate.javapublic Object execute(TransactionalExecutor business) throws Throwable {...try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//    else do nothing. Of course, the hooks will still be triggered.beginTransaction(txInfo, tx);Object rs;try {// Do Your Businessrs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.commitTransaction(tx);return rs;} finally {//5. clearresumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}
...}

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

使用时机

  当一个业务流程需要调度多个分布在不同节点的服务时。

示例

  seata-samples/seata-spring-boot-starter-samples at master · seata/seata-samples · GitHub

Question1: Seata AT 模式:”一阶段:业务数据和回滚日志记录在同一个本地事务中提交。。。 二阶段:提交异步化。。。“, 
    一阶段添加业务数据和插入回滚日志:
    回滚日志日志插入的调用链如下:

通过rpc调度目标服务 >> org.apache.ibatis.binding.MapperProxy.invoke() >> org.mybatis.spring.SqlSessionTemplate.update() >> org.apache.ibatis.executor.statement.PreparedStatementHandler.update()>> io.seata.rm.datasource.PreparedStatementProxy.execute() L55>> io.seata.rm.datasource.exec.ExecuteTemplate.execute() L50>> io.seata.rm.datasource.exec.BaseTransactionalExecutor.execute() L113>> io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.doExecute() L82>> io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.executeAutoCommitTrue() L139>> io.seata.rm.datasource.ConnectionProxy.commit() L187>> io.seata.rm.datasource.undo.AbstractUndoLogManager.flushUndoLogs() L242>> io.seata.rm.datasource.undo.mysql.MySQLUndoLogManager.insertUndoLogWithNormal()

Question2: 二阶段提交的是什么?

// 发送提交结果消息给TC,异步批量删除undo_log
RmBranchCommitProcessor.handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest)

Question3: undo_log的添加、删除与业务数据的提交、第一阶段、第二阶段的先后顺序是怎样的?
    第一阶段 添加业务数据并新增undo_log;
    第二阶段异步执行,报告第一阶段执行结果并删除undo_log
        日志删除的调用链如下:

//sql: DELETE FROM undo_log WHERE  branch_id IN  (?)  AND xid IN  (?) AbstractUndoLogManager::batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn)AsyncWorker::deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts)AsyncWorker::dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts)AsyncWorker::doBranchCommit()AsyncWorker::doBranchCommitSafely()AsyncWorker::addToCommitQueue(Phase2Context context)AsyncWorker::branchCommit(String xid, long branchId, String resourceId)DataSourceManager::branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData)AbstractRMHandler::doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)AbstractRMHandler::handle(BranchCommitRequest request)BranchCommitRequest::handle(RpcContext rpcContext)AbstractRMHandler::onRequest(AbstractMessage request, RpcContext context)RmBranchCommitProcessor::handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest)RmBranchCommitProcessor::process(ChannelHandlerContext ctx, RpcMessage rpcMessage)AbstractNettyRemoting::processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)ClientHandler::channelRead(final ChannelHandlerContext ctx, Object msg)AbstractNettyRemotingClientclientBootstrap.setChannelHandlers(new ClientHandler());	L134

参考:Seata 是什么


http://www.ppmy.cn/news/645498.html

相关文章

数据库上机考试B试卷

2022-2023-2学期《数据库原理及应用》上机测评B卷 &#xff08;时间&#xff1a;100分钟&#xff09; 学号&#xff1a;__ 姓名&#xff1a;_________ 成绩&#xff1a;______________ 说明&#xff1a;本测试为学生独立完成&#xff0c;请遵守学校考试规定&#xff0c;不得…

uniapp项目 封装一个饼图组件 并且修改显示项的排列方式

需求如下: 真实数据渲染后的完成效果如下: 记录一下代码: <template><view><view style"height: 600rpx;"><l-echart ref"chart" finished"init"></l-echart></view></view> </template><…

最近要看的网址

http://www.msdnwebcast.com/webcast/1986.aspx http://www.silverlight.net/getstarted/ MVC的控件 http://www.telerik.com/products/winforms.aspx 转载于:https://www.cnblogs.com/Smalltalk/archive/2009/09/16/1568151.html

最近需要看的网站

http://en.csharp-online.net/CSharp_Code_Snippets http://www.codeguru.com/video/ http://www.java2s.com/Code/CSharp/CatalogCSharp.htm http://www.example-code.com/ http://java.sys-con.com/node/325148 转载于:https://www.cnblogs.com/softwareking/archive/2011/04…

盘点那些免费视频网址到底有多爽

盘点那些免费视频网址到底有多爽 1 https://www.bilibili.com/ 2 https://www.bttwo.com/ 3 http://www.baishixi.com/

分享几个可能用网站

python3官方中文文档 爬虫框架网站scrapy PIP使用国内镜像提升下载速度和安装成功率 pychearts使用官方网站地址 pycharm专业版激活 splash官方文档 splash介绍网址 appnium安装教程 Docker的安装——ubuntu中 Docker加速 目前为止最全的微信小程序项目实例 数据来源国外 分布…

不要VIP,想看啥就看啥的在线网站!

有句话说得好&#xff0c;我们舍得花300块买衣服&#xff0c;却舍不得掏20块充视频VIP&#xff0c;毕竟充了也不是天天都看&#xff0c;那岂不是亏大发了&#xff0c;出于种种原因&#xff0c;我们有时候不得不做伸手党&#xff0c;诺&#xff0c;手伸出来&#xff0c;我给你啊…

cad快看_星期日来啦!分享5个珍藏已久的电影网站,各种大片免费看

每周最令人期待的一天就是星期日了&#xff0c;大部分人都可以好好休假一天&#xff0c;但是在家无聊&#xff0c;我们一般都会找一些电影看&#xff0c;然而很多想看的不是要会员就是要付费&#xff0c;很令人头疼。 不过这些都是小事儿&#xff0c;今天我就来分享5个珍藏已久…