【数据库与事务系列】多数据源切换

news/2025/1/15 21:05:47/

分库分表

不光是管理多个数据源,是对sql的优化、改写、归并等一系列操作的解决方案。关注的是sql语句。以shardingSphere为例,虽然也支持跟sql无关的hint策略提供路由功能,但是在sql改写以及归并过程中,依旧对sql有限制。

多数据源切换

如果只是简单的切换多个数据源,而对sql的逻辑没有任何限制,就不要选择分库分表了。直接选用多数据源切换多方案更简单。spring-jdbc模块提供了AbstractRoutingDataSource抽象类,其内部可以包含多个DataSource,只需要实现其抽象方法,在运行时就可以动态访问指定的数据库。但是需要自己实现一些aop的切换能力,这个mybaitis-plus都帮我们做好了。
请添加图片描述
业界主要有两种实现方案:

  • AOP + ThreadLocal ,如:Mybatis-plus的多数据源(dynamic-datasource);
  • 语义解析,如:客户端侧:ShardingSphere-Jdbc,服务端侧:ShardingSphere-Proxy,阿里云、腾讯云proxy。

一、动态数据源切换

(AbstractRoutingDataSource实现)

我们来查看AbstractRoutingDataSource源码,来更好的理解多数据源配置。
首先查看该类的属性,根据名称我们能看出他们的作用。

private Map<Object, Object> targetDataSources;
private Object defaultTargetDataSource;
private boolean lenientFallback = true;
private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
private Map<Object, DataSource> resolvedDataSources;
private DataSource resolvedDefaultDataSource;
  • targetDataSources是目标数据源集合
  • defaultTargetDataSource是默认数据源
  • resolvedDataSources是解析后的数据源集合
  • resolvedDefaultDataSource是解析后的默认数据源

对数据源赋值的代码如下:

public void setTargetDataSources(Map<Object, Object> targetDataSources) {this.targetDataSources = targetDataSources;
}public void setDefaultTargetDataSource(Object defaultTargetDataSource) {this.defaultTargetDataSource = defaultTargetDataSource;
}

因为方法是set开头,我们便能把这两个方法配置在spring中,继续向下看。

public void afterPropertiesSet() {if (this.targetDataSources == null) {throw new IllegalArgumentException("Property 'targetDataSources' is required");} else {this.resolvedDataSources = new HashMap(this.targetDataSources.size());Iterator var1 = this.targetDataSources.entrySet().iterator();while(var1.hasNext()) {Entry<Object, Object> entry = (Entry)var1.next();Object lookupKey = this.resolveSpecifiedLookupKey(entry.getKey());DataSource dataSource = this.resolveSpecifiedDataSource(entry.getValue());this.resolvedDataSources.put(lookupKey, dataSource);}if (this.defaultTargetDataSource != null) {this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource);}}
}

这个afterPropertiesSet方法是遍历我们的targetDataSources数据源集合,并添加resolvedDataSources的map数据,map的key和value是根据resolveSpecifiedLookupKey方法和resolveSpecifiedDataSource方法得到。接着找到resolveSpecifiedLookupKey和resolveSpecifiedDataSource。

protected Object resolveSpecifiedLookupKey(Object lookupKey) {return lookupKey;
}protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {if (dataSource instanceof DataSource) {return (DataSource)dataSource;} else if (dataSource instanceof String) {return this.dataSourceLookup.getDataSource((String)dataSource);} else {throw new IllegalArgumentException("Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);}
}

resolveSpecifiedLookupKey方法返回的实际就是targetDataSources的key,而resolveSpecifiedDataSource返回的是targetDataSources的value转成的DataSource。afterPropertiesSet方法的作用实际就是将原targetDataSources转成resolvedDataSources。

继续向下看,我们能看到数据库的连接方法。

public Connection getConnection() throws SQLException {return this.determineTargetDataSource().getConnection();
}public Connection getConnection(String username, String password) throws SQLException {return this.determineTargetDataSource().getConnection(username, password);
}

我们接着去看determineTargeDataSource方法,估计这个方法是返回指定数据源的。

protected DataSource determineTargetDataSource() {Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");Object lookupKey = this.determineCurrentLookupKey();DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);if (dataSource == null && (this.lenientFallback || lookupKey == null)) {dataSource = this.resolvedDefaultDataSource;}if (dataSource == null) {throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");} else {return dataSource;}
}

果然,这个方法是返回数据源的,我们来仔细读这个方法,从第3行开始”Object lookupKey = this.determineCurrentLookupKey();”,这个determineCurrentLookupKey返回了一个key,第四句是根据这个key去resolvedDataSources中拿到对应DataSource,接下来的代码是DataSource不存在便返回默认的数据源。determineCurrentLookupKey方法就是返回key的逻辑处理部分,联系spring中的配置,它返回的就是”cms”、”epg”中的一个。

实战

新建一个springboot项目,pom.xml文件中引入如下依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId>
</dependency>

2. application.yml文件

server:port: 8090
spring:application:name: springboot-dynamic-aopdatasource:type: com.alibaba.druid.pool.DruidDataSourcemaster:jdbc-url: jdbc:mysql://localhost:3306/dynamic-master?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8username: rootpassword: 123456driver-class-name: com.mysql.cj.jdbc.Driverslave:jdbc-url: jdbc:mysql://localhost:3306/dynamic-slave?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8username: rootpassword: 123456driver-class-name: com.mysql.cj.jdbc.Driver
mybatis:mapper-locations: classpath:mapper/*.xmlconfiguration:use-actual-param-name: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImpl

3. 数据源配置类

@Configuration
public class DataSourceConfig {@Bean@ConfigurationProperties(prefix = "spring.datasource.master")public DataSource masterDataSource() {return DataSourceBuilder.create().build();}@Bean@ConfigurationProperties(prefix = "spring.datasource.slave")public DataSource slaveDataSource() {return DataSourceBuilder.create().build();}@Beanpublic DataSourceTransactionManager masterDataSourceTransactionManager(DynamicDataSource dynamicDataSource) {DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();dataSourceTransactionManager.setDataSource(dynamicDataSource);return dataSourceTransactionManager;}@Beanpublic DataSourceTransactionManager slaveDataSourceTransactionManager(DynamicDataSource dynamicDataSource) {DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();dataSourceTransactionManager.setDataSource(dynamicDataSource);return dataSourceTransactionManager;}
}

4. 动态数据源类

新建一个类继承AbstractRoutingDataSource,实现其抽象类

@Primary
@Component
public class DynamicDataSource extends AbstractRoutingDataSource {public static final ThreadLocal<String> name = new ThreadLocal<>();@AutowiredDataSource masterDataSource;@AutowiredDataSource slaveDataSource;@Overrideprotected Object determineCurrentLookupKey() {return name.get();}@Overridepublic void afterPropertiesSet() {Map<Object, Object> targetDataSources = new HashMap<>();targetDataSources.put("master", masterDataSource);targetDataSources.put("slave", slaveDataSource);//设置目标数据源super.setTargetDataSources(targetDataSources);//设置默认数据源super.setDefaultTargetDataSource(masterDataSource);super.afterPropertiesSet();}
}

5. 实现多数据源切换

一般情况下,读写分离的数据源使用MyBatis插件实现动态切换数据源,不同业务来源的数据源使用AOP结合自定义注解实现动态切换数据源,或者定义多个mybatis sqlsessionFactory来实现

5.1. MyBatis插件实现动态切换

新建一个插件类,实现Interceptor接口

@Intercepts({@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
})
public class DynamicDataSourcePlugin implements Interceptor {@Overridepublic Object intercept(Invocation invocation) throws Throwable {Object[] objects = invocation.getArgs();MappedStatement mappedStatement = (MappedStatement) objects[0];if (mappedStatement.getSqlCommandType().equals(SqlCommandType.SELECT)) {DynamicDataSource.name.set("slave");} else {DynamicDataSource.name.set("master");}return invocation.proceed();}@Overridepublic Object plugin(Object target) {if (target instanceof Executor) {return Plugin.wrap(target, this);} else {return target;}}@Overridepublic void setProperties(Properties properties) {}
}

再将DynamicDataSourcePlugin类加入DataSourceConfig配置类

@Bean
public Interceptor interceptor() {return new DynamicDataSourcePlugin();
}

5.2. AOP结合自定义注解实现

新建一个自定义注解DS

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DS {String value() default "";
}

新建切面类

@Aspect
@Component
public class DynamicDataSourceAspect implements Ordered {@Before("@within(ds)")public void before(JoinPoint joinPoint, DS ds) {DynamicDataSource.name.set(ds.value());}@Overridepublic int getOrder() {return 0;}
}

5.3 spring即成多个mybatis 工厂实现

请添加图片描述
SpringBoot配置文件 配置多个数据库 分别为 his/pt/lis


spring:datasource:his:driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc-url: jdbc:sqlserver://192.168.200.200\HIS;DatabaseName=his_fyusername: sapassword: 123456#初始化连接池的连接数量 大小 最小 最大initial-size: 5min-idle: 5max-active: 20#配置获取连接等待超时的时间max-wait: 60000#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒time-between-eviction-runs-millis: 60000# 配置一个连接在池中最小生存的时间,单位是毫秒min-evictable-idle-time-millis: 30000# 配置一个连接在池中最大生存的时间,单位是毫秒max-evictable-idle-time-millis: 300000pt:driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc-url: jdbc:sqlserver://192.168.200.200\HIS;DatabaseName=his_pt_datausername: sapassword: 123456#初始化连接池的连接数量 大小 最小 最大initial-size: 5min-idle: 5max-active: 20#配置获取连接等待超时的时间max-wait: 60000#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒time-between-eviction-runs-millis: 60000# 配置一个连接在池中最小生存的时间,单位是毫秒min-evictable-idle-time-millis: 30000# 配置一个连接在池中最大生存的时间,单位是毫秒max-evictable-idle-time-millis: 300000lis:driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc-url: jdbc:sqlserver://192.168.200.200\HIS;DatabaseName=LISusername: sapassword: 123456#初始化连接池的连接数量 大小 最小 最大initial-size: 5min-idle: 5max-active: 20#配置获取连接等待超时的时间max-wait: 60000#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒time-between-eviction-runs-millis: 60000# 配置一个连接在池中最小生存的时间,单位是毫秒min-evictable-idle-time-millis: 30000# 配置一个连接在池中最大生存的时间,单位是毫秒max-evictable-idle-time-millis: 300000

1. 引入核心依赖

创建his对应的配置文件

/*** 多数据源配置类 此类配置读取his_fy数据库* @author zhaogx* @date 2022/5/18 14:28*/
@Configuration
@MapperScan(basePackages = {"com.thwy.mapper.his"},sqlSessionFactoryRef = "hisSqlSessionFactory"
)
public class HisDataSourceConfig {/*** @ConfigurationProperties(prefix = "spring.datasource.his") 读取配置文件中的数据源信息* @return 返回一个数据源 名字为 hisDataSource*/@Bean(name = "hisDataSource")@ConfigurationProperties(prefix = "spring.datasource.his")public DataSource hisDataSource(){return DataSourceBuilder.create().build();}/*** 配置SqlSessionFactory* @Qualifier("hisDataSource") 类型相同时指定注入哪一个名称的bean* @param hisDataSource hisDataSource方法中创建的指定数据源* @return* @throws Exception*/@Bean(name = "hisSqlSessionFactory")public SqlSessionFactory hisSqlSessionFactory(@Qualifier("hisDataSource") DataSource hisDataSource) throws Exception{SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();//设置数据源sqlSessionFactoryBean.setDataSource(hisDataSource);//设置mybtais配置 驼峰命名配置org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();config.setMapUnderscoreToCamelCase(true);sqlSessionFactoryBean.setConfiguration(config);//设置mapper.xml所在目录sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/his/*.xml"));return sqlSessionFactoryBean.getObject();}/*** 配置SqlSessionTemplate 可省略此步骤* @param hisSqlSessionFactory* @return*/public SqlSessionTemplate hisSqlSessionTemplate(@Qualifier("hisSqlSessionFactory") SqlSessionFactory hisSqlSessionFactory){return new SqlSessionTemplate(hisSqlSessionFactory);}}

创建pt对应的配置文件

@Configuration
@MapperScan(basePackages = {"com.thwy.mapper.pt"},sqlSessionFactoryRef = "ptSqlSessionFactory"
)
public class PTDataSourceConfig {/*** @ConfigurationProperties(prefix = "spring.datasource.pt") 读取配置文件中的数据源信息* @return 返回一个数据源 名字为 ptDataSource*/@Bean(name = "ptDataSource")@ConfigurationProperties(prefix = "spring.datasource.pt")public DataSource ptDataSource(){return DataSourceBuilder.create().build();}/*** 配置SqlSessionFactory* @Qualifier("ptDataSource") 类型相同时指定注入哪一个名称的bean* @param ptDataSource ptDataSource方法中创建的指定数据源* @return* @throws Exception*/@Bean(name = "ptSqlSessionFactory")public SqlSessionFactory ptSqlSessionFactory(@Qualifier("ptDataSource") DataSource ptDataSource) throws Exception{SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();//设置数据源sqlSessionFactoryBean.setDataSource(ptDataSource);//设置mybtais配置 驼峰命名配置org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();config.setMapUnderscoreToCamelCase(true);sqlSessionFactoryBean.setConfiguration(config);//设置mapper.xml所在目录sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/pt/*.xml"));return sqlSessionFactoryBean.getObject();}/*** 配置SqlSessionTemplate 可省略此步骤* @param ptSqlSessionFactory* @return*/public SqlSessionTemplate ptSqlSessionTemplate(@Qualifier("ptSqlSessionFactory") SqlSessionFactory ptSqlSessionFactory){return new SqlSessionTemplate(ptSqlSessionFactory);}
}

创建lis对应的配置文件

/*** 多数据源配置类 此类配置读取LIS数据库* @author zhaogx* @date 2022/5/18 14:28*/
@Configuration
@MapperScan(basePackages = {"com.thwy.mapper.lis"},sqlSessionFactoryRef = "lisSqlSessionFactory"
)
public class LisDataSourceConfig {/*** @ConfigurationProperties(prefix = "spring.datasource.lis") 读取配置文件中的数据源信息* @return 返回一个数据源 名字为 lisDataSource*/@Bean(name = "lisDataSource")@ConfigurationProperties(prefix = "spring.datasource.lis")public DataSource lisDataSource(){return DataSourceBuilder.create().build();}/*** 配置SqlSessionFactory* @Qualifier("lisDataSource") 类型相同时指定注入哪一个名称的bean* @param lisDataSource lisDataSource方法中创建的指定数据源* @return* @throws Exception*/@Bean(name = "lisSqlSessionFactory")public SqlSessionFactory lisSqlSessionFactory(@Qualifier("lisDataSource") DataSource lisDataSource) throws Exception{SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();//设置数据源sqlSessionFactoryBean.setDataSource(lisDataSource);//设置mybtais配置 驼峰命名配置org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();config.setMapUnderscoreToCamelCase(true);sqlSessionFactoryBean.setConfiguration(config);//设置mapper.xml所在目录sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/lis/*.xml"));return sqlSessionFactoryBean.getObject();}/*** 配置SqlSessionTemplate 可省略此步骤* @param lisSqlSessionFactory* @return*/public SqlSessionTemplate lisSqlSessionTemplate(@Qualifier("lisSqlSessionFactory") SqlSessionFactory lisSqlSessionFactory){return new SqlSessionTemplate(lisSqlSessionFactory);}}

实现

在配置文件中我们配置了 mapper的包扫描与xml文件的存放路径
此时当我们执行指定包下的mapper中的方法时,就会走与之对应的数据库
在这里插入图片描述
在这里插入图片描述
自己整合实现多数据源多有麻烦,baomidou提供的dynamic-datasource-spring-boot-starter已实现了上述功能,只需要引入该依赖即可,可以参阅SpringBoot整合dynamic-datasource实现动态切换多数据源

SpringBoot整合dynamic-datasource实现动态切换多数据源

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope>
</dependency>
<dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.5.1</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version>
</dependency>

2. application.yml配置

server:port: 8226
spring:application:name: springboot-dynamic-mybatis-plusdatasource:type: com.alibaba.druid.pool.DruidDataSourcedynamic:primary: masterstrict: true #严格匹配数据源datasource:master:url: jdbc:mysql://localhost:3306/dynamic-master?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8username: rootpassword: LIU81&yjdriver-class-name: com.mysql.cj.jdbc.Driverslave:url: jdbc:mysql://localhost:3306/dynamic-slave?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8username: rootpassword: LIU81&yjdriver-class-name: com.mysql.cj.jdbc.Driverdruid:initial-size: 5 #初始连接数min-idle: 10 #最小连接池max-active: 20 #最大连接池max-wait: 60000 #连接等待超时时间time-between-eviction-runs-millis: 60000 #检测间隔时间,毫秒min-evictable-idle-time-millis: 300000 #连接池最小生存时间,毫秒max-evictable-idle-time-millis: 900000 #连接池最大生存时间,毫秒validation-query: SELECT 1 FROM DUAL #连接检测mybatis-plus:mapper-locations: classpath*:/mapper/**/*.xmlconfiguration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

3. 引入注解动态切换数据源

@Service
@DS(value = "master")
public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements CustomerService {
}

若需要使用到事务,只需要在最外层加注解@DSTransactional即可
当然了mybatisPlus在很多位置给我们留了拓展,比如如何加载数据源、对接其它连接池、自定义负责均衡策略、自定义路由查找:
请添加图片描述
其主要类图如下:感兴趣的可以去读读源码
在这里插入图片描述

多数据源带来的问题

引入多数据源后,解决了多数据源访问的问题,同时也带来另外2个问题:

  • 事务问题:对多数据源写操作时,如何保证数据的一致性,完整性?
  • 多层嵌套切换问题(AOP方案):如:serviceA—>ServiceB—>ServiceC,如何保证每层都使用自己的数据源?
    特殊情况下还是可以,比如这样的
    请添加图片描述

二、一个方法开启两个事务,实现spring编程或者声明式事务

1、为每个数据源定义一个事务管理器
//数据源1
@Bean
public DataSource dataSource1() {org.apache.tomcat.jdbc.pool.DataSource dataSource = new org.apache.tomcat.jdbc.pool.DataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/ds1?characterEncoding=UTF-8");dataSource.setUsername("root");dataSource.setPassword("root123");dataSource.setInitialSize(5);return dataSource;
}//事务管理器1,对应数据源1
@Bean
public PlatformTransactionManager transactionManager1(@Qualifier("dataSource1")DataSource dataSource) {return new DataSourceTransactionManager(dataSource);
}//数据源2
@Bean
public DataSource dataSource2() {org.apache.tomcat.jdbc.pool.DataSource dataSource = new org.apache.tomcat.jdbc.pool.DataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/ds2?characterEncoding=UTF-8");dataSource.setUsername("root");dataSource.setPassword("root123");dataSource.setInitialSize(5);return dataSource;
}//事务管理器2,对应数据源2
@Bean
public PlatformTransactionManager transactionManager2(@Qualifier("dataSource2")DataSource dataSource) {return new DataSourceTransactionManager(dataSource);
}
2、指定事务的管理器 bean 名称

使用@Transaction 中时,需通过@Transaction 注解的 value 或 transactionManager 属性指定事务管理器 bean 名称,如:

@Transactional(transactionManager = "transactionManager1", propagation = Propagation.REQUIRED)
public void required(String name) {this.jdbcTemplate1.update("insert into user1(name) VALUES (?)", name);
}

多数据源事务的使用就这么简单,下面我们来看案例,案例才是精华。

事务管理器运行过程

这里先给大家解释一下 REQUIRED 传播行为下,事务管理器的大致的运行过程,方便理解后面的案例代码。

Service1中:
@Transactional(transactionManager = "transactionManager1", propagation = Propagation.REQUIRED)
public void m1(){this.jdbcTemplate1.update("insert into user1(name) VALUES ('张三')");service2.m2();
}Service2中:
@Transactional(transactionManager = "transactionManager1", propagation = Propagation.REQUIRED)
public void m2(){this.jdbcTemplate1.update("insert into user1(name) VALUES ('李四')");
}

spring 事务中有个 resources 的 ThreadLocal,static 修饰的,用来存放共享的资源,稍后过程中会用到。
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>(“Transactional resources”);
下面看 m1 方法简化版的事务过程:

1、TransactionInterceptor拦截m1方法
2、获取m1方法的事务配置信息:事务管理器bean名称:transactionManager1,事务传播行为:REQUIRED
3、从spring容器中找到事务管理器transactionManager1,然后问一下transactionManager1,当前上下文中有没有事务,显然现在是没有的
4、创建一个新的事务//获取事务管理器对应的数据源,即dataSource1DataSource dataSource1 = transactionManager1.getDataSource();//即从dataSource1中获取一个连接Connection conn = transactionManager1.dataSource1.getConnection();//开启事务手动提交conn.setAutoCommit(false);//将dataSource1->conn放入map中map.put(dataSource1,conn);//将map丢到上面的resources ThreadLocal中resources.set(map);
5、下面来带m1放的第一行代码:this.jdbcTemplate1.update("insert into user1(name) VALUES ('张三')");
6、jdbctemplate内部需要获取数据连接,获取连接的过程//从resources这个ThreadLocal中获取到mapMap map = resources.get();//通过jdbcTemplate1.datasource从map看一下没有可用的连接Connection conn = map.get(jdbcTemplate1.datasource);//如果从map没有找到连接,那么重新从jdbcTemplate1.datasource中获取一个//大家应该可以看出来,jdbcTemplate1和transactionManager1指定的是同一个dataSource,索引这个地方conn是不为null的if(conn==null){conn = jdbcTemplate1.datasource.getConnection();}
7、通过上面第6步获取的conn执行db操作,插入张三
8、下面来到m1方法的第2行代码:service2.m2();
9、m2方法上面也有@Transactional,TransactionInterceptor拦截m2方法
10、获取m2方法的事务配置信息:事务管理器bean名称:transactionManager1,事务传播行为:REQUIRED
11、从spring容器中找到事务管理器transactionManager1,然后问一下transactionManager1,当前上下文中有没有事务,显然是是有的,m1开启的事务正在执行中,所以m2方法就直接加入这个事务了
12、下面来带m2放的第一行代码:this.jdbcTemplate1.update("insert into user1(name) VALUES ('李四')");
13、jdbctemplate内部需要获取数据连接,获取连接的过程//从resources这个ThreadLocal中获取到mapMap map = resources.get();//通过jdbcTemplate1.datasource从map看一下没有可用的连接Connection conn = map.get(jdbcTemplate1.datasource);//如果从map没有找到连接,那么重新从jdbcTemplate1.datasource中获取一个//大家应该可以看出来,jdbcTemplate1和transactionManager1指定的是同一个dataSource,索引这个地方conn是不为null的if(conn==null){conn = jdbcTemplate1.datasource.getConnection();}
14、通过第13步获取的conn执行db操作,插入李四
15、最终TransactionInterceptor发现2个方法都执行完毕了,没有异常,执行事务提交操作,如下//获取事务管理器对应的数据源,即dataSource1DataSource dataSource1 = transactionManager1.getDataSource();//从resources这个ThreadLocal中获取到mapMap map = resources.get();//通过map拿到事务管理器开启的连接Connection conn = map.get(dataSource1);//通过conn提交事务conn.commit();//管理连接conn.close();
16、清理ThreadLocal中的连接:通过map.remove(dataSource1)将连接从resource ThreadLocal中移除
17、清理事务

从上面代码中可以看出:整个过程中有 2 个地方需要用到数据库连接 Connection 对象,第 1 个地方是:spring 事务拦截器启动事务的时候会从 datasource 中获取一个连接,通过这个连接开启事务手动提交,第 2 个地方是:最终执行 sql 操作的时候,也需要用到一个连接。那么必须确保这两个连接必须是同一个连接的时候,执行 sql 的操作才会受 spring 事务控制,那么如何确保这 2 个是同一个连接呢?从代码中可以看出必须让事务管理器中的 datasource 和 JdbcTemplate 中的 datasource 必须是同一个,那么最终 2 个连接就是同一个对象。

什么是事务挂起操作?

这里以事务传播行为 REQUIRED_NEW 为例说明一下,REQUIRED_NEW 表示不管当前事务管理器中是否有事务,都会重新开启一个事务,如果当前事务管理器中有事务,会把当前事务挂起。

所谓挂起,你可以这么理解:对当前存在事务的现场生成一个快照,然后将事务现场清理干净,然后重新开启一个新事务,新事务执行完毕之后,将事务现场清理干净,然后再根据前面的快照恢复旧事务。

下面我们再回到本文的内容,多数据源事务管理。
事务管理器如何判断当前是否有事务?
简化版的过程如下:

Map map=resource的ThreadLocal.get();
DataSource datasource = transactionManager.getDataSource();
Connection conn = map.get(datasource);
//如果conn不为空,就表示当前有事务
if(conn!=null){
}

从这段代码可以看出:判断是否存在事务,主要和 datasource 有关,和事务管理器无关,即使是不同的事务管理器,只要事务管理器的 datasource 是一样的,那么就可以发现当前存在的事务。

多数据源事务管理(使用JTA+多mybatis工厂)

1.1 环境说明

1.1.1 组件说明

DataSource: Alibaba Druid
Database: MySQL 5.7
SpringBoot: 2.2.2.RELEASE
ORM: MyBatis
JTA: Atomikos

1.1.2 项目关键依赖

    <dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.1</version></dependency><!--atomikos transaction management--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jta-atomikos</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.21</version></dependency>

1.1.3 多数据源事务管理()

  • 数据源使用两个数据库的不同表
  • 都是用Druid做连接池,然后用Atomikos管理
1.1.4 JTA的工具
  • SpringBoot可以用的官方说了两个一个是Atomikos,另一个是Bitronix,除此之外还可以在支持JTA的web server中用。(Tomcat不支持)
    SpringBoot文档中的说明:当检测到JTA环境时,将使用Spring的0
  • JtaTransactionManager来管理事务。JMS、DataSource、JPA已升级为支持XA事务。可以用标准的Spring用法(例如@Transactional)来参与分布式事务。如果您在JTA环境中,并且仍要使用本地事务,则可以将spring.jta.enabled属性设置为false以禁用JTA自动配置。

1.2 实例业务说明

简单逻辑,两张表,分别在两个不同的库中,然后一个service方法操作两个库的数据。

1.3 多数据源配置

第一张表:是账户表
第二章表:是订单表

spring:application:name: two-data-sourcedatasource:account:url: jdbc:mysql://127.0.0.1:3306/transaction_account?useSSL=false&characterEncoding=UTF-8username: rootpassword: xxxxxorder:url: jdbc:mysql://127.0.0.1:3306/transaction_order?useSSL=false&characterEncoding=UTF-8username: rootpassword: xxxxx
#logging:
#  level:
#    root: DEBUG

1.3.2 Bean注册

主要包括以下步骤

1、分别注册对应DataSource、SqlSessionFactory、SqlSessionTemplate的Bean
2、然后指定表的Mapper的位置,并且把Mybatis中原有的sqlSessionTemplate设置成你注册的。

需要注意的点:
DataSource不能直接使用Druid提供的DruidDataSource, 需要使用atomikos来包装一下Druid提供的DruidXADataSource,来支持XA规范
如果你不想用Druid,可以考虑使用MysqlXADataSource(我没试过)
注册的Bean的对应关系要正确

  • order库的类似 此处略(本质上和上面的mybatis多数据源一样,需要指定不同的sqlfactory)
```c
@Configuration
@MapperScan(basePackages = {"io.ilss.transaction.twodatasource.dao.account"}, sqlSessionTemplateRef = "accountSqlSessionTemplate")
public class AccountConfiguration {@Value("${spring.datasource.account.url}")private String url;@Value("${spring.datasource.account.username}")private String username;@Value("${spring.datasource.account.password}")private String password;@Bean(name = "accountDataSource")public DataSource accountDataSource() {AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();DruidXADataSource druidXADataSource = new DruidXADataSource();druidXADataSource.setUrl(url);druidXADataSource.setUsername(username);druidXADataSource.setPassword(password);druidXADataSource.setName("druidDataSource-account");atomikosDataSourceBean.setXaDataSource(druidXADataSource);atomikosDataSourceBean.setUniqueResourceName("accountResource");return atomikosDataSourceBean;}@Bean(name = "accountSqlSessionFactory")public SqlSessionFactory accountSqlSessionFactory(DataSource accountDataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(accountDataSource);factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mappers/account/*.xml"));return factoryBean.getObject();}@Bean(name = "accountSqlSessionTemplate")@Primarypublic SqlSessionTemplate accountSqlSessionTemplate(@Qualifier("accountSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {return new SqlSessionTemplate(sqlSessionFactory);}
}

配置正确后会有如下日志信息

c.atomikos.jdbc.AbstractDataSourceBean   : AtomikosDataSoureBean 'orderResource': poolSize equals default - this may cause performance problems!
com.alibaba.druid.pool.DruidDataSource   : {dataSource-1,druidDataSource-order} inited
c.atomikos.jdbc.AbstractDataSourceBean   : AtomikosDataSoureBean 'accountResource': poolSize equals default - this may cause performance problems!
com.alibaba.druid.pool.DruidDataSource   : {dataSource-2,druidDataSource-account} inited
c.a.icatch.provider.imp.AssemblerImp     : Loaded jar:file:/Users/feng/.m2/repository/com/atomikos/transactions/4.0.6/transactions-4.0.6.jar!/transactions-defaults.properties
c.a.icatch.provider.imp.AssemblerImp     : Thanks for using Atomikos! Evaluate http://www.atomikos.com/Main/ExtremeTransactions for advanced features and professional support...略
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.default_max_wait_time_on_shutdown = 9223372036854775807
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.allow_subtransactions = true
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.recovery_delay = 10000
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.automatic_resource_registration = true
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.oltp_max_retries = 5
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.client_demarcation = false
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.threaded_2pc = false
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.serial_jta_transactions = true
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.log_base_dir = /Users/feng/Projects/java/transaction-example/transaction-logs
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.rmi_export_class = none
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.max_actives = 50
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.checkpoint_interval = 500
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.enable_logging = true
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.log_base_name = tmlog
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.max_timeout = 300000
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.trust_client_tm = false
c.a.icatch.provider.imp.AssemblerImp     : USING: java.naming.factory.initial = com.sun.jndi.rmi.registry.RegistryContextFactory
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.tm_unique_name = 10.11.11.11.tm
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.forget_orphaned_log_entries_delay = 86400000
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.oltp_retry_interval = 10000
c.a.icatch.provider.imp.AssemblerImp     : USING: java.naming.provider.url = rmi://localhost:1099
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.force_shutdown_on_vm_exit = false
c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.default_jta_timeout = 10000
c.a.icatch.provider.imp.AssemblerImp     : Using default (local) logging and recovery...
c.a.d.xa.XATransactionalResource         : orderResource: refreshed XAResource
c.a.d.xa.XATransactionalResource         : accountResource: refreshed XAResource

首先初始化两个Atomikos包裹的Druid的数据源,
然后设置atomikos的参数,都是默认的
最后XAResource刷新
至此,配置完毕,可能有人好奇,JTA的代码一个都没有,因为SpringBoot使用JTA的时候引入的starter做了

1.4 事务实例

简单模拟订单生成支付过程,从账户中扣除一比钱,然后新增一比订单。
编程的方式和Spring事务的方式一毛一样,没什么不同。

1.4.1 实现代码
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {@Autowiredprivate OrderInfoDAO orderInfoDAO;@Autowiredprivate AccountDAO accountDAO;@AutowiredPlatformTransactionManager transactionManager;@Override@Transactionalpublic String createOrder(OrderInfoDO orderInfoDO) {AccountDO accountDO = accountDAO.selectByPrimaryKey(orderInfoDO.getAccountId());if (null == accountDO) {log.error("createOrder user is not present, accountId: {}", orderInfoDO.getAccountId());return "用户不存在!";}// 用户费用扣除accountDO.setBalance(accountDO.getBalance().subtract(orderInfoDO.getAmount()));accountDAO.updateByPrimaryKey(accountDO);orderInfoDAO.insertSelective(orderInfoDO);return "成功";}@Overridepublic String createOrderCode(OrderInfoDO orderInfoDO) {TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();// 获取事务 开始业务执行TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);try {AccountDO accountDO = accountDAO.selectByPrimaryKey(orderInfoDO.getAccountId());if (null == accountDO) {log.error("createOrder user is not present, accountId: {}", orderInfoDO.getAccountId());return "用户不存在!";}// 用户费用扣除accountDO.setBalance(accountDO.getBalance().subtract(orderInfoDO.getAmount()));accountDAO.updateByPrimaryKey(accountDO);orderInfoDAO.insertSelective(orderInfoDO);error("createOrderCode error");transactionManager.commit(transaction);return "成功";} catch (Exception e) {log.error("create order failed, accountId: {}, errMsg: {}", orderInfoDO.getAccountId(), e.getMessage());transactionManager.rollback(transaction);}return "失败";}public static void error(String  msg) {throw new RuntimeException(msg);}
}

mybatiss plus多数据源事务管理(seta)

mybatis plus从3.3.0开始支持本地多数据源事务,无需第三方。
尝试手动构建数据源结合JTA方案 如https://www.cnblogs.com/cicada-smile/p/13289306.html。

多数据源事务方案一直是一个难题,通常的解决方案有以下二种。

利用atomiks手动构建多数据源事务,适合数据源较少,配置的参数也不太多的项目。难点就是手动配置量大,需要耗费一定时间。
用seata类似的分布式事务解决方案,难点就是需要搭建维护如seata-server的统一管理中心。

不支持spring原生事务,不支持spring事务,不支持spring事务,可分别使用,千万不能混用。
再次强调不支持spring事务注解,可理解成独立写了一套事务方案。
只适合简单本地多数据源场景, 如果涉及异步和微服务等场景,请使用seata方案

在需要切换数据源且需要事务支持的方法上加@DSTransactional.

PS:一般需要分布式事务的场景大多数都是微服务化,个人并不建议在单体项目引入多数据源+分布式事务,有能力尽早拆开,可为过度方案。

seata Github地址https://github.com/seata/seata
seata 文档https://seata.io/zh-cn/docs/overview/what-is-seata.html
seata 示例https://github.com/seata/seata-samples
seata 最新版本

总结:

使用mybatisplus来实现,基于上面的集中场景,完成配置式的开发。
支持 数据源分组 ,适用于多种场景 纯粹多库 读写分离 一主多从 混合模式。
支持数据库敏感配置信息 加密 ENC()。
支持每个数据库独立初始化表结构schema和数据库database。
支持无数据源启动,支持懒加载数据源(需要的时候再创建连接)。
支持 自定义注解 ,需继承DS(3.2.0+)。
提供并简化对Druid,HikariCp,BeeCp,Dbcp2的快速集成。
提供对Mybatis-Plus,Quartz,ShardingJdbc,P6sy,Jndi等组件的集成方案。
提供 自定义数据源来源 方案(如全从数据库加载)。
提供项目启动后 动态增加移除数据源 方案。
提供Mybatis环境下的 纯读写分离 方案。
提供使用 spel动态参数 解析数据源方案。内置spel,session,header,支持自定义。
支持 多层数据源嵌套切换 。(ServiceA >>> ServiceB >>> ServiceC)。
提供 **基于seata的分布式事务方案。
提供 本地多数据源事务方案。

引用

  • https://zhuanlan.zhihu.com/p/529772940
  • https://blog.csdn.net/qq_40300227/article/details/125541289
  • https://www.jianshu.com/p/421f7be8627c
  • https://juejin.cn/post/6844904041852436493
  • https://blog.csdn.net/chenzoff/article/details/125077167
  • 官网https://www.kancloud.cn/tracy5546/dynamic-datasource/purchase
  • https://blog.csdn.net/ooaash/article/details/117709676
  • https://www.kancloud.cn/ztgis/gisboot/2312961

http://www.ppmy.cn/news/1160.html

相关文章

C语言学习笔记(十八)

C语言学习第十八天&#xff0c;做昨天按位运算符的习题 /* 练习2-6 编写一个函数setbits(x, p, n, y)&#xff0c;该函数返回对x执行下列操作后的结果值&#xff1a;将x中从第p位开始的n个&#xff08;二进制&#xff09;位设置为y中最右边的n位的值&#xff0c;x的其余各位保…

Java分布式系统和云计算教程

Java分布式系统和云计算教程 大规模学习分布式 Java 应用程序、并行编程、分布式计算和云软件架构 课程英文名&#xff1a;Distributed Systems & Cloud Computing with Java 此视频教程共4.0小时&#xff0c;中英双语字幕&#xff0c;画质清晰无水印&#xff0c;源码附…

map/set疑难一网打尽(含经典面试)

set的作用&#xff1a;判断某⼀个元素是不是在⼀个组⾥⾯ map的作用&#xff1a;映射&#xff0c;相当于字典&#xff0c;把⼀个值映射成另⼀个值&#xff0c;可以创建字典 首先要了解map和set常用的操作&#xff0c;对于stl容器&#xff0c;无非就是增删查改&#xff0c;但对…

20221203今天的世界发生了什么

///光大银行&#xff1a;执行董事、行长付万军辞任 于2022年12月2日向本行董事会提交辞呈&#xff0c;辞去本行执行董事、董事会风险管理委员会主任委员及委员、普惠金融发展和消费者权益保护委员会主任委员及委员、战略委员会委员及行长职务 ///奈飞据称将扩大“预览俱乐部”…

最棘手的Java面试题(上)

这是收集的10个最棘手的Java面试问题列表。这些问题主要来自 Java 核心部分 ,不涉及 Java EE 相关问题。你可能知道这些棘手的 Java 问题的答案&#xff0c;或者觉得这些不足以挑战你的 Java 知识&#xff0c;但这些问题都是容易在各种 Java 面试中被问到的&#xff0c;而且包括…

基于纳芯微产品的尾灯方案介绍

文章目录1.前言2.方案简介2.1 概述2.2 功能介绍2.3 DEMO资料3.主要器件介绍3.1 LED Driver3.2 LDO3.3 CAN\LIN收发器4.演示视频5.推荐阅读1.前言 最近拜访一些做尾灯模组的客户了解到&#xff0c;目前LED Driver依然紧缺&#xff0c;特别是TPS929120&#xff0c;BD18331这些差…

Unity Debug的简单封装

对Unity Debug的简单封装 使用前提&#xff1a; Project Settings-Player-Other Settings-Script Define Symbols添加 EnableLog&#xff0c;点击Apply 测试代码&#xff1a; using MTools.Debuger; using UnityEngine;public class NewBehaviourScript : MonoBehaviour {p…

【每日一题Day46】LC1796字符串中第二大的数字 | 模拟

字符串中第二大的数字【LC1796】 Given an alphanumeric string s, return the second largest numerical digit that appears in s, or -1 if it does not exist. An alphanumeric string is a string consisting of lowercase English letters and digits. 快快学完今天的&am…