Flink CDC数据同步

news/2025/2/19 8:58:55/

背景

随着信息化程度的不断提高,企业内部系统的数量和复杂度不断增加,因此,数据库系统的同步问题已成为越来越重要的问题。

缓存失效

在缓存中缓存的条目(entry)在源头被更改或者被删除的时候立即让缓存中的条目失效。如果缓存在一个独立的进程中运行(例如Redis,Memcache),那么简单的缓存失效逻辑可以放在独立的进程或服务中,从而简化主应用的逻辑。在一些场景中,缓存失效逻辑可以更复杂一点,让它利用更改事件中的更新数据去更新缓存中受影响的条目。

简化单体应用

许多应用更新数据库,然后在数据库中的更改被提交后,做一些额外的工作:更新搜索索引,更新缓存,发送通知,运行业务逻辑,等等。这种情况通常称为双写(dual-writes),因为应用没有在一个事务内写多个系统。这样不仅应用逻辑复杂难以维护,而且双写容易丢失数据或者在一些系统更新成功而另一些系统没有更新成功的时候造成不同系统之间的状态不一致。使用捕获更改数据技术(change data capture,CDC),在源数据库的数据更改提交后,这些额外的工作可以被放在独立的线程或者进程(服务)中完成。这种实现方式的容错性更好,不会丢失事件,容易扩展,并且更容易支持升级。

共享数据库

当多个应用共用同一个数据库的时候,一个应用提交的更改通常要被另一个应用感知到。一种实现方式是使用消息总线,尽管非事务性的消息总线总会受上面提到的双写影响。但是,另一种实现方式,变得很直接:每个应用可以直接监控数据库的更改,并且响应更改。

数据集成

数据通常被存储在多个地方,尤其是当数据被用于不同的目的的时候,会有不同的形式。保持多系统的同步是很有挑战性的,但是可以通过使用数据同步工具加上简单的事件处理逻辑来实现简单的ETL类型的解决方案。

命令查询职责分离

在命令查询职责分离 [Command Query Responsibility Separation (CQRS)]架构模式中,更新数据使用了一种数据模型,读数据使用了一种或者多种数据模型。由于数据更改被记录在更新侧(update-side),这些更改将被处理以更新各种读展示。所以CQRS应用通常更复杂,尤其是他们需要保证可靠性和全序(totally-ordered)处理。Debezium和CDC可以使这种方式更可行:写操作被正常记录,但是Debezium捕获数据更改,并且持久化到全序流里,然后供那些需要异步更新只读视图的服务消费。写侧(write-side)表可以表示面向领域的实体(domain-oriented entities),或者当CQRS和 Event Sourcing 结合的时候,写侧表仅仅用做追加操作命令事件的日志。

Flink CDC

CDC Connectors for Apache Flink 是Apache Flink的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink ®的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。

image-20230827094534219

数据抓取

FlinkCDC 使用 MySQL 的 binlog 技术进行数据抓取。binlog 是 MySQL 用于记录数据库变更操作的日志,包括对表的增删改操作。FlinkCDC 通过对 binlog 进行解析和读取,得到最新的增量数据,并将其转换为 Flink 支持的数据格式,如 Avro 或 JSON。

如下代码可以帮我们监听数据库的变更日志:

JdbcIncrementalSource<String> oracleChangeEventSource =new OracleSourceBuilder().hostname("host").port(1521).databaseList("XE").schemaList("DEBEZIUM").tableList("DEBEZIUM.PRODUCTS").username("username").password("password").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true) // output the schema changes as well.startupOptions(StartupOptions.initial()).debeziumProperties(debeziumProperties).splitSize(2).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000L);// set the source parallelism to 4env.fromSource(oracleChangeEventSource,WatermarkStrategy.noWatermarks(),"OracleParallelSource").setParallelism(4).print().setParallelism(1);env.execute("Print Oracle Snapshot + RedoLog");

数据同步

FlinkCDC 将抓取到的增量数据同步到 Flink 或者其他的计算引擎中进行处理。同步方式有两种:

pull 模式:FlinkCDC 在启动时会向 MySQL 中的某个位置开始读取 binlog,然后通过一个 HTTP 接口将增量数据暴露给 Flink。Flink 每隔一段时间就会调用该接口拉取增量数据。

push 模式:FlinkCDC 将增量数据通过一个 Kafka Topic 推送给 Flink。Flink 在消费 Kafka Topic 时,就可以直接消费到增量数据。

监听到数据变动,能拿到变更前后的数据对比,经过Sink数据转换成相应的INSERT、UPDATE、DELETE等相关SQL语句,并同步到目标数据库。

public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println("监听到活动数据:" + LocalDateTime.now() + value);JSONObject jsonObject = JSONObject.parseObject(value);User before = jsonObject.getObject("before", User.class);User after = jsonObject.getObject("after", User.class);try {String table = jsonObject.getJSONObject("source").getString("table");SqlParse sqlParse = new SqlParse();String executeSQL = "";if(before == null){// 插入executeSQL = sqlParse.getInsert(after,table);}else if(after == null){// 删除executeSQL = sqlParse.getDeleteSQL(before,table);}else{// 更新executeSQL = sqlParse.getUpdateSQL(before,after,table);}SpringJDBC.executeSQL(executeSQL);}catch (Exception e){System.out.println("执行错误");}}}

通用形SqlParse只能解析同构数据,异构数据需要单独处理。

增量数据的解析和处理

FlinkCDC 将抓取到的增量数据转换为 Flink 支持的数据格式后,交由 Flink 进行进一步的处理。Flink 可以对数据进行各种运算,如聚合、过滤、变换等,最终将处理结果输出到其他的存储介质中。

总的来说,FlinkCDC 的原理就是通过解析 MySQL 中的 binlog,抓取到最新的增量数据,并将其转换为 Flink 支持的数据格式,然后将增量数据同步到 Flink 或者其他的计算引擎中进行处理。通过 Flink 的强大计算能力,可以对增量数据进行各种计算,从而实现实时数据处理和分析的功能。

优缺点比较

优点:

  • 能监听多种数据源:MySQL、Oracle、PgSQL等;
  • 支持流式处理,可以实现数据的实时处理和分析;
  • 支持增量更新,可以实现数据的实时同步;
  • 支持容错处理,可以实现数据的高可靠性;

缺点:

  • 对Oracle支持不太友好,需要将开启归档日志,并且部分字段解析需要了解其语义;
  • 对于大表的查询性能较差;
  • 对于大规模数据的处理效率较低;

http://www.ppmy.cn/news/1064447.html

相关文章

CrossOver 23 新功能介绍 CrossOver 23 版本更新了哪些功能

本次发布的CrossOver 23为用户带来了许多令人期待的新功能和优化&#xff0c;特别是对游戏方面的支持&#xff0c;更是让广大Mac游戏玩家兴奋。CrossOver 23包括对Wine 8.0.1的更新&#xff0c;带来了5000多处改动&#xff0c;对各种应用程序进行了改进。该版本还包括 Wine Mon…

Vue2向Vue3过度核心技术指令补充

目录 1 指令修饰符1.1 什么是指令修饰符&#xff1f;1.2 按键修饰符1.3 v-model修饰符1.4 事件修饰符 2 v-bind对样式控制的增强-操作class2.1 语法&#xff1a;2.2 对象语法2.3 数组语法2.4 代码练习 3 京东秒杀-tab栏切换导航高亮3.1 需求&#xff1a;3.2 准备代码:3.3 思路&…

芯科科技推出专为Amazon Sidewalk优化的全新片上系统和开发工具,加速Sidewalk网络采用

芯科科技为Sidewalk开发提供专家级支持 中国&#xff0c;北京 - 2023年8月22日 – 致力于以安全、智能无线连接技术&#xff0c;建立更互联世界的全球领导厂商Silicon Labs&#xff08;亦称“芯科科技”&#xff0c;NASDAQ&#xff1a;SLAB&#xff09;今日在其一年一度的第四…

功能强大的网站检测工具Web-Check

什么是 Web-Check &#xff1f; Web-Check是一款功能强大的一体化工具&#xff0c;用于查找有关网站/主机的信息。目前仪表版上可以显示&#xff1a;IP 信息、SSL 信息、DNS 记录、cookie、请求头、域信息、搜索爬虫规则、页面地图、服务器位置、开放端口、跟踪路由、DNS 安全扩…

C#,《小白学程序》第四课:数学计算

1 文本格式 /// <summary> /// 《小白学程序》第四课&#xff1a;数学计算 /// 这节课超级简单&#xff0c;就是计算成绩的平均值&#xff08;平均分&#xff09; /// 这个是老师们经常做的一件事。 /// </summary> /// <param name"sender"></…

202308_思考总结

产品的思考&#xff1a; *.新一代产品的考虑 (1)上一代产品自己发现的问题&#xff0c;关注痛点问题--->可以依据上一代问题集构建测试用例&#xff0c;每代产品发布后迭代测试用例 (2)发布后用户反馈问题--->有两个点可以吸收&#xff1a;a.下一代用户迫切需求是什么…

第9章 【C语言】用户自己建立数据类型

9.1 定义和使用结构体变量 9.1.1 自己建立结构体类型 变量大多数是互相独立的、无内在联系的。C语言允许用户建立由不同类型数据组成的组合型的数据结构&#xff0c;它称为结构体。 在程序中可以自己建立一个结构体类型&#xff1a; struct Student{int num; //学号为整…

rust学习-不安全操作

在 Rust 中,不安全代码块用于避开编译器的保护策略 四种不安全操作 解引用裸指针通过 FFI (Foreign Function Interface,外部语言函数接口)调用函数调用不安全的函数内联汇编(inline assembly)解引用裸指针 原始指针(raw pointer,裸指针)* 和引用 &T 有类似的功…