背景:
在 事务-1 事务隔离级别和Spring事务传播机制 中对事务的特性、隔离级别、Spring事务的传播机制结合案例进行了分析;在 事务-2 Spring与Mybatis事务实现原理
中对JDBC、Mybatis、Spring整合Mybatis实现事务的原理结合框架源码进行了介绍,过程中对SqlSession和SqlSessionTemplate的线程安全性也进行了说明。
本文以前两篇文章为基础,补充了一些遗漏的知识点(偏向于Spring),建议读者先阅读完上述两篇文章,有任何疑问或问题可以在品论区留言。
1.Spring事务
ORM框架是连接代码和数据库的桥梁,Spring作为基础框架提供了Spring Data JPA,也提供了适配其他ORM框架的能力,如集成Mybatis和Hibernate等。需要注意ORM框架提供的事务能力依赖于数据库事务,是对数据库事务的一层封装;如果底层数据库不支持事务(如Mysql的MyISAM引擎),在此之上的所有数据库或者DAO操作都无事务特性。
Spring事务提供了两种使用方式:声明式 (通过@Transactional注解的方式) 和编程式 (使用TransactionTemplate包裹代码方式)。其中,声明式使用较为简单且不容易出错;编程式相对比较灵活,可以进行细粒度控制。
声明式和编程式的底层实现原理相同,因编程式不常见,下文以声明式为例对Spring事务进行介绍。
对于声明式事务,Spring通过AOP机制将业务代码包裹起来,在业务代码前后开启和关闭事务,根据执行情况以及自定义条件进行事务的提交或回滚;提交/回滚时需要先获取对应的Connection对象,然后基于该对象进行。如何保证执行sql语句时的Connection对象和提交/回滚时的Connection对象为同一对象,以及同一线程在不同事务上下文的执行过程中获取的Connection对象为不是同一个;前者说明了Spring事务的底层原理,后者提供了线程安全保证。
这部分内容是本文的重点内容,也是理解Spring事务概念的关键;其实读者心里已经有概念了:ThreadLocal。
2.使用方式
参考: 事务-1 事务隔离级别和Spring事务传播机制 和 事务-2 Spring与Mybatis事务实现原理
3.实现原理
Spring Boot项目中常见的ORM框架有JPA、Hibernate、Mybatis, 本章节以Spring Boot整合Mybatis的业务场景为例对Spring事务进行说明。
3.1 Spring事务组件
在Spring项目中,可以通过xml配置(配置AOP)或注解(@EnableTransactionManagement)的方式开启事务功能;但都需要定义PlatformTransactionManager类型的Bean对象, 如下所示:
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"><property name="dataSource" ref="dataSource"></property>
</bean>
而SpringBoot项目通过自动装配机制,默认开启了Spring事务。
Spring为支持事务功能提供了InfrastructureAdvisorAutoProxyCreator后置处理器、TransactionInterceptor拦截器和BeanFactoryTransactionAttributeSourceAdvisor增强等组件,在本章节后续中会陆续进行介绍。
3.2 代理对象
InfrastructureAdvisorAutoProxyCreator同 Spring系列-8 AOP使用与原理 文中介绍的AnnotationAwareAspectJAutoProxyCreator均为AbstractAdvisorAutoProxyCreator的实现类,而AOP逻辑被封装在了AbstractAdvisorAutoProxyCreator中,且在Spring系列-8 AOP使用与原理 中已对该部分进行了较为全面地介绍,因此相同部分认为读者已知,本文不再赘述。
InfrastructureAdvisorAutoProxyCreator作为BPP,在对象初始化后期会调用InfrastructureAdvisorAutoProxyCreator的postProcessAfterInitialization方法:
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {if (bean != null) {Object cacheKey = getCacheKey(bean.getClass(), beanName);if (this.earlyProxyReferences.remove(cacheKey) != bean) {return wrapIfNecessary(bean, beanName, cacheKey);}}return bean;
}
不考虑循环依赖场景,进入wrapIfNecessary(bean, beanName, cacheKey)
方法:
// 省略缓存、日志、异常分支
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);if (specificInterceptors != DO_NOT_PROXY) {return createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));}return bean;
}
逻辑较为清晰:先通过getAdvicesAndAdvisorsForBean获取增强逻辑,后然通过动态代理技术将增强逻辑织入到目标对象中。
跟进getAdvicesAndAdvisorsForBean
方法,查看获取增强逻辑部分的源码实现:
// AbstractAdvisorAutoProxyCreator类中
protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {List<Advisor> candidateAdvisors = findCandidateAdvisors();List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);extendAdvisors(eligibleAdvisors);if (!eligibleAdvisors.isEmpty()) {eligibleAdvisors = sortAdvisors(eligibleAdvisors);}return eligibleAdvisors;
}
从IOC中获取所有Avisor类型的Bean对象,过滤出符合条件的部分,排序后返回;整体逻辑同Spring系列-8 AOP使用与原理,区别在于过滤部分:findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
。
过滤逻辑如下:
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {if (advisor instanceof IntroductionAdvisor) {return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);} else if (advisor instanceof PointcutAdvisor) {PointcutAdvisor pca = (PointcutAdvisor) advisor;return canApply(pca.getPointcut(), targetClass, hasIntroductions);} else {// It doesn't have a pointcut so we assume it applies.return true;}
}
入参:
(1) Advisor advisor:为BeanFactoryTransactionAttributeSourceAdvisor类型,来自IOC容器;
(2) Class<?> targetClass:目标类的class对象;
(3) boolean hasIntroductions:事务增强不存在引介,为false;
BeanFactoryTransactionAttributeSourceAdvisor为PointcutAdvisor子类,进一步跟踪canApply方法:
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {MethodMatcher methodMatcher = pc.getMethodMatcher();Set<Class<?>> classes = new LinkedHashSet<>();classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));for (Class<?> clazz : classes) {Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);for (Method method : methods) {if (methodMatcher.matches(method, targetClass)) {return true;}}}return false;
}
其中,methodMatcher.matches(method, targetClass)
会返回方法是否添加了@Transactional注解,通过computeTransactionAttribute
方法解析得到的TransactionAttribute是否为空进行确定:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {// Don't allow no-public methods as required.if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {return null;}// The method may be on an interface, but we need attributes from the target class.// If the target class is null, the method will be unchanged.Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);// First try is the method in the target class.TransactionAttribute txAttr = findTransactionAttribute(specificMethod);if (txAttr != null) {return txAttr;}// Second try is the transaction attribute on the target class.txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {return txAttr;}//。。。 省略return null;
}
首先判断方法是否为public, 非public方法返回false;尝试从目标方法中解析出TransactionAttribute对象,如果不为空则返回;否则再次尝试从目标类对象中解析TransactionAttribute对象。
因此,需要满足以下条件,才可以生产Spring事务代理对象,实现事务功能:
(1)目标对象是被IOC管理的Bean对象;
(2)需要在类或者方法上注解@Transactional;方法中的优先级高于类;类上的注解对整个类中的方法生效;
(3)要求方法必须为public和非static;
本节最后再看一下BeanFactoryTransactionAttributeSourceAdvisor增强对象的拦截器:
当完成代理后,TransationInterceptor拦截器作为增强逻辑被织入到了目标对象中;当目标方法被调用时,进行拦截器逻辑。
3.3 拦截器
当业务代码被调用时,进入动态代理的拦截器即TransactionInterceptor的invoke方法中:
public Object invoke(MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
MethodInvocation对象作为CGLIB代理拦截器方法的入参,持有目标对象的类型信息、目标方法以及调用目标方法的逻辑,分别对应上述:AopUtils.getTargetClass(invocation.getThis())
、invocation.getMethod()
、invocation::proceed
.
invokeWithinTransaction的主线逻辑如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, InvocationCallback invocation) {// ⚠️1.开启事务TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);Object retVal;try {// ⚠️2.直接调用目标方法,得到返回结果retVal = invocation.proceedWithInvocation();} catch (Throwable ex) {// ⚠️3.异常场景处理-根据条件判断是否rollbackcompleteTransactionAfterThrowing(txInfo, ex);throw ex;} finally {cleanupTransactionInfo(txInfo);}// ⚠️4.正常场景处理-commitcommitTransactionAfterReturning(txInfo);return retVal;}
}
step1: 开启事务
Spring通过TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
开启事务,并将信息存储在TransactionInfo对象中。
在进入代码之前,先了解一下TransactionInfo对象的组成结构:
上图可以形象地表示为:
TransactionInfo持有TransactionStatus对象;
TransactionStatus持有DataSourceTransactionObject
DataSourceTransactionObject持有ConnectionHolder
ConnectionHolder持有connection对象;
即,TransactionInfo属性中保存着connection对象信息。
跟进createTransactionIfNecessary方法的主线逻辑:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {txAttr = new DelegatingTransactionAttribute(txAttr) {@Override public String getName() { return joinpointIdentification;}};TransactionStatus status = tm.getTransaction(txAttr);return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
先介绍一下如参:
(1) PlatformTransactionManager tm: 从IOC容器中根据TransactionManager获取的Bean对象,此时为DataSourceTransactionManager类型;
(2) TransactionAttribute txAttr: 对@Transactional注解信息进行的封装;
(3) String joinpointIdentification:字符串类型,目标方法对应的 “包名.类名.方法名”。
createTransactionIfNecessary方法逻辑线如下:先对TransactionAttribute进行了一层简单封装DelegatingTransactionAttribute,增强了一个getName()方法,返回joinpointIdentification信息;
然后通过PlatformTransactionManager和TransactionAttribute获取TransactionStatus对象(该部分是关键);
最后将PlatformTransactionManager、DelegatingTransactionAttribute、TransactionStatus整合成TransactionInfo并返回。
其中,第二步中涉及Connection对象的创建以及事务的开启,需要引起重视:
// 删除日志和异常分支,并尽可能进行简化以突出主线逻辑:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {// 构造DataSourceTransactionObject对象Object transaction = doGetTransaction();return startTransaction(definition, transaction, false, null);
}
上述方法逻辑较为简单:创建一个DataSourceTransactionObject对象,创建一个空的ConnectionHolder对象并复制给DataSourceTransactionObject对象的connectionHolder属性。然后调用startTransaction
方法开启事务:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);doBegin(transaction, definition);prepareSynchronization(status, definition);return status;
}
startTransaction代码逻辑较为简单:根据已有的事务信息对构建DefaultTransactionStatus对象,并通过doBegin
和prepareSynchronization
对其进行处理。
本文关心的内容在doBegin
方法中:
protected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;// ⚠️1.根据DataSource对象创建Connection对象Connection newCon = this.obtainDataSource().getConnection();// ⚠️2.将Connection对象通过ConnectionHolder包装后赋值给DataSourceTransactionObject对象txObject.setConnectionHolder(new ConnectionHolder(newCon), true);txObject.getConnectionHolder().setSynchronizedWithTransaction(true);// ⚠️3.对DataSourceTransactionObject对象进行状态设置Connection con = txObject.getConnectionHolder().getConnection();Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);txObject.setPreviousIsolationLevel(previousIsolationLevel);txObject.setReadOnly(definition.isReadOnly());if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);con.setAutoCommit(false);}this.prepareTransactionalConnection(con, definition);txObject.getConnectionHolder().setTransactionActive(true);int timeout = this.determineTimeout(definition);if (timeout != -1) {txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}if (txObject.isNewConnectionHolder()) {// ⚠️4.将Connection对象绑定到线程上下文中TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());}}
上述代码逻辑可以分为以下几个步骤:
(1)获取DataSource对象(此时为HikariDataSource), 然后基于该对象创建Connection对象;因此不同事务中 (即使同一线程) 的Connection不同。
(2)将Connection对象通过ConnectionHolder包装后赋值给DataSourceTransactionObject对象,用于后续的事务会滚或提交。
(3)为DataSourceTransactionObject设置超时时间、是否自动提交、隔离级别等;
(4)将Connection对象绑定到线程上下文中,为操作数据库时提供connection对象。进入TransactionSynchronizationManager类中的bindResource方法,ConnectionHolder对象被保存到resources对象中,该对象的定义如下:
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
即resources对象为ThreadLocal,这要求事务内部只有一个线程运行。对于如下场景,事务会失去其作用:
目标方法中包括了数据库操作1-4,线程1在执行目标方法前后开启和关闭了事务;执行过程中,开启了两个子线程:线程2用于处理数据库操作1,线程3用于处理数据库操作2,数据库操作3和数据库操作4在线程1中进行。此时数据库操作3和4在线程1开启的事务中生效;而数据库操作1和2成为了独立的事务。
step2: 直接调用目标方法,得到返回结果
createTransactionIfNecessary
方法的入参InvocationCallback invocation
类型是一个函数式接口:
@FunctionalInterface
protected interface InvocationCallback {@NullableObject proceedWithInvocation() throws Throwable;
}
传参为:invocation::proceed
,当InvocationCallback对象的proceedWithInvocation()方法被调用时,执行invocation::proceed
这个lambda表达式,即目标对象的方法被执行。
step3: 回滚事务
当通过反射调用目标方法抛出Throwable类型的异常时,进入completeTransactionAfterThrowing(txInfo, ex);
分支:
// 这里删除了日志和异常处理带阿妹,突出主线逻辑:
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {// 回滚事务txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());} else {// 提交事务txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());}
}
txInfo.transactionAttribute
属性中包含着注解在@Transactional中信息,如下所示:
其中,txInfo.transactionAttribute.rollbackOn(ex)
用于校验目标方法执行时抛出的异常是否在指定异常范围内:范围之内满足回滚条件,触发回滚;否则,触发提交操作。
举例说明:
@Transactional(rollbackFor = Exception.class)
public void updateMoney(int money) {accountRepository.updateMoney(money);
}
通过@Transactional注解指定的异常类型为Exception,当accountRepository.updateMoney(money);
语句执行抛出Exception或者其子类时(如IOException),执行回滚操作。
本文主题是介绍Spring实现事务的原理,不涉及Spring事务传播机制原理的介绍
因此,提交或者回滚关注主逻辑,略去事务传播机制的逻辑分支
回滚事务时,追踪txInfo.transactionAttribute.rollbackOn(ex)
:
protected void doRollback(DefaultTransactionStatus status) {DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();Connection con = txObject.getConnectionHolder().getConnection();try {con.rollback();} catch (SQLException var5) {throw new TransactionSystemException("Could not roll back JDBC transaction", var5);}}
提交事务时,追踪txInfo.transactionAttribute.commit(ex)
:
protected void doCommit(DefaultTransactionStatus status) {DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();Connection con = txObject.getConnectionHolder().getConnection();try {con.commit();} catch (SQLException var5) {throw new TransactionSystemException("Could not commit JDBC transaction", var5);}
}
无论rollback还是commit都依赖于Connection对象,即依赖于数据库的事务能力。
其中,获取Connection对象来自DefaultTransactionStatus对象,即来源于step1: 开启事务中创建的Connection对象。
step4: 提交事务
当目标方法执行过程中无异常发生时,进入以下逻辑:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {if (txInfo != null && txInfo.getTransactionStatus() != null) {txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());}
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
与 step3: 回滚事务中介绍一致,用于提交事务对象。
3.4 数据库操作
在3.3 拦截器节中的step2步骤中调用invocation::proceed表达式反射调用目标方法,本节中结合Mybatis案例进行细节分析。
案例如下所示:
// Mapper类及其配置文件
@Mapper
public interface AccountMapper {void updateMoney(int money);
}<mapper namespace="com.seong.dao.AccountMapper"><update id="updateMoney" parameterType="java.lang.Integer">update t_accountset money = #{money}where name = 'a'</update>
</mapper>
当AccountMapper(动态代理对象)的updateMoney方法被调用时,由拦截器进入SqlSessionTemplate的update方法:
public int update(String statement, Object parameter) {return this.sqlSessionProxy.update(statement, parameter);
}
sqlSessionProxy也是一个代理对象,再次进入拦截器对象中(上述两次拦截在前文提及的事务文章中有介绍,有需要请参考) :
// 尽可能删除分支,突出核心逻辑:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// ⚠️1.获取sqlSession对象SqlSession sqlSession = SqlSessionUtils.getSqlSession(SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);// ⚠️2.数据库操作Object result = method.invoke(sqlSession, args);// ⚠️3.关闭 SqlSessionSqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);return result;
}
这部分的逻辑是获取sqlSession对象,并执行数据库操作;注意这里也有一个ThreadLocal对象,读者可自行分析其作用。这里关注的重点是SqlSessionUtils.getSqlSession
获取sqlsession对象以及执行过程。
SqlSession对象的属性和行为如下图所示:
通过executor间接包含了transaction事务对象,即每个SqlSession对象内部持有一个事务对象。事务对象创建之初内部的Connection属性为空,直到需要执行数据库操作时才会根据事务对象生成。通过SqlSessionUtils.getSqlSession
方法得到的SqlSession对象也是如此。
追踪该方法的调用链,进入openSessionFromDataSource方法:
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {Environment environment = configuration.getEnvironment();TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);Transaction tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);Executor executor = configuration.newExecutor(tx, execType);return new DefaultSqlSession(configuration, executor, autoCommit);
}
从Mybatis的configuration配置中获取环境对象,并从环境对象中获取事务工厂;注意:这里的configuration和环境对象、事务工厂是全局唯一的。当Spring集成Mybatis时,TransactionFactory会被设置为SpringManagedTransactionFactory类型。由SpringManagedTransactionFactory事务工厂创建的对象自然为SpringManagedTransaction类型:
public class SpringManagedTransactionFactory implements TransactionFactory {public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {return new SpringManagedTransaction(dataSource);}
}
当执行sql语句前先通过transaction对象获取数据库连接对象:
protected Connection getConnection(Log statementLog) throws SQLException {Connection connection = transaction.getConnection();if (statementLog.isDebugEnabled()) {return ConnectionLogger.newInstance(connection, statementLog, queryStack);} else {return connection;}
}
进入SpringManagedTransaction的getConnection方法:
public Connection getConnection() throws SQLException {if (this.connection == null) {openConnection();}return this.connection;
}private void openConnection() throws SQLException {this.connection = DataSourceUtils.getConnection(this.dataSource);this.autoCommit = this.connection.getAutoCommit();this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
}
所以,SpringManagedTransaction是通过DataSourceUtils.getConnection(this.dataSource)
获取Connection对象,追踪其调用链进入doGetConnection方法:
public static Connection doGetConnection(DataSource dataSource) throws SQLException {ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource);if (conHolder == null || !conHolder.hasConnection() && !conHolder.isSynchronizedWithTransaction()) {// ... } else {conHolder.requested();return conHolder.getConnection();}
}
通过TransactionSynchronizationManager.getResource(dataSource)
获取的ConnectionHolder对象正是开启事务时放入的。
因此,保证了事务在执行sql以及会滚/提交过程中使用的是同一个Connection对象。