Debezium 同步 MySQL 实时数据并解决数据重复消费问题

embedded/2024/11/9 11:48:32/

        我们使用 Debezium 实时同步一个 MySQL 的数据到另一个 MySQL,代码网上基本都有,都是在引入 debezium-api,debezium-embedded 后写 Java 代码,做好了基本配置后启动程序,Debezium 会自动读取 MySQL 的实时 binlog,然后触发相应的事件让我们处理,我们就把事件里的数据读取出来,插入到目标库即可。我们的 MySQL 的版本是 5.7 。

        但我们在其中发现了一个很奇怪的问题,目标库存在多个相同的 sql ,我们以为是 Debezium 重复消费了 binlog 里的事件,就记录下每个事件的 position 并判重,但 sql 还是重复了,我们一开始觉得 MySQL 写的 binlog 肯定没问题,一个事务对应一个事件。之后我们使用 binlog2sql 这个 python 工具读取了已归档的 binlog 文件,发现里面没有重复的 sql ,这说明 MySQL binlog 还是没有问题的,问题在 Debezium,但 Debezium 作为一个成熟的 cdc 工具应该也不会有什么大的问题,可能是 Debezium 的配置问题,但检查了 Debezium 的所有配置后还是没发现有什么问题,配置改了后重新运行结果还是一样。 后面我们怀疑可能和 gtid 有关,我们发现 “Insert into xxx values (xxx) ” 会产生一个 binlog 事件,因为一个事务会产生一个 binlog 事件,但 “Insert into xxx values (xxx),(xxx),(xxx)...” 会产生多个事件,但这些事件的 gtid 还是同一个,事件里的 query 属性还是同一个,事件的 query 属性即原始 sql ,这就破案了,我们一直消费每个事件的query,但可能多个事件里的 query 属性是一样的,因为它们的 gtid 属性相同,它们属于同一个全局事务。后面我们使用 gtid 过滤相同属性就解决了数据重复问题。至于为什么一个批量插入会产生一个多个事件,并且多个事件的 gtid 是同一个,我们猜测 MySQL  的 binlog 就是这样写日志的,修改一行数据就产生一个事件,要是批量修改就产生多个事件,但这些批量事件同属于一个全局事务。

        怎么过滤重复 gtid 问题?因为 gtid 是递增的,相同的 gtid 都会一起出现,所以可以使用自动老元素的 Map,或是设置键过期的 redis,或是 带有 gtid 属性的数据库表,并设置它是唯一索引,或是插入数据之前先检查数据库里是否有本事件的gtid,有就跳过,没有就插入,并把这个过程加锁保证原子性。

        核心代码:

java">// 启动
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(config) .notifying(DataSync::handleChangeEvent).build();ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);
java">private static void handleChangeEvent(ChangeEvent<String, String> event) {JSONObject valueJson = JSON.parseObject(event.value());if (valueJson != null) {JSONObject payload = valueJson.getJSONObject("payload");JSONObject source = payload.getJSONObject("after");// 原始sqlString query = source.getString("data_definition");// 对 sql 字符串进行美化query = query.replaceAll("[\\n\\r\\t\\s]+", " ");String database = source.getString("database");String table = source.getString("table_name");String gtid = source.getString("gtid");synchronized (lock) {// 查询数据库该 gtid 的数量long cnt = queryGtid(gtid);if (cnt == 0) {// 如果数据库不存在该 sql 就插入save(query, database, table, gtid);} else {System.out.println(gtid + " 有重复");}} }}

        

http://www.ppmy.cn/embedded/55822.html

相关文章

排序算法(1)之插入排序----直接插入排序和希尔排序

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 排序之插入排序----直接插入排序和希尔排序(1) 收录于专栏【数据结构初阶】 本专栏旨在分享学习数据结构学习的一点学习笔记&#xff0c;欢迎大家在评论区交流讨…

【C语言】常见的数据排序算法

目录 一、概述 二、常见的排序算法 2.1 冒泡排序 2.1.1 定义 2.1.2 C语言实现 2.2 快速排序 2.2.1 定义 2.2.2 C语言实现 2.3 插入排序 2.3.1 定义 2.3.2 C语言实现 2.4 希尔排序 2.4.1 定义 2.4.2 C语言实现 2.5 归并排序 2.5.1 定义 2.5.2 C语言实现 2.6 基…

基于协同过滤的航空票务推荐系统的设计与实现(飞机票推荐系统)

&#x1f497;博主介绍&#x1f497;&#xff1a;✌在职Java研发工程师、专注于程序设计、源码分享、技术交流、专注于Java技术领域和毕业设计✌ 温馨提示&#xff1a;文末有 CSDN 平台官方提供的老师 Wechat / QQ 名片 :) Java精品实战案例《700套》 2025最新毕业设计选题推荐…

非常疑惑文章变成了仅VIP可读

关于博客发布的一些感想 挺久没上 CSDN 了&#xff0c;平时遇到问题都是问 ChatGPT&#xff0c;自行查阅资料的时间也不多了&#xff0c;写博文的频率也随之降低。偶尔会记些笔记自用&#xff0c;也没有再发布出来。 今天在谷歌查了个问题&#xff0c;突然想发个博客&#xf…

lvs+上一章的内容

书接上回这次加了个keepalived 一、集群与分布式 1.1 集群介绍 **集群&#xff08;Cluster&#xff09;**是将多台计算机组合成一个系统&#xff0c;以解决特定问题的计算机集合。集群系统可以分为以下三种类型&#xff1a; **LB&#xff08;Load Balancing&#xff0c;负载…

STM32定时器入门篇——(基本定时器的使用)

一、基本定时器的功能介绍&#xff1a; STM32F103的基本定时器有&#xff1a;TIM6、TIM7。基本定时器TIM6和TIM7各包含一个16位递增自动装载计数器&#xff0c;最大计数到2^16也就是65536&#xff0c;计数值为0~65535&#xff0c;其拥有的功能有&#xff1a;定时中断、主模式触…

分享一个超级实用的东西——巴比达远程访问

前言 &#x1f388;家人们&#xff0c;今天我要和你们分享一个超级实用的东西——巴比达远程访问&#xff01;&#x1f389; &#x1f4bb;有了它&#xff0c;无论你身在何处&#xff0c;都能轻松访问家中的电脑&#x1f4bb;&#xff0c;就像在身边一样方便&#xff01;&…

什么是 JVM( Java 虚拟机),它在 Java 程序执行中扮演什么角色?

JVM&#xff0c;全称Java Virtual Machine&#xff0c;中文译作“Java虚拟机”&#xff0c;它是运行Java程序的软件环境&#xff0c;也是Java语言的核心部分之一。 想象一下&#xff0c;如果你是一位环球旅行家&#xff0c;每到一个新的国家&#xff0c;都需要学习当地的语言才…