开始
1:引入mysql-binlog-connector-java.jar
<!-- binlog -->
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.27.1</version>
</dependency>
<!-- guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>
2:配置文件
#用户必须要有权限
binlog:
  # 服务器地址
  host: localhost
  port: 3306
  username: root
  password: 123456
  # 监听数据库与表,隔开,格式[库.表,,,]
  dbTable: 库.表,库1.表1,库1.表2
  serverId: 1
注:1:MySQL 8.0 及之后的版本默认开启 binlog
2:需要一个具有 binlog 读取权限的 MySQL 用户(需授予 REPLICATION SLAVE、REPLICATION CLIENT 权限)
获取配置文件参数 BinLogConfig
java">/*** @Description binlog配置* @Author WangKun* @Date 2024/8/8 15:01* @Version*/
@Data
@Component
public class BinLogConfig {/*** mysql服务地址**/@Value("${binlog.host}")private String host;/*** mysql数据库端口号**/@Value("${binlog.port}")private int port;/*** 查看BinLog权限用户名**/@Value("${binlog.username}")private String username;/*** 查看BinLog权限密码**/@Value("${binlog.password}")private String password;/*** 库表**/@Value("${binlog.dbTable}")private String dbTable;/*** 服务标识**/@Value("${binlog.serverId}")private Integer serverId;/*** 获取所有库表,并转化**/private List<String> tables;public List<String> getTables() {if (StringUtils.hasText(dbTable)){tables = Arrays.asList(dbTable.split(BinLogUtils.COMMA));}return tables;}}
BinLog与字段类型实体对象
java">/*** @Description Binlog实体对象* @Author WangKun* @Date 2024/8/8 16:56* @Version*/
@Data
public class BinLog implements Serializable {/*** 库表**/private String dbTable;/*** 事件类型**/private EventType eventType;/*** 存储字段-之前的值**/private Map<String, Serializable> before;/*** 存储字段-之后的值**/private Map<String, Serializable> after;/*** 存储字段--类型**/private Map<String, Field> fields;}
java">/*** @Description 字段* @Author WangKun* @Date 2024/8/8 16:33* @Version*/
@AllArgsConstructor
@Data
public class Field implements Serializable {/*** 数据库**/public String schema;/*** 表**/public String table;/*** 列索引位置**/public int inx;/*** 列名**/public String colName;/*** 类型**/public String dataType;}
BinLog事件类型枚举(新增,修改,删除)
java">/*** @Description BinLog事件枚举* @Author WangKun* @Date 2024/8/19 15:23* @Version*/
@Getter
@AllArgsConstructor
public enum BinLogEventEnum {WRITE("WRITE"),UPDATE("UPDATE"),DELETE("DELETE");/*** 获取key**/private final String key;}
BinLog工具与BinLog数据操作工具
java">/*** @Description Binlog工具* @Author WangKun* @Date 2024/8/8 17:09* @Version*/
@Slf4j
public class BinLogUtils {/*** 逗号**/public final static String COMMA = ",";/*** 点**/public final static String POINT = ".";/*** 双斜线**/public final static String D_SLASH = "\\";public static final long QUEUE_SLEEP = 1000;/*** @param db* @param table* @Description 拼接DB与Table* @Throws* @Return java.lang.String* @Date 2024-08-12 16:09:10* @Author WangKun**/public static String getDbTable(String db, String table) {return db + "-" + table;}}
java">/*** @Description BinLog数据工具* @Author WangKun* @Date 2024/8/12 16:40* @Version*/
@Slf4j
public class BinLogDataUtils {/*** @param db* @param table* @Description 获取columns集合* @Throws* @Return java.util.Map<java.lang.String, com.harmonywisdom.binlog.entity.Field>* @Date 2024-08-12 16:10:08* @Author WangKun**/public static Map<String, Field> getColumnsMap(String db, String table) {PreparedStatement ps = null;ResultSet rs = null;Connection connection = null;try {//获取数据源DataSource dataSource = SpringUtil.getBean(DataSource.class);connection = dataSource.getConnection();// 执行sql获取表数据String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";ps = connection.prepareStatement(preSql);ps.setString(1, db);ps.setString(2, table);rs = ps.executeQuery();Map<String, Field> map = new HashMap<>(rs.getRow());while (rs.next()) {String column = rs.getString("COLUMN_NAME");int idx = rs.getInt("ORDINAL_POSITION");if (column != null && idx >= 1) {// sql的位置从1开始map.put(column, new Field(rs.getString("TABLE_SCHEMA"), rs.getString("TABLE_NAME"), idx - 1, column, rs.getString("DATA_TYPE")));}}ps.close();rs.close();connection.close();return map;} catch (SQLException e) {log.error("加载BinLog监控配置库.表字段错误, db_table={}.{} ", db, table, e);} finally {try {if (ps != null) {ps.close();}if (rs != null) {rs.close();}if (connection != null) {connection.close();}} catch (SQLException e) {log.error("加载BinLog监控配置库.表字段错误关闭连接失败, db_table={}.{} ", db, table, e);}}return null;}/*** @param row* @param dbTable* @param columMap* @param eventType* @Description 新增或删除操作数据格式化* @Throws* @Return com.harmonywisdom.binlog.entity.BinLog* @Date 2024-08-12 16:53:07* @Author WangKun**/private static BinLog insertOrDeletedColum(Serializable[] row, String dbTable, Map<String, Field> columMap, EventType eventType) {if (null == row || null == columMap || row.length != columMap.size()) {return null;}// 初始化ItemBinLog item = new BinLog();item.setEventType(eventType);item.setFields(columMap);Map<String, 
Serializable> beOrAf = new HashMap<>();columMap.forEach((key, colum) -> {Serializable serializable = row[colum.inx];if (serializable instanceof byte[]) {beOrAf.put(key, new String((byte[]) serializable));} else {beOrAf.put(key, serializable);}});// 写操作放after,删操作放beforeif (isWrite(eventType)) {item.setAfter(beOrAf);}if (isDelete(eventType)) {item.setBefore(beOrAf);}return item;}/*** @param mapEntry* @param columMap* @param eventType* @Description 更新操作数据格式化* @Throws* @Return com.harmonywisdom.binlog.entity.BinLog* @Date 2024-08-12 16:52:46* @Author WangKun**/private static BinLog updateColum(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, Field> columMap, EventType eventType) {if (null == mapEntry || null == columMap) {return null;}BinLog item = new BinLog();item.setEventType(eventType);item.setFields(columMap);Map<String, Serializable> be = new HashMap<>();Map<String, Serializable> af = new HashMap<>();columMap.forEach((key, colum) -> {Serializable serializableKey = mapEntry.getKey()[colum.inx];Serializable serializableValue = mapEntry.getValue()[colum.inx];if (serializableKey instanceof byte[]) {be.put(key, new String((byte[]) serializableKey));} else {be.put(key, serializableKey);}if (serializableValue instanceof byte[]) {af.put(key, new String((byte[]) serializableValue));} else {af.put(key, serializableValue);}});item.setBefore(be);item.setAfter(af);return item;}/*** @param data* @param dbTableIdCols* @param dbTableCols* @param eventType* @param queue* @Description 更新数据* @Throws* @Return void* @Date 2024-08-14 17:35:49* @Author WangKun**/public static void updateData(UpdateRowsEventData data, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, EventType eventType, BlockingQueue<BinLog> queue) {for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {if (dbTableIdCols.containsKey(data.getTableId())) {String dbTable = dbTableIdCols.get(data.getTableId());BinLog item = updateColum(row, dbTableCols.get(dbTable), 
eventType);item.setDbTable(dbTable);try {queue.put(item);} catch (InterruptedException e) {log.error("BinLog 更新数据添加阻塞队列异常:{}", e.getMessage(), e);}}}}/*** @param eventType* @param rows* @param tableId* @param dbTableIdCols* @param dbTableCols* @param queue* @Description 新增与删除数据* @Throws* @Return void* @Date 2024-08-13 17:30:30* @Author WangKun**/public static void insertOrDeletedData(EventType eventType, List<Serializable[]> rows, long tableId, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, BlockingQueue<BinLog> queue) {for (Serializable[] row : rows) {if (dbTableIdCols.containsKey(tableId)) {String dbTable = dbTableIdCols.get(tableId);BinLog item = insertOrDeletedColum(row, dbTable, dbTableCols.get(dbTable), eventType);item.setDbTable(dbTable);try {queue.put(item);} catch (InterruptedException e) {log.error("BinLog 新增或者删除数据添加阻塞队列异常:{}", e.getMessage(), e);}}}}}
BinLog监听
java">/*** @Description 监听(@FunctionalInterface确保该接口只有以一个抽象方法)* @Author WangKun* @Date 2024/8/8 17:31* @Version*/
@FunctionalInterface
public interface BinLogListener {void onEvent(BinLog binLog);}
java">/*** @Description MySQL监听* @Author WangKun* @Date 2024/8/8 17:32* @Version*/
@Slf4j
public class MySQLBinLogListener implements BinaryLogClient.EventListener {/*** BinLog连接信息**/private final BinaryLogClient client;/*** 阻塞队列,存放信息**/private final BlockingQueue<BinLog> queue;/*** 线程池**/private final ExecutorService executorService;/*** 存放每张数据表对应的listener器,允许将多个值存储在单个键下(每张表一个监听器)**/private final Multimap<String, BinLogListener> listeners;/*** 存放监控所有库表结构**/private final Map<String, Map<String, Field>> dbTableCols;/*** 存放改变的库表结构**/private final Map<Long, String> dbTableIdCols;/*** @param conf* @Description 监听器初始化配置* @Throws* @Return* @Date 2024-08-13 16:53:18* @Author WangKun**/public MySQLBinLogListener(BinLogConfig conf) {BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword());EventDeserializer eventDeserializer = new EventDeserializer();// 序列化eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);client.setEventDeserializer(eventDeserializer);client.setServerId(conf.getServerId());this.client = client;this.queue = new ArrayBlockingQueue<>(ThreadPoolConfig.queueCapacity);this.listeners = ArrayListMultimap.create();this.dbTableCols = new ConcurrentHashMap<>();this.dbTableIdCols = new ConcurrentHashMap<>();// 开启线程池this.executorService = ThreadPoolUtils.create().setPrefixName("Binlog-Listener-Thread").setCorePoolSize(6).build();}/*** @param event* @Description 监听处理, 只支持MySQL中BinLog的ROW模式的* @Throws* @Return void* @Date 2024-08-13 16:54:01* @Author WangKun**/@Overridepublic void onEvent(Event event) {EventType eventType = event.getHeader().getEventType();// 装配库表结构if (eventType == EventType.TABLE_MAP) {TableMapEventData tableData = event.getData();String dbTable = BinLogUtils.getDbTable(tableData.getDatabase(), tableData.getTable());if (dbTableCols.containsKey(dbTable)) {dbTableIdCols.put(tableData.getTableId(), dbTable);}}//新增数据if (EventType.isWrite(eventType)) {WriteRowsEventData data = 
event.getData();BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);} else if (EventType.isUpdate(eventType)) {// 更新数据UpdateRowsEventData data = event.getData();BinLogDataUtils.updateData(data, dbTableIdCols, dbTableCols, eventType, queue);} else if (EventType.isDelete(eventType)) {// 删除数据DeleteRowsEventData data = event.getData();BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);}}/*** @param db* @param table* @param listener* @Description 注册监听* @Throws* @Return void* @Date 2024-08-13 17:32:44* @Author WangKun**/public void registerListener(String db, String table, BinLogListener listener) {String dbTable = BinLogUtils.getDbTable(db, table);// 连接获取字段集合Map<String, Field> cols = BinLogDataUtils.getColumnsMap(db, table);// 保存字段信息dbTableCols.put(dbTable, cols);// 保存当前注册的listenerlisteners.put(dbTable, listener);}/*** @param* @Description 开启异步多线程消费* @Throws* @Return void* @Date 2024-08-13 18:02:48* @Author WangKun**/@Asyncpublic void openThreadConsumeBinLog(){client.registerEventListener(this);for (int i = 0; i < ThreadPoolConfig.corePoolSize*ThreadPoolConfig.CPU_NUMS; i++) {executorService.execute(() -> {// 轮询监控while (true) {if (!queue.isEmpty()) {try {BinLog binLogQueue = queue.take();listeners.get(binLogQueue.getDbTable()).forEach(binLogListener -> binLogListener.onEvent(binLogQueue));} catch (InterruptedException e) {log.error("BinLog多线程消费异常:{}", e.getMessage(), e);}}}});}try {//连接(不设置时间将会使用主线程)client.connect(BinLogUtils.QUEUE_SLEEP);} catch (Exception e) {log.error("BinLog多线程连接消费异常:{}", e.getMessage(), e);}}
}
java">/*** @Description 初始化Binlog监听* @Author WangKun* @Date 2024/8/9 10:36* @Version*/
@Slf4j
@RequiredArgsConstructor
@Component
@Order(value = 1)
public class BinLogInitListener implements CommandLineRunner {/*** 资源注入**/private final BinLogConfig config;/*** @param args* @Description 初始化* @Throws* @Return void* @Date 2024-08-13 14:07:49* @Author WangKun**/@Overridepublic void run(String... args) throws Exception {try {// 初始化监听器MySQLBinLogListener mySqlBinLogListener = new MySQLBinLogListener(config);this.getListMap().forEach((db, tables) -> {tables.forEach(table -> {mySqlBinLogListener.registerListener(db, table, info -> {if(info.getEventType().name().contains(BinLogEventEnum.UPDATE.getKey())){log.info("库.表: {}, 修改之前:{}" ,db+"."+table,info.getBefore().toString());log.info("库.表: {}, 修改之后:{}" ,db+"."+table,info.getAfter().toString());}if(info.getEventType().name().contains(BinLogEventEnum.WRITE.getKey())){log.info("库.表: {}, 新增: {}" ,db+"."+table,info.getAfter().toString());}if(info.getEventType().name().contains(BinLogEventEnum.DELETE.getKey())){log.info("库.表: {}, 删除: {}" ,db+"."+table,info.getBefore().toString());}});});});// 开启多线程消费mySqlBinLogListener.openThreadConsumeBinLog();} catch (Exception e) {log.error("BinLog初始化监听异常:{}", e.getMessage(), e);}}/*** @param* @Description 初始化监听库表* @Throws* @Return java.util.Map<java.lang.String, java.util.List < java.lang.String>>* @Date 2024-08-12 16:19:32* @Author WangKun**/private Map<String, List<String>> getListMap() {Map<String, List<String>> map = new ConcurrentHashMap<>();try {for (String key : config.getTables()) {// 正则转义,要加双斜线String[] split = key.split(BinLogUtils.D_SLASH + BinLogUtils.POINT);if (split.length != 2) {log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", key);throw new Exception("BinLog配置同步,类型错误 [库名.表名]。请正确配置:" + key);}map.computeIfAbsent(split[0], k -> new ArrayList<>()).add(split[1]);}return map;} catch (Exception e) {log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", e.getMessage(), e);}return map;}}
目录结构
在 IDEA 中启动项目,控制台出现以下信息即表示连接成功:
2024-08-19 17:40:47.129 INFO 493984 --- [ blc-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to localhost:3306 at log.000004/7294671 (sid:1, cid:800)
效果: