Seata是一款开源的分布式事务解决方案
运行机制
Seata通过TC(事务协调者)、TM(事务管理器)、RM(资源管理器)来实现分布式事务。
【以下分析基于AT模式】
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TC由seata-server担当。
TC对全局事务的协调,通过对表global_table, branch_table的操作来实现。
当TM发起开启全局事务请求时,TC向global_table插入一条数据,并返回xid给TM;
然后TM通过rpc调用需要的RM,并传递xid给RM,RM向TC查询xid对应的全局事务
是否存在,存在则向TC注册分支,注册时TC向branch_table插入一条数据。
RM执行完之后,会从branch_table中删除对应的分支数据。
全局事务执行过程中,会更新global_table中数据的状态:
update global_table set status = ?, gmt_modified = now() where xid = ?
全局事务执行完成后,会从global_table中删除已经完成的全局事务。TC中有job不停地根据
状态扫描global_table中的数据。TC的job在io.seata.server.coordinator.DefaultCoordinator中定义。
以下是使用seata官网的示例在本地运行时,打印的sql debug日志:
15:08:28.500 DEBUG --- [rverHandlerThread_1_5_500] i.s.s.s.db.store.LogStoreDataBaseDAO : insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : insert into global_table(xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())15:08:28.778 INFO --- [rverHandlerThread_1_5_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: business-seata-example,transactionServiceGroup: my_test_tx_group, transactionName: dubbo-gts-seata-example,timeout:300000,xid:192.168.101.71:8091:486419889390418739315:08:28.814 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata,lockKey=t_stock:2,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.820 DEBUG --- [rverHandlerThread_1_6_500] i.s.s.s.db.store.LogStoreDataBaseDAO : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified from global_table where xid = ?15:08:28.877 DEBUG --- [rverHandlerThread_1_6_500] i.s.s.s.db.store.LogStoreDataBaseDAO : insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) : insert into branch_table(xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(6), now(6))15:08:28.881 INFO --- [rverHandlerThread_1_6_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.101.71:8091:4864198893904187393, branchId = 4864198893904187396, resourceId = jdbc:mysql://127.0.0.1:3306/seata ,lockKeys = t_stock:215:08:28.913 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata,lockKey=t_account:2,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.913 DEBUG --- [rverHandlerThread_1_7_500] i.s.s.s.db.store.LogStoreDataBaseDAO : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified from global_table where xid = ?15:08:28.920 DEBUG --- [rverHandlerThread_1_7_500] i.s.s.s.db.store.LogStoreDataBaseDAO : insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) : insert into branch_table(xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(6), now(6))15:08:28.924 INFO --- [rverHandlerThread_1_7_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.101.71:8091:4864198893904187393, branchId = 4864198893904187398, resourceId = jdbc:mysql://127.0.0.1:3306/seata ,lockKeys = t_account:215:08:28.940 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata,lockKey=t_order:104,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.940 DEBUG --- [rverHandlerThread_1_8_500] i.s.s.s.db.store.LogStoreDataBaseDAO : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified from global_table where xid = ?15:08:28.948 DEBUG --- [rverHandlerThread_1_8_500] i.s.s.s.db.store.LogStoreDataBaseDAO : insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) : insert into branch_table(xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(6), now(6))15:08:28.950 INFO --- [rverHandlerThread_1_8_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.101.71:8091:4864198893904187393, branchId = 4864198893904187400, resourceId = jdbc:mysql://127.0.0.1:3306/seata ,lockKeys = t_order:10415:08:28.970 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.101.71:8091:4864198893904187393,extraData=null,clientIp:192.168.101.71,vgroup:my_test_tx_group15:08:28.974 DEBUG --- [rverHandlerThread_1_9_500] i.s.s.s.db.store.LogStoreDataBaseDAO : queryGlobalTransactionDO(String xid) : select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified from global_table where xid = ?15:08:28.978 DEBUG --- [rverHandlerThread_1_9_500] i.s.s.s.db.store.LogStoreDataBaseDAO : queryBranchTransactionDO(String xid) : select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified from branch_table where xid = ? order by gmt_create asc15:08:28.985 DEBUG --- [rverHandlerThread_1_9_500] i.s.s.s.db.store.LogStoreDataBaseDAO : updateGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : update global_table set status = ?, gmt_modified = now() where xid = ?15:08:29.808 DEBUG --- [ AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO : queryBranchTransactionDO(List<String> xids) : select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified from branch_table where xid in (?) order by gmt_create asc15:08:29.841 DEBUG --- [ AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO : deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) : delete from branch_table where xid = ? and branch_id = ?15:08:29.857 DEBUG --- [ AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO : deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) : delete from branch_table where xid = ? and branch_id = ?15:08:29.866 DEBUG --- [ AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO : deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) : delete from branch_table where xid = ? and branch_id = ?15:08:29.869 DEBUG --- [ AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO : updateGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : update global_table set status = ?, gmt_modified = now() where xid = ?15:08:29.874 DEBUG --- [ AsyncCommitting_1_1] i.s.s.s.db.store.LogStoreDataBaseDAO : deleteGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) : delete from global_table where xid = ?
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
全局事务由@GlobalTransactional开启。标注有@GlobalTransactional的方法,会被
GlobalTransactionalInterceptor拦截处理。当全局事务为null时,创建一个全局事务,
初始化的全局事务的role=GlobalTransactionalRole.Launcher, status=GlobalStatus.Unknown.
只有TM才能开启全局事务。一个服务既可以充当TM,也可以充当RM。
开启事务时,会从TC(即seata-server)获取一个全局事务id, xid,并将该xid绑定到
RootContext中,即当前线程的上下文中。
( 问题1:TC端把这个xid保存在哪?以存储模式db为例 答:保存在global_table中)
以下是TM开启全局事务的源码:
DefaultGlobalTransaction.javapublic void begin(int timeout, String name) throws TransactionException {if (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();...return;}assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}xid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;RootContext.bind(xid);...}
以下是TM开启全局事务-执行业务逻辑-提交|回滚全局事务的源码:
TransactionalTemplate.javapublic Object execute(TransactionalExecutor business) throws Throwable {...try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,// else do nothing. Of course, the hooks will still be triggered.beginTransaction(txInfo, tx);Object rs;try {// Do Your Businessrs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.commitTransaction(tx);return rs;} finally {//5. clearresumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}
...}
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
使用时机
当一个业务流程需要调度多个分布在不同节点的服务时。
示例
seata-samples/seata-spring-boot-starter-samples at master · seata/seata-samples · GitHub
Question1: Seata AT 模式:”一阶段:业务数据和回滚日志记录在同一个本地事务中提交。。。 二阶段:提交异步化。。。“,
一阶段添加业务数据和插入回滚日志:
回滚日志日志插入的调用链如下:
通过rpc调度目标服务 >> org.apache.ibatis.binding.MapperProxy.invoke() >> org.mybatis.spring.SqlSessionTemplate.update() >> org.apache.ibatis.executor.statement.PreparedStatementHandler.update()>> io.seata.rm.datasource.PreparedStatementProxy.execute() L55>> io.seata.rm.datasource.exec.ExecuteTemplate.execute() L50>> io.seata.rm.datasource.exec.BaseTransactionalExecutor.execute() L113>> io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.doExecute() L82>> io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.executeAutoCommitTrue() L139>> io.seata.rm.datasource.ConnectionProxy.commit() L187>> io.seata.rm.datasource.undo.AbstractUndoLogManager.flushUndoLogs() L242>> io.seata.rm.datasource.undo.mysql.MySQLUndoLogManager.insertUndoLogWithNormal()
Question2: 二阶段提交的是什么?
// 发送提交结果消息给TC,异步批量删除undo_log
RmBranchCommitProcessor.handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest)
Question3: undo_log的添加、删除与业务数据的提交、第一阶段、第二阶段的先后顺序是怎样的?
第一阶段 添加业务数据并新增undo_log;
第二阶段异步执行,报告第一阶段执行结果并删除undo_log
日志删除的调用链如下:
//sql: DELETE FROM undo_log WHERE branch_id IN (?) AND xid IN (?) AbstractUndoLogManager::batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn)AsyncWorker::deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts)AsyncWorker::dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts)AsyncWorker::doBranchCommit()AsyncWorker::doBranchCommitSafely()AsyncWorker::addToCommitQueue(Phase2Context context)AsyncWorker::branchCommit(String xid, long branchId, String resourceId)DataSourceManager::branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData)AbstractRMHandler::doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)AbstractRMHandler::handle(BranchCommitRequest request)BranchCommitRequest::handle(RpcContext rpcContext)AbstractRMHandler::onRequest(AbstractMessage request, RpcContext context)RmBranchCommitProcessor::handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest)RmBranchCommitProcessor::process(ChannelHandlerContext ctx, RpcMessage rpcMessage)AbstractNettyRemoting::processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)ClientHandler::channelRead(final ChannelHandlerContext ctx, Object msg)AbstractNettyRemotingClientclientBootstrap.setChannelHandlers(new ClientHandler()); L134
参考:Seata 是什么