10 Flink CDC

ops/2025/2/3 15:42:26/

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop01").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();} 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop01'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}

http://www.ppmy.cn/ops/155341.html

相关文章

对比DeepSeek、ChatGPT和Kimi的学术写作关键词提取能力

关键词 关键词主要从论文标题、摘要及正文中提炼出来&#xff0c;需要准确反映论文的核心主题和专业领域。关键词的选择不仅有助于标引人员进行主题词的选取、数据库的建立以及文献的检索&#xff0c;而且也便于读者高效检索和引用相关学术成果&#xff0c;从而促进学术交流的深…

JavaScript前后端交互-AJAX/fetch

摘自千峰教育kerwin的js教程 AJAX 1、AJAX 的优势 不需要插件的支持&#xff0c;原生 js 就可以使用用户体验好&#xff08;不需要刷新页面就可以更新数据&#xff09;减轻服务端和带宽的负担缺点&#xff1a; 搜索引擎的支持度不够&#xff0c;因为数据都不在页面上&#xf…

中国证券基本知识汇总

中国证券市场是一个多层次、多领域的市场&#xff0c;涉及到各种金融工具、交易方式、市场参与者等内容。以下是中国证券基本知识的汇总&#xff1a; 1. 证券市场概述 证券市场&#xff1a;是指买卖证券&#xff08;如股票、债券、基金等&#xff09;的市场。证券市场可以分为…

神经网络的数据流动过程(张量的转换和输出)

文章目录 1、文本从输入到输出&#xff0c;经历了什么&#xff1f;2、数据流动过程是张量&#xff0c;如何知道张量表达的文本内容&#xff1f;3、词转为张量、张量转为词是唯一的吗&#xff1f;为什么&#xff1f;4、如何保证词张量的质量和合理性5、总结 &#x1f343;作者介…

分享| RL-GPT 框架通过慢agent和快agent结合提高AI解决复杂任务的能力-Arxiv

结论 “RL-GPT: Integrating Reinforcement Learning and Code-as-policy” RL-GPT 框架为解决大语言模型在复杂任务处理中的难题提供了创新有效的途径&#xff0c; 旨在将强化学习&#xff08;RL&#xff09;和代码即策略相结合&#xff0c; 以解决大语言模型&#xff08…

机器学习笔记——特征工程

大家好&#xff0c;这里是好评笔记&#xff0c;公主号&#xff1a;Goodnote&#xff0c;专栏文章私信限时Free。本笔记介绍机器学习中常见的特征工程方法、正则化方法和简要介绍强化学习。 文章目录 特征工程&#xff08;Fzeature Engineering&#xff09;1. 特征提取&#xff…

最新码支付个人免签支付系统源码 三网免挂版本 兼容易支付

介绍: 最新码支付个人免签支付系统源码 三网免挂版本 兼容易支付 本系统是基于thinkphp5.0 FastAdmin 开发的一套新型聚合收款、聚合支付系统 一款专业的聚合免签收款系统,无需对接其余平台,个码就可收款,灰常的方便快捷,集成实现三网免挂功能, 无需挂繁琐的监控软件就可实…

【leetcode详解】T598 区间加法

598. 区间加法 II - 力扣&#xff08;LeetCode&#xff09; 思路分析 核心在于将问题转化&#xff0c; 题目不是要求最大整数本身&#xff0c;而是要求解最大整数的个数 结合矩阵元素的增加原理&#xff0c;我们将抽象问题转为可操作的方法&#xff0c;其实就是再找每组ops中…