Background
When a project uses many data sources, the connections are usually built and cached at application startup. When a data source is later changed, how do we refresh that cache in real time? In a single application you can simply call the refresh method directly, but what about a distributed setup with several services?
Solution:
Use Redis as a lightweight message queue: its publish/subscribe mechanism gives us real-time notification and state updates, and combined with an AOP aspect it lets every node refresh its cached data sources automatically.
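The idea in miniature: whichever node changes a data source publishes a small JSON envelope to a Redis channel, and every subscribed node refreshes its own cache when the message arrives. A rough sketch follows (the channel name matches the full example below; the template and listener wiring are built properly in steps 4 to 6):

// Publishing side, e.g. inside an AOP advice after a save/update:
// stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", messageJson);

// Subscribing side: a MessageListener registered on the same channel
public class DataSourceChangeListener implements org.springframework.data.redis.connection.MessageListener {
    @Override
    public void onMessage(org.springframework.data.redis.connection.Message message, byte[] pattern) {
        // parse the envelope and rebuild / evict the cached connection (see steps 5 and 6)
    }
}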
The following walks through a working example.
1. Create the data source entity
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author ws
 * @since 2022-08-12
 */
@Getter
@Setter
@ToString
@Accessors(chain = true)
@TableName("ed_datasource_info")
public class DatasourceInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /** Data source code */
    @TableField("datasource_code")
    private String datasourceCode;

    /** Data source name */
    @TableField("datasource_name")
    private String datasourceName;

    /** Data source type */
    @TableField("datasource_type")
    private String datasourceType;

    /** Category: 0 = database, 1 = REST API */
    @TableField("type")
    private Integer type;

    /** Creator */
    @TableField("creator")
    private String creator;

    /** Schema */
    @TableField("schema_name")
    private String schemaName;

    @TableField("create_time")
    private Date createTime;

    @TableField("update_time")
    private Date updateTime;

    /** Connection settings, stored as JSON */
    @TableField("link_json")
    private String linkJson;
}
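To make linkJson concrete, here is an illustrative instance for a relational source. The field values (code, type string, JDBC settings) are made-up examples, and datasourceType has to match the type() of one of the IClient implementations shown later:

// Illustrative values only
DatasourceInfo ds = new DatasourceInfo()
        .setDatasourceCode("ds_demo")
        .setDatasourceName("Demo MySQL")
        .setDatasourceType("MYSQL")          // must match an IClient.type()
        .setType(0)                          // 0 = database, 1 = REST API
        .setLinkJson("{\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/demo?useSSL=false\",\"username\":\"root\",\"password\":\"secret\"}");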
2. Load the data sources at startup
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.sztech.common.constant.DataSourceTypeEnum;
import com.sztech.entity.DatasourceInfo;
import com.sztech.service.DatasourceInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class DataSourceRecovery implements InitializingBean {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    @Override
    public void afterPropertiesSet() throws Exception {
        refresh();
    }

    private void refresh() throws Exception {
        this.refresh(null);
    }

    public void refresh(String sourceCode) {
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", DataSourceTypeEnum.DB.getKey());
        if (StringUtils.isNotBlank(sourceCode)) {
            queryWrapper.eq("datasource_code", sourceCode);
        }
        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (DatasourceInfo datasourceInfo : list) {
            new Thread(new ReadloadThread(datasourceInfo, countDownLatch)).start();
        }
        try {
            countDownLatch.await(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting for data sources to load", e);
        }
    }

    /**
     * Load data sources in parallel to speed up startup.
     */
    static class ReadloadThread implements Runnable {

        private DatasourceInfo datasourceInfo;
        private CountDownLatch countDownLatch;

        public ReadloadThread() {
        }

        public ReadloadThread(DatasourceInfo datasourceInfo, CountDownLatch countDownLatch) {
            this.datasourceInfo = datasourceInfo;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                DataSourceContext.setClientMap(datasourceInfo);
                DataSourceContext.setConfigMap(datasourceInfo.getDatasourceCode(), datasourceInfo);
            } catch (Exception e) {
                log.error("Failed to load datasource {}", datasourceInfo.getDatasourceCode(), e);
            } finally {
                countDownLatch.countDown();
            }
        }
    }
}
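DataSourceTypeEnum is referenced above but not listed in the original. A minimal sketch consistent with the type column (0 = database, 1 = REST API) could look like this; the constant names and structure are assumptions:

public enum DataSourceTypeEnum {

    DB(0, "database"),
    REST_API(1, "Rest-api");

    private final Integer key;
    private final String desc;

    DataSourceTypeEnum(Integer key, String desc) {
        this.key = key;
        this.desc = desc;
    }

    public Integer getKey() {
        return key;
    }
}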
3. Create DataSourceContext to cache the data source connections
import com.sztech.core.tool.DBTool;
import com.sztech.entity.DatasourceInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * User: wangsheng
 * Date: 2022-02-11
 * Time: 14:05
 */
public class DataSourceContext {

    /** Client cache */
    private final static Map<String, IClient> clientMap = new ConcurrentHashMap<>();

    /** Data source configuration cache */
    private final static Map<String, DatasourceInfo> configMap = new ConcurrentHashMap<>();

    public static void setClientMap(DatasourceInfo datasourceInfo) {
        if (clientMap.containsKey(datasourceInfo.getDatasourceCode())) {
            try {
                // Close the stale client before replacing it
                clientMap.get(datasourceInfo.getDatasourceCode()).close();
            } catch (Exception ignored) {
            }
        }
        clientMap.put(datasourceInfo.getDatasourceCode(), DBTool.buildClient(datasourceInfo));
    }

    public static void setConfigMap(String key, DatasourceInfo datasourceInfo) {
        configMap.put(key, datasourceInfo);
    }

    public static void removeClientMap(String key) {
        if (clientMap.containsKey(key)) {
            try {
                clientMap.get(key).close();
            } catch (Exception ignored) {
            }
        }
        clientMap.remove(key);
    }

    public static void removeConfigMap(String key) {
        configMap.remove(key);
    }

    public static IClient getClientMap(String key) {
        IClient client = clientMap.get(key);
        if (null == client) {
            throw new RuntimeException(String.format("Data source code [%s] does not exist or has been removed", key));
        }
        return client;
    }

    public static DatasourceInfo getConfigMap(String key) {
        DatasourceInfo datasourceInfo = configMap.get(key);
        if (null == datasourceInfo) {
            throw new RuntimeException(String.format("Data source code [%s] does not exist or has been removed", key));
        }
        return datasourceInfo;
    }
}
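Once loaded, business code fetches connections by data source code, for example (the code "ds_demo" is hypothetical):

// Throws a RuntimeException if the code is unknown or has been removed
IClient client = DataSourceContext.getClientMap("ds_demo");
DatasourceInfo config = DataSourceContext.getConfigMap("ds_demo");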
DataSourceContext builds clients through DBTool, a utility facade over the cached clients (connection tests, metadata lookups, queries, DDL):

package com.sztech.core.tool;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Instance;
import com.sztech.common.constant.ResultEnum;
import com.sztech.common.exception.BizException;
import com.sztech.common.utils.ReflectionUtils;
import com.sztech.common.utils.SpringUtils;
import com.sztech.common.utils.ThreadPoolUtil;
import com.sztech.core.datasource.DataSourceContext;
import com.sztech.core.datasource.IClient;
import com.sztech.core.datasource.rdbms.RdbmsConfig;
import com.sztech.entity.*;
import com.sztech.pojo.dto.ColumnDto;
import com.sztech.pojo.dto.QueryTableDto;
import com.sztech.pojo.dto.TableDto;
import com.sztech.pojo.node.PartitionColumn;
import com.sztech.pojo.vo.*;
import com.sztech.service.CreateTableLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Description:
 * User: wangsheng
 * Date: 2022-08-12
 * Time: 16:59
 */
@Slf4j
public class DBTool {

    /** Build a client for the given data source. */
    public static IClient buildClient(DatasourceInfo datasourceInfo) {
        IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);
        return client.open(datasourceInfo);
    }

    /** Test a data source connection. */
    public static boolean testSource(DatasourceInfo datasourceInfo) {
        IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);
        return client.testSource(datasourceInfo);
    }

    /** List the schemas of a data source. */
    public static List<String> getSchemas(DatasourceInfo datasourceInfo) {
        List<String> schemas = new ArrayList<>();
        Connection conn = null;
        try {
            IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);
            Class.forName(client.driverName());
            String linkJson = datasourceInfo.getLinkJson();
            RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);
            conn = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getDecodePassword());
            DatabaseMetaData metadata = conn.getMetaData();
            try (ResultSet resultSet = metadata.getSchemas()) {
                while (resultSet.next()) {
                    String schemaName = resultSet.getString("TABLE_SCHEM");
                    schemas.add(schemaName);
                }
            }
        } catch (SQLException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        }
        return schemas;
    }

    /** Get the driver class name for a data source type. */
    public static String getDriverName(String datasourceType) {
        IClient client = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class);
        return client.driverName();
    }

    /** Get the columns of a table. */
    public static List<ColumnDto> getColumns(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).getColumns(tableName);
    }

    /** Get the partition columns of a table. */
    public static List<String> getPartitionColumns(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).getPartitionColumns(tableName);
    }

    /** Get table names matching a pattern. */
    public static List<String> getTableNames(String datasourceCode, String tableNameLike) {
        return DataSourceContext.getClientMap(datasourceCode).getTableNames(tableNameLike);
    }

    /** Get all tables. */
    public static List<TableDto> getTables(String datasourceCode) {
        return DataSourceContext.getClientMap(datasourceCode).getTables();
    }

    /** Get a single table by name. */
    public static TableDto getTableByName(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).getTableByName(tableName);
    }

    /** Get a single table's details (creation time, column count). */
    public static TableDto getTableField(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).getTableField(tableName);
    }

    /** Get table info (including creation time). */
    public static TableInfoVo getTableData(QueryTableDto dto) {
        IClient client = DataSourceContext.getClientMap(dto.getDataSourceCode());
        return client.getTableInfo(dto.getTableName());
    }

    /** Create a table from column definitions (types transformed). */
    public static void createTableByColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        List<String> sqls = client.buildTableSql(columnDtos, tableName, true);
        log.info("CREATE TABLE statements: " + JSON.toJSONString(sqls));
        sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
    }

    /** Create a table from column definitions (types not transformed). */
    public static void createTableByNotTransformedColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        List<String> sqls = client.buildTableSql(columnDtos, tableName, false);
        log.info("CREATE TABLE statements: " + JSON.toJSONString(sqls));
        sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
    }

    /**
     * Create an index.
     * Note: on Oracle the index name must be unique within the whole database, otherwise creation fails.
     *
     * @param datasourceCode data source code
     * @param tableName      table name
     * @param filedNames     field1,field2...
     * @param unique         whether the index is unique
     */
    public static void createIndex(String datasourceCode, String tableName, String filedNames, Boolean unique) {
        DataSourceContext.getClientMap(datasourceCode).createIndex(tableName, filedNames, unique);
    }

    /** Validate a SQL statement. */
    public static Map<String, Object> checkSql(String datasourceCode, String sql, String sourceType) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.checkSql(sql, sourceType);
    }

    /** Create a table from a raw SQL statement. */
    public static void createTableWithSql(String datasourceCode, String sql) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        log.info("CREATE TABLE statement: " + JSON.toJSONString(sql));
        client.executeCommandSyn(sql, new HashMap<>());
        // DataSourceContext.getClientMap(datasourceCode).createTableWithSql(sql);
    }

    /** Drop a table. */
    public static void dropTable(String datasourceCode, String tableName) {
        DataSourceContext.getClientMap(datasourceCode).dropTable(tableName);
    }

    /** Query data from a single table. */
    public static List<Map<String, Object>> selectDataFromTable(String datasourceCode, List<DataTableColumn> columns, String tableName, String search, Integer limit) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = client.getSelectSql(columns, tableName, search, limit);
        log.info("Executing: " + selectSql);
        return client.selectDataFromTable(selectSql, null);
    }

    /** Query data from a single form table. */
    public static List<Map<String, Object>> selectFromTable(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = client.getFormSelectSql(columns, searchColumns, tableName, search, pageNum, pageSize, params);
        log.info("Executing: " + selectSql);
        return client.selectDataFromTable(selectSql, params);
    }

    /** Query data from a single table (backup variant). */
    public static List<Map<String, Object>> selectFromForBackUp(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = client.selectFromForBackUp(columns, searchColumns, tableName, search, pageNum, pageSize, params);
        log.info("Executing: " + selectSql);
        return client.selectDataFromTable(selectSql, params);
    }

    /** Query data from a single table (file variant). */
    public static List<Map<String, Object>> selectFromFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = client.getFormSelectSqlForFile(columns, searchColumns, tableName, search, pageNum, pageSize, params);
        log.info("Executing: " + selectSql);
        return client.selectDataFromTable(selectSql, params);
    }

    /** Check whether a file name already exists in a table. */
    public static List<Map<String, Object>> getExistOldName(String datasourceCode, String tableName, String search) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = client.getExistOldName(tableName, search);
        log.info("Executing: " + selectSql);
        return client.selectDataFromTable(selectSql, null);
    }

    /** Query data from a collection table. */
    public static List<Map<String, Object>> selectCollectTable(CollectConditionVo vo) {
        IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());
        String selectSql = client.getCollectTable(vo);
        log.info("Executing: " + selectSql);
        return client.selectDataFromTable(selectSql, vo.getParams());
    }

    /** Count rows of a single form table. */
    public static Map<String, Object> getFormCount(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = client.getCountSql(columns, searchColumns, tableName, search, params);
        log.info("Executing: " + selectSql);
        return client.getCount(selectSql, params);
    }

    /** Count rows of a county-level library table. */
    public static Map<String, Object> getCountryCount(String datasourceCode, String tableName, MapSqlParameterSource params) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = "select count(1) as count from " + tableName;
        log.info("Executing: " + selectSql);
        return client.getCount(selectSql, params);
    }

    public static Map<String, Object> getFormCountForFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        String selectSql = client.getCountSqlForFile(columns, searchColumns, tableName, search, params);
        log.info("Executing: " + selectSql);
        return client.getCount(selectSql, params);
    }

    /** Get the row count of a table. */
    public static Long getTableRows(String datasourceCode, String tableName) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTableRows(tableName);
    }

    /** Get the row count of the given partitions of a table. */
    public static Long getTablePartitionRows(String datasourceCode, String tableName, List<PartitionColumn> partitionColumns) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTablePartitionRows(tableName, partitionColumns);
    }

    /** Get the physical size of a table. */
    public static Integer getTablePhysicalSize(String datasourceCode, String tableName) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getPhysicalSize(tableName);
    }

    /**
     * Get the maximum value of a column.
     *
     * @param datasourceCode data source code
     * @param tableName      table name
     * @param incColumnName  auto-increment column name
     */
    public static Object getMaxValue(String datasourceCode, String tableName, String incColumnName, String condition) {
        return DataSourceContext.getClientMap(datasourceCode).getMaxValue(tableName, incColumnName, condition);
    }

    public static Object getMaxValue(String datasourceCode, String schema, String tableName, String incColumnName, String condition) {
        return DataSourceContext.getClientMap(datasourceCode).getMaxValue(schema, tableName, incColumnName, condition);
    }

    public static Object getMaxTime(String datasourceCode, String schema, String tableName, String incColumnName, String tongId, String condition) {
        return DataSourceContext.getClientMap(datasourceCode).getMaxTime(schema, tableName, incColumnName, tongId, condition);
    }

    /** Whether a field exists in a table. */
    public static Boolean fieldExist(String datasourceCode, String tableName, String fieldName) {
        List<ColumnDto> columns = getColumns(datasourceCode, tableName);
        return columns.stream().anyMatch(s -> s.getName().equalsIgnoreCase(fieldName));
    }

    /** Data preview: fetch the first ten rows. */
    public static String dataView(String datasourceCode, String tableName, String condition) {
        return DataSourceContext.getClientMap(datasourceCode).dataView(tableName, condition);
    }

    /** Create a partitioned temporary table (ODPS only). */
    public static void createPartitionedTableByColumns(List<ColumnDto> columnDtos, String tableName, String tableComment, String partitionedField, String datasourceCode) {
        DataSourceContext.getClientMap(datasourceCode).createPartitionedTableByColumns(columnDtos, tableName, tableComment, partitionedField);
    }

    /** Execute a command synchronously. */
    public static void executeCommandSyn(String datasourceCode, String command, Map<String, Object> params) {
        DataSourceContext.getClientMap(datasourceCode).executeCommandSyn(command, params);
    }

    /** Execute a command asynchronously (ODPS only). */
    public static Instance executeCommandASyn(String datasourceCode, String command, Map<String, Object> params) {
        return DataSourceContext.getClientMap(datasourceCode).executeCommandASyn(command, params);
    }

    /** Whether export is permitted (ODPS only). */
    public static Boolean exportEnable(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).exportEnable(tableName);
    }

    /** Insert a single row. */
    public static Integer insert(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).insert(vo);
    }

    /** Batch insert rows. */
    public static Integer[] betchInsert(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).betchInsert(vo);
    }

    /** Batch insert rows using a dedicated connection. */
    public static Integer[] betchInsertByConnection(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).betchInsertByConnection(vo);
    }

    /** Batch insert without wrapping parameters; pass the list of field names directly. */
    public static Integer[] betchInsertForCommom(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).betchInsertForCommom(vo);
    }

    /** Delete rows. */
    public static Integer delete(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).delete(vo);
    }

    /** Delete rows with a custom condition operator. */
    public static Integer deleteForCommon(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).deleteForCommon(vo);
    }

    public static Integer deleteForFile(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).deleteForFile(vo);
    }

    public static String deleteForPre(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).deleteForPre(vo);
    }

    /** Update rows. */
    public static Integer update(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).update(vo);
    }

    /** Update rows (file variant). */
    public static Integer updateForFile(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).updateForFile(vo);
    }

    /** Get basic form table metadata. */
    public static TableMetaDataVo getTableBasicInfo(String datasourceCode, FormTableVo vo) {
        return DataSourceContext.getClientMap(datasourceCode).getTableBasicInfo(vo);
    }

    /** Create a collection table from column definitions. */
    public static void createCollectTable(List<CatalogColumnInfo> columnDtos, String tableName, String datasourceCode, String tableComment, Boolean ifPartition) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        List<String> sqls = client.buildTableSqlForCollect(columnDtos, tableName, tableComment, ifPartition);
        log.info("CREATE TABLE statements: " + JSON.toJSONString(sqls));
        try {
            sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
        } catch (Exception e) {
            e.printStackTrace();
            String message = e.getMessage();
            if (e instanceof BizException) {
                BizException exception = (BizException) e;
                message = exception.getMsg();
            }
            log.error("Table creation failed =======================>{}:", message);
            ThreadPoolExecutor instance = ThreadPoolUtil.instance();
            String finalMessage = message;
            instance.submit(() -> {
                CreateTableLog createTableLog = new CreateTableLog();
                createTableLog.setErrorLog(finalMessage);
                createTableLog.setParams(JSON.toJSONString(sqls));
                createTableLog.setCode(tableName);
                CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);
                createTableLogService.save(createTableLog);
            });
            throw new BizException(ResultEnum.ERROR.getCode(), "Table creation failed, please contact the administrator");
        }
    }

    /** Update a collection table's structure. */
    public static void updateCollectTable(CreateCollectVo vo) {
        IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());
        List<String> sqls = client.buildTableSqlForUpdate(vo);
        log.info("ALTER TABLE statements: " + JSON.toJSONString(sqls));
        try {
            sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
        } catch (Exception e) {
            e.printStackTrace();
            String message = e.getMessage();
            if (e instanceof BizException) {
                BizException exception = (BizException) e;
                message = exception.getMsg();
            }
            log.error("Table update failed =======================>{}:", message);
            ThreadPoolExecutor instance = ThreadPoolUtil.instance();
            String finalMessage = message;
            instance.submit(() -> {
                CreateTableLog createTableLog = new CreateTableLog();
                createTableLog.setErrorLog(finalMessage);
                createTableLog.setParams(JSON.toJSONString(sqls));
                createTableLog.setCode(vo.getTableName());
                CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);
                createTableLogService.save(createTableLog);
            });
            log.info("Table update failed, rethrowing -------------------------------------->");
            throw new BizException(ResultEnum.ERROR.getCode(), "Table creation failed, please contact the administrator");
        }
    }

    /** Get details of all tables in a data source (name, column count, creation time). */
    public static List<TableDto> getTablesDetail(String datasourceCode, String tableNameLike, Integer start, Integer pageSize, String specifyTableName) {
        return DataSourceContext.getClientMap(datasourceCode).getTablesDetail(tableNameLike, start, pageSize, specifyTableName);
    }

    /** Get the number of tables in a schema. */
    public static Long getTableCountSchema(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).getTableCountSchema(tableName);
    }

    public static Integer getTableColumnCount(String dataSourceCode, String tableName) {
        return DataSourceContext.getClientMap(dataSourceCode).getTableColumnCount(tableName);
    }

    public static Integer getPreTableColumnCount(String dataSourceCode, String tableName) {
        return DataSourceContext.getClientMap(dataSourceCode).getPreTableColumnCount(tableName);
    }

    /** Get the identifier quote symbol for the data source. */
    public static String getSymbol(String datasourceCode) {
        return DataSourceContext.getClientMap(datasourceCode).getSymbol();
    }
}
DBTool.buildClient locates the matching IClient implementation by reflection. ReflectionUtils scans the interface's package, caches the implementation classes, and instantiates the one whose type() value matches the configured data source type:

import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class ReflectionUtils {

    private static final Map<String, Set<?>> clazzMap = new ConcurrentHashMap<>();
    private static final ReentrantLock clazzLock = new ReentrantLock();

    /**
     * Find all implementations of an interface / abstract class via reflection.
     * Class information is cached to reduce lookup time.
     * The interface or abstract class must live in the same package as (or a parent package of) its implementations.
     */
    @SuppressWarnings("unchecked")
    public static <T> Set<Class<? extends T>> getReflections(Class<T> clazz) {
        if (clazzMap.containsKey(clazz.getName())) {
            return (Set<Class<? extends T>>) clazzMap.get(clazz.getName());
        }
        try {
            clazzLock.lock();
            if (clazzMap.containsKey(clazz.getName())) {
                return (Set<Class<? extends T>>) clazzMap.get(clazz.getName());
            }
            Reflections reflections = new Reflections(clazz.getPackage().getName());
            Set<Class<? extends T>> subTypesOf = reflections.getSubTypesOf(clazz);
            clazzMap.put(clazz.getName(), subTypesOf);
            return subTypesOf;
        } catch (Exception e) {
            log.error("getReflections error", e);
        } finally {
            clazzLock.unlock();
        }
        return new HashSet<>();
    }

    /**
     * Create a new instance whose lookup method returns the given type value.
     *
     * @param type       expected return value of the lookup method
     * @param methodName name of the lookup method (e.g. "type")
     * @param clazz      interface or abstract class
     */
    public static <T> T getInstance(String type, String methodName, Class<T> clazz) {
        Set<Class<? extends T>> set = getReflections(clazz);
        for (Class<? extends T> t : set) {
            try {
                // Skip abstract classes
                if (Modifier.isAbstract(t.getModifiers())) {
                    continue;
                }
                Object obj = t.getMethod(methodName).invoke(t.newInstance());
                if (type.equalsIgnoreCase(obj.toString())) {
                    return t.newInstance();
                }
            } catch (Exception e) {
                log.error("getInstance error", e);
            }
        }
        throw new RuntimeException("implement class not exist");
    }

    /**
     * Same as getInstance; the class cache is maintained inside getReflections.
     */
    public static <T> T getInstanceFromCache(String type, String methodName, Class<T> clazz) {
        return getInstance(type, methodName, clazz);
    }
}
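Putting the pieces together, this is roughly how a client is resolved: ReflectionUtils returns the IClient implementation whose type() equals the configured datasourceType (for the DmClient shown below that value is "DMDB"):

// Resolves DmClient because DmClient.type() returns "DMDB"
IClient client = ReflectionUtils.getInstanceFromCache("DMDB", "type", IClient.class);
// Open it with the stored connection settings (datasourceInfo loaded from ed_datasource_info)
IClient opened = client.open(datasourceInfo);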
The IClient interface is the client abstraction that adapts to multiple kinds of data sources:
import com.ws.websocket.entity.DatasourceInfo;

/**
 * Description:
 * User: wangsheng
 * Date: 2022-12-30
 * Time: 10:31
 */
public interface IClient {

    /**
     * Open a connection to the data source.
     *
     * @param dataSourceInfo data source info
     * @return {@link IClient}
     */
    IClient open(DatasourceInfo dataSourceInfo);

    /** Close the data source. */
    void close();

    /** Driver class name. */
    String driverName();

    /** Data source type code. */
    String type();

    /** Test the data source connection. */
    boolean testSource(DatasourceInfo datasourceInfo);
}
import com.ws.websocket.entity.DatasourceInfo;
// Common state shared by all client implementations
public abstract class AbsClient implements IClient {

    protected DatasourceInfo datasourceInfo;
}
package com.ws.websocket.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

@Slf4j
public abstract class AbsRdbmsClient extends AbsClient {

    protected DruidDataSource druidDataSource;

    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig rdbmsConfig = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setInitialSize(5);
        druidDataSource.setMinIdle(30);
        druidDataSource.setMaxActive(300);
        druidDataSource.setMaxWait(10000);
        druidDataSource.setBreakAfterAcquireFailure(true);    // stop retrying after repeated failures
        druidDataSource.setConnectionErrorRetryAttempts(3);   // retry three times
        druidDataSource.setTimeBetweenConnectErrorMillis(3000);
        druidDataSource.setLoginTimeout(3);
        druidDataSource.setUrl(rdbmsConfig.getJdbcUrl());
        druidDataSource.setDriverClassName(driverName());
        druidDataSource.setUsername(rdbmsConfig.getUsername());
        // Decrypt the password here if it is stored encrypted
        // druidDataSource.setPassword(RsaUtils.decode(rdbmsConfig.getPassword()));
        druidDataSource.setPassword(rdbmsConfig.getPassword());
        // Connection properties required by the MetaUtil helper
        Properties properties = new Properties();
        properties.put("remarks", "true");
        properties.put("useInformationSchema", "true");
        druidDataSource.setConnectProperties(properties);
        this.druidDataSource = druidDataSource;
        this.datasourceInfo = datasourceInfo;
        return this;
    }

    @Override
    public void close() {
        druidDataSource.close();
    }

    @Override
    public boolean testSource(DatasourceInfo datasourceInfo) {
        Connection connection = null;
        try {
            Class.forName(driverName());
            String linkJson = datasourceInfo.getLinkJson();
            RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);
            connection = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getPassword());
            // Valid if the connection answers within 3 seconds
            return connection.isValid(3);
        } catch (SQLException e) {
            log.error("Data source test failed", e);
            return false;
        } catch (ClassNotFoundException e) {
            log.error("Driver class not found: {}", driverName());
            return false;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    @Data
    static class RdbmsConfig {   // static so fastjson can instantiate it during deserialization

        private String jdbcUrl;
        private String username;
        private String password;

        public void setSSL() {
            String lowerCase = this.jdbcUrl.toLowerCase();
            if (!lowerCase.contains("usessl")) {
                if (this.jdbcUrl.contains("?")) {
                    this.jdbcUrl = this.jdbcUrl + "&useSSL=false";
                } else {
                    this.jdbcUrl = this.jdbcUrl + "?useSSL=false";
                }
            }
        }
    }
}
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public class DmClient extends AbsRdbmsClient {

    private String schema;

    @Override
    public String type() {
        return "DMDB";
    }

    @Override
    public String driverName() {
        return "dm.jdbc.driver.DmDriver";
    }

    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig commonLinkParams = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        // DM defaults the schema to the upper-cased user name when none is configured
        this.schema = StringUtils.isNotBlank(datasourceInfo.getSchemaName()) ? datasourceInfo.getSchemaName() : commonLinkParams.getUsername().toUpperCase();
        datasourceInfo.setSchemaName(schema);
        return super.open(datasourceInfo);
    }

    @Override
    public void close() {
        // Use the pooled close from AbsRdbmsClient
        super.close();
    }

    @Override
    public boolean testSource(DatasourceInfo datasourceInfo) {
        // Use the generic JDBC connectivity test from AbsRdbmsClient
        return super.testSource(datasourceInfo);
    }
}
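Supporting another relational database is just another subclass of AbsRdbmsClient. A sketch for MySQL follows; the type code "MYSQL" is an assumption that simply has to match the datasourceType stored for such sources, and the driver class is the usual Connector/J 8.x name:

public class MysqlClient extends AbsRdbmsClient {

    @Override
    public String type() {
        // Assumed type code; must match DatasourceInfo.datasourceType for MySQL sources
        return "MYSQL";
    }

    @Override
    public String driverName() {
        // Connector/J 8.x driver class; use "com.mysql.jdbc.Driver" for 5.x
        return "com.mysql.cj.jdbc.Driver";
    }
}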
4. Configure the Redis subscription for the data source operation channel
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * @Author: wangsheng
 * @Date: 2022/8/16 16:40
 */
@Slf4j
@Configuration
public class RedisListenerConfig {

    /**
     * Subscribe to the data source operation channel.
     *
     * @param connectionFactory connectionFactory
     * @param dataSourceMonitor the data source listener
     * @return RedisMessageListenerContainer
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, DataSourceMonitor dataSourceMonitor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(dataSourceMonitor, new PatternTopic("DATASOURCE_CHANNEL"));
        log.info(dataSourceMonitor.getClass().getName() + " subscribed to channel {}", "DATASOURCE_CHANNEL");
        return container;
    }
}
5. Listen for data source operations from Redis
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

/**
 * Description: Redis listener for data source operations
 * User: wangsheng
 * Date: 2022-08-12
 * Time: 17:07
 */
@Slf4j
@Component
public class DataSourceMonitor implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        JSONObject box = JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
        String operation = box.getString("key");
        if ("SAVE_OR_UPDATE".equals(operation)) {
            // Refresh the DataSourceContext cache
            DatasourceInfo datasourceInfo = box.getObject("value", DatasourceInfo.class);
            if (datasourceInfo.getType().equals(0)) {
                String datasourceCode = datasourceInfo.getDatasourceCode();
                DataSourceContext.setConfigMap(datasourceCode, datasourceInfo);
                DataSourceContext.setClientMap(datasourceInfo);
                log.info("Redis listener: data source {} was saved or updated, DataSourceContext refreshed", datasourceCode);
            }
        } else {
            String datasourceCode = box.getString("value");
            // Evict from the DataSourceContext cache
            DataSourceContext.removeConfigMap(datasourceCode);
            DataSourceContext.removeClientMap(datasourceCode);
            log.info("Redis listener: data source {} was deleted, DataSourceContext updated", datasourceCode);
        }
    }
}
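For reference, the envelopes this listener expects on DATASOURCE_CHANNEL look roughly like the following; the concrete values are illustrative, and the messages themselves are produced by the aspect in the next step:

{"key":"SAVE_OR_UPDATE","value":{"datasourceCode":"ds_demo","datasourceType":"MYSQL","type":0,"linkJson":"{...}"}}
{"key":"DELETE","value":"ds_demo"}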
6. Create an AOP aspect that publishes data source changes automatically
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author: wangsheng
 * @Date: 2022/8/15 16:37
 */
@Slf4j
@Aspect
@Component
public class DatasourceAspect {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Publish a Redis message when a data source is created or updated.
     */
    @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.saveOrUpdateDatasourceInfo(..))", returning = "datasourceInfo")
    public void saveOrUpdate(JoinPoint joinPoint, DatasourceInfo datasourceInfo) {
        HashMap<String, Object> box = new HashMap<>(4);
        box.put("key", "SAVE_OR_UPDATE");
        box.put("value", datasourceInfo);
        // Publish the Redis message
        stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
        log.info("Published Redis message for saved/updated data source {}", datasourceInfo.getDatasourceCode());
    }

    /**
     * Publish a Redis message when a data source is deleted.
     */
    @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.deleteDatasourceInfo(..))", returning = "datasourceCode")
    public void delete(JoinPoint joinPoint, String datasourceCode) {
        Map<String, Object> box = new HashMap<>(4);
        box.put("key", "DELETE");
        box.put("value", datasourceCode);
        // Publish the Redis message
        stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
        log.info("Published Redis message for deleted data source {}", datasourceCode);
    }
}
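For the returning bindings in the advice above to receive values, the service methods must return them. The original does not show DatasourceInfoService, so the assumed signatures would be roughly:

import com.baomidou.mybatisplus.extension.service.IService;

public interface DatasourceInfoService extends IService<DatasourceInfo> {

    // Assumed: returns the saved/updated entity so the aspect can publish it
    DatasourceInfo saveOrUpdateDatasourceInfo(DatasourceInfo datasourceInfo);

    // Assumed: returns the deleted data source code so the aspect can publish it
    String deleteDatasourceInfo(String datasourceCode);
}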
This keeps the cached connection info in sync across services automatically. One problem remains: when the underlying database itself is restarted, the cached connections become invalid, and neither the aspect nor the Redis message notices, because no one edited the data source record. A scheduled task therefore tests every connection periodically and republishes any data source that fails the test, so its connection is rebuilt and re-cached on all nodes.
7. Create the scheduled reconnection task
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ws.websocket.entity.DatasourceInfo;
import com.ws.websocket.service.DatasourceInfoService;
import com.ws.websocket.util.DBTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;

@Component
@RequiredArgsConstructor
@Slf4j
public class DataSourceRetryConnectSchedule {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /** Runs every 2 hours. */
    @Scheduled(cron = "0 0 */2 * * ?")
    public void retryConnect() {
        log.info("Checking data source connections");
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", 0);
        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        for (DatasourceInfo datasourceInfo : list) {
            boolean connected = DBTool.testSource(datasourceInfo);
            if (!connected) {
                log.info("Reconnecting data source {}", datasourceInfo.getDatasourceName());
                HashMap<String, Object> box = new HashMap<>(4);
                box.put("key", "SAVE_OR_UPDATE");
                box.put("value", datasourceInfo);
                // Publish the Redis message so every node rebuilds its cached client
                stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
            }
        }
    }
}
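Note that @Scheduled only fires if scheduling is enabled somewhere in the application, for example on the Spring Boot main class (the class name here is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}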