SpringBoot MySQL BinLog 监听数据变化(多库多表)

devtools/2024/10/19 4:21:05/

开始

1:引入mysql-binlog-connector-java.jar

    <!-- binlog --><dependency><groupId>com.zendesk</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.27.1</version></dependency><!-- guava --><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version></dependency>

2:配置文件

#用户必须要有权限
binlog:# 服务器地址host: localhostport: 3306username: rootpassword: 123456# 监听数据库与表,隔开,格式[库.表,,,]dbTable: 库.表,库1.表1,库1.表2serverId: 1

:1:mysql8.0之后binlog是默认开启的

        2:需要一个mysql查看binlog的权限用户

获取配置文件参数 BinLogConfig

java">/*** @Description binlog配置* @Author WangKun* @Date 2024/8/8 15:01* @Version*/
@Data
@Component
public class BinLogConfig {/*** mysql服务地址**/@Value("${binlog.host}")private String host;/*** mysql数据库端口号**/@Value("${binlog.port}")private int port;/*** 查看BinLog权限用户名**/@Value("${binlog.username}")private String username;/*** 查看BinLog权限密码**/@Value("${binlog.password}")private String password;/*** 库表**/@Value("${binlog.dbTable}")private String dbTable;/*** 服务标识**/@Value("${binlog.serverId}")private Integer serverId;/*** 获取所有库表,并转化**/private List<String> tables;public List<String> getTables() {if (StringUtils.hasText(dbTable)){tables = Arrays.asList(dbTable.split(BinLogUtils.COMMA));}return tables;}}

BinLog与字段类型实体对象

java">/*** @Description Binlog实体对象* @Author WangKun* @Date 2024/8/8 16:56* @Version*/
@Data
public class BinLog implements Serializable {/*** 库表**/private String dbTable;/*** 事件类型**/private EventType eventType;/*** 存储字段-之前的值**/private Map<String, Serializable> before;/*** 存储字段-之后的值**/private Map<String, Serializable> after;/*** 存储字段--类型**/private Map<String, Field> fields;}
java">/*** @Description 字段* @Author WangKun* @Date 2024/8/8 16:33* @Version*/
@AllArgsConstructor
@Data
public class Field implements Serializable {/*** 数据库**/public String schema;/*** 表**/public String table;/*** 列索引位置**/public int inx;/*** 列名**/public String colName;/*** 类型**/public String dataType;}

BinLog事件类型枚举(新增,修改,删除)

java">/*** @Description BinLog事件枚举* @Author WangKun* @Date 2024/8/19 15:23* @Version*/
@Getter
@AllArgsConstructor
public enum BinLogEventEnum {WRITE("WRITE"),UPDATE("UPDATE"),DELETE("DELETE");/*** 获取key**/private final String key;}

BinLog工具与BinLog数据操作工具

java">/*** @Description Binlog工具* @Author WangKun* @Date 2024/8/8 17:09* @Version*/
@Slf4j
public class BinLogUtils {/*** 逗号**/public final static String COMMA = ",";/*** 点**/public final static String POINT = ".";/*** 双斜线**/public final static String D_SLASH = "\\";public static final long QUEUE_SLEEP = 1000;/*** @param db* @param table* @Description 拼接DB与Table* @Throws* @Return java.lang.String* @Date 2024-08-12 16:09:10* @Author WangKun**/public static String getDbTable(String db, String table) {return db + "-" + table;}}
java">/*** @Description BinLog数据工具* @Author WangKun* @Date 2024/8/12 16:40* @Version*/
@Slf4j
public class BinLogDataUtils {/*** @param db* @param table* @Description 获取columns集合* @Throws* @Return java.util.Map<java.lang.String, com.harmonywisdom.binlog.entity.Field>* @Date 2024-08-12 16:10:08* @Author WangKun**/public static Map<String, Field> getColumnsMap(String db, String table) {PreparedStatement ps = null;ResultSet rs = null;Connection connection = null;try {//获取数据源DataSource dataSource = SpringUtil.getBean(DataSource.class);connection = dataSource.getConnection();// 执行sql获取表数据String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";ps = connection.prepareStatement(preSql);ps.setString(1, db);ps.setString(2, table);rs = ps.executeQuery();Map<String, Field> map = new HashMap<>(rs.getRow());while (rs.next()) {String column = rs.getString("COLUMN_NAME");int idx = rs.getInt("ORDINAL_POSITION");if (column != null && idx >= 1) {// sql的位置从1开始map.put(column, new Field(rs.getString("TABLE_SCHEMA"), rs.getString("TABLE_NAME"), idx - 1, column, rs.getString("DATA_TYPE")));}}ps.close();rs.close();connection.close();return map;} catch (SQLException e) {log.error("加载BinLog监控配置库.表字段错误, db_table={}.{} ", db, table, e);} finally {try {if (ps != null) {ps.close();}if (rs != null) {rs.close();}if (connection != null) {connection.close();}} catch (SQLException e) {log.error("加载BinLog监控配置库.表字段错误关闭连接失败, db_table={}.{} ", db, table, e);}}return null;}/*** @param row* @param dbTable* @param columMap* @param eventType* @Description 新增或删除操作数据格式化* @Throws* @Return com.harmonywisdom.binlog.entity.BinLog* @Date 2024-08-12 16:53:07* @Author WangKun**/private static BinLog insertOrDeletedColum(Serializable[] row, String dbTable, Map<String, Field> columMap, EventType eventType) {if (null == row || null == columMap || row.length != columMap.size()) {return null;}// 初始化ItemBinLog item = new BinLog();item.setEventType(eventType);item.setFields(columMap);Map<String, Serializable> beOrAf = new HashMap<>();columMap.forEach((key, colum) -> {Serializable serializable = row[colum.inx];if (serializable instanceof byte[]) {beOrAf.put(key, new String((byte[]) serializable));} else {beOrAf.put(key, serializable);}});// 写操作放after,删操作放beforeif (isWrite(eventType)) {item.setAfter(beOrAf);}if (isDelete(eventType)) {item.setBefore(beOrAf);}return item;}/*** @param mapEntry* @param columMap* @param eventType* @Description 更新操作数据格式化* @Throws* @Return com.harmonywisdom.binlog.entity.BinLog* @Date 2024-08-12 16:52:46* @Author WangKun**/private static BinLog updateColum(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, Field> columMap, EventType eventType) {if (null == mapEntry || null == columMap) {return null;}BinLog item = new BinLog();item.setEventType(eventType);item.setFields(columMap);Map<String, Serializable> be = new HashMap<>();Map<String, Serializable> af = new HashMap<>();columMap.forEach((key, colum) -> {Serializable serializableKey = mapEntry.getKey()[colum.inx];Serializable serializableValue = mapEntry.getValue()[colum.inx];if (serializableKey instanceof byte[]) {be.put(key, new String((byte[]) serializableKey));} else {be.put(key, serializableKey);}if (serializableValue instanceof byte[]) {af.put(key, new String((byte[]) serializableValue));} else {af.put(key, serializableValue);}});item.setBefore(be);item.setAfter(af);return item;}/*** @param data* @param dbTableIdCols* @param dbTableCols* @param eventType* @param queue* @Description 更新数据* @Throws* @Return void* @Date 2024-08-14 17:35:49* @Author WangKun**/public static void updateData(UpdateRowsEventData data, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, EventType eventType, BlockingQueue<BinLog> queue) {for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {if (dbTableIdCols.containsKey(data.getTableId())) {String dbTable = dbTableIdCols.get(data.getTableId());BinLog item = updateColum(row, dbTableCols.get(dbTable), eventType);item.setDbTable(dbTable);try {queue.put(item);} catch (InterruptedException e) {log.error("BinLog 更新数据添加阻塞队列异常:{}", e.getMessage(), e);}}}}/*** @param eventType* @param rows* @param tableId* @param dbTableIdCols* @param dbTableCols* @param queue* @Description 新增与删除数据* @Throws* @Return void* @Date 2024-08-13 17:30:30* @Author WangKun**/public static void insertOrDeletedData(EventType eventType, List<Serializable[]> rows, long tableId, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, BlockingQueue<BinLog> queue) {for (Serializable[] row : rows) {if (dbTableIdCols.containsKey(tableId)) {String dbTable = dbTableIdCols.get(tableId);BinLog item = insertOrDeletedColum(row, dbTable, dbTableCols.get(dbTable), eventType);item.setDbTable(dbTable);try {queue.put(item);} catch (InterruptedException e) {log.error("BinLog 新增或者删除数据添加阻塞队列异常:{}", e.getMessage(), e);}}}}}

BinLog监听

java">/*** @Description 监听(@FunctionalInterface确保该接口只有以一个抽象方法)* @Author WangKun* @Date 2024/8/8 17:31* @Version*/
@FunctionalInterface
public interface BinLogListener {void onEvent(BinLog binLog);}
java">/*** @Description MySQL监听* @Author WangKun* @Date 2024/8/8 17:32* @Version*/
@Slf4j
public class MySQLBinLogListener implements BinaryLogClient.EventListener {/*** BinLog连接信息**/private final BinaryLogClient client;/*** 阻塞队列,存放信息**/private final BlockingQueue<BinLog> queue;/*** 线程池**/private final ExecutorService executorService;/*** 存放每张数据表对应的listener器,允许将多个值存储在单个键下(每张表一个监听器)**/private final Multimap<String, BinLogListener> listeners;/*** 存放监控所有库表结构**/private final Map<String, Map<String, Field>> dbTableCols;/*** 存放改变的库表结构**/private final Map<Long, String> dbTableIdCols;/*** @param conf* @Description 监听器初始化配置* @Throws* @Return* @Date 2024-08-13 16:53:18* @Author WangKun**/public MySQLBinLogListener(BinLogConfig conf) {BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword());EventDeserializer eventDeserializer = new EventDeserializer();// 序列化eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);client.setEventDeserializer(eventDeserializer);client.setServerId(conf.getServerId());this.client = client;this.queue = new ArrayBlockingQueue<>(ThreadPoolConfig.queueCapacity);this.listeners = ArrayListMultimap.create();this.dbTableCols = new ConcurrentHashMap<>();this.dbTableIdCols = new ConcurrentHashMap<>();// 开启线程池this.executorService = ThreadPoolUtils.create().setPrefixName("Binlog-Listener-Thread").setCorePoolSize(6).build();}/*** @param event* @Description 监听处理, 只支持MySQL中BinLog的ROW模式的* @Throws* @Return void* @Date 2024-08-13 16:54:01* @Author WangKun**/@Overridepublic void onEvent(Event event) {EventType eventType = event.getHeader().getEventType();// 装配库表结构if (eventType == EventType.TABLE_MAP) {TableMapEventData tableData = event.getData();String dbTable = BinLogUtils.getDbTable(tableData.getDatabase(), tableData.getTable());if (dbTableCols.containsKey(dbTable)) {dbTableIdCols.put(tableData.getTableId(), dbTable);}}//新增数据if (EventType.isWrite(eventType)) {WriteRowsEventData data = event.getData();BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);} else if (EventType.isUpdate(eventType)) {// 更新数据UpdateRowsEventData data = event.getData();BinLogDataUtils.updateData(data, dbTableIdCols, dbTableCols, eventType, queue);} else if (EventType.isDelete(eventType)) {// 删除数据DeleteRowsEventData data = event.getData();BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);}}/*** @param db* @param table* @param listener* @Description 注册监听* @Throws* @Return void* @Date 2024-08-13 17:32:44* @Author WangKun**/public void registerListener(String db, String table, BinLogListener listener) {String dbTable = BinLogUtils.getDbTable(db, table);// 连接获取字段集合Map<String, Field> cols = BinLogDataUtils.getColumnsMap(db, table);// 保存字段信息dbTableCols.put(dbTable, cols);// 保存当前注册的listenerlisteners.put(dbTable, listener);}/*** @param* @Description 开启异步多线程消费* @Throws* @Return void* @Date 2024-08-13 18:02:48* @Author WangKun**/@Asyncpublic void openThreadConsumeBinLog(){client.registerEventListener(this);for (int i = 0; i < ThreadPoolConfig.corePoolSize*ThreadPoolConfig.CPU_NUMS; i++) {executorService.execute(() -> {// 轮询监控while (true) {if (!queue.isEmpty()) {try {BinLog binLogQueue = queue.take();listeners.get(binLogQueue.getDbTable()).forEach(binLogListener -> binLogListener.onEvent(binLogQueue));} catch (InterruptedException e) {log.error("BinLog多线程消费异常:{}", e.getMessage(), e);}}}});}try {//连接(不设置时间将会使用主线程)client.connect(BinLogUtils.QUEUE_SLEEP);} catch (Exception e) {log.error("BinLog多线程连接消费异常:{}", e.getMessage(), e);}}
}
java">/*** @Description 初始化Binlog监听* @Author WangKun* @Date 2024/8/9 10:36* @Version*/
@Slf4j
@RequiredArgsConstructor
@Component
@Order(value = 1)
public class BinLogInitListener implements CommandLineRunner {/*** 资源注入**/private final BinLogConfig config;/*** @param args* @Description 初始化* @Throws* @Return void* @Date 2024-08-13 14:07:49* @Author WangKun**/@Overridepublic void run(String... args) throws Exception {try {// 初始化监听器MySQLBinLogListener mySqlBinLogListener = new MySQLBinLogListener(config);this.getListMap().forEach((db, tables) -> {tables.forEach(table -> {mySqlBinLogListener.registerListener(db, table, info -> {if(info.getEventType().name().contains(BinLogEventEnum.UPDATE.getKey())){log.info("库.表: {}, 修改之前:{}" ,db+"."+table,info.getBefore().toString());log.info("库.表: {}, 修改之后:{}" ,db+"."+table,info.getAfter().toString());}if(info.getEventType().name().contains(BinLogEventEnum.WRITE.getKey())){log.info("库.表: {}, 新增: {}" ,db+"."+table,info.getAfter().toString());}if(info.getEventType().name().contains(BinLogEventEnum.DELETE.getKey())){log.info("库.表: {}, 删除: {}" ,db+"."+table,info.getBefore().toString());}});});});// 开启多线程消费mySqlBinLogListener.openThreadConsumeBinLog();} catch (Exception e) {log.error("BinLog初始化监听异常:{}", e.getMessage(), e);}}/*** @param* @Description 初始化监听库表* @Throws* @Return java.util.Map<java.lang.String, java.util.List < java.lang.String>>* @Date 2024-08-12 16:19:32* @Author WangKun**/private Map<String, List<String>> getListMap() {Map<String, List<String>> map = new ConcurrentHashMap<>();try {for (String key : config.getTables()) {// 正则转义,要加双斜线String[] split = key.split(BinLogUtils.D_SLASH + BinLogUtils.POINT);if (split.length != 2) {log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", key);throw new Exception("BinLog配置同步,类型错误 [库名.表名]。请正确配置:" + key);}map.computeIfAbsent(split[0], k -> new ArrayList<>()).add(split[1]);}return map;} catch (Exception e) {log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", e.getMessage(), e);}return map;}}

目录结构

启动IDEA,在控制台出现以下信息,成功

2024-08-19 17:40:47.129  INFO 493984 --- [       blc-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient  : Connected to localhost:3306 at log.000004/7294671 (sid:1, cid:800)

效果:


http://www.ppmy.cn/devtools/97043.html

相关文章

使用DOM破坏启动xss

目录 实验环境&#xff1a; 分析&#xff1a; 找破坏点&#xff1a; 查看源码找函数&#xff1a; 找到了三个方法&#xff0c;loadComments、escapeHTM 、displayComments loadComments escapeHTM displayComments&#xff1a; GOGOGO 实验环境&#xff1a; Lab: Exp…

前端面试——js作用域

说一说JS的作用域吧 作用域的分类 作用域分为&#xff1a;全局作用域&#xff0c;函数作用域&#xff0c;块级作用域 作用域的特性 全局作用域&#xff1a; 能够让变量和函数在全局位置访问&#xff0c;其挂载在浏览器的window对象下面 其中var定义的变量和function函数存…

品牌出海新策略:携手TikTok达人,合作孵化IP实现双赢

在当今数字化时代&#xff0c;TikTok达人的IP孵化作为一种创新的合作模式&#xff0c;正逐渐成为品牌出海的新兴策略。通过与有潜力的TikTok达人合作&#xff0c;共同孵化新的IP&#xff0c;品牌不仅能够突破传统营销的局限&#xff0c;还能实现与达人共同成长的双赢局面。本文…

P1587 [NOI2016] 循环之美

[题目通道]([NOI2016] 循环之美 - 洛谷) #include<map> #include<cmath> #include<cstdio> #include<algorithm> #define fp(i,a,b) for(int ia,Ib;i<I;i) #define file(s) freopen(s".in","r",stdin),freopen(s".out&qu…

Go Channel 详解

概述 在 Go 语言中&#xff0c;channel 是一种用于在 goroutine 之间传递数据的机制。它提供了同步和通信的能力&#xff0c;使得并发编程变得更加简单和安全。Channel 在 Go 语言中的设计是类型安全的&#xff0c;并且支持发送和接收两种操作。 基本概念 创建通道 创建一个…

HTML组件上传

<!doctype html> <html> <head> <meta charset"utf-8"> <title>无标题文档</title> </head><fieldset style"width: 200px"><legend>文本组建上传</legend><form action"#" me…

【论文阅读】Enhance Model Stealing Attack via Label Refining(2022)

摘要 With machine learning models(机器学习模型) being increasingly(越来越多) deployed(部署), model stealing attacks(模型窃取攻击) have raised an increasing interest. Extracting decision-based models(基于决策的模型窃取) is a more challenging task…

JavaScript学习笔记(十二):JS Web API

1、Web API - 简介 Web API 是开发人员的梦想。 它可以扩展浏览器的功能它可以极大简化复杂的功能它可以为复杂的代码提供简单的语法 1.1 什么是 Web API&#xff1f; API 指的是应用程序编程接口&#xff08;Application Programming Interface&#xff09;。 Web API 是 …