Flink CDC系列之:学习理解核心概念——Data Pipeline

news/2024/11/2 0:21:31/

Flink CDC系列之:学习理解核心概念——Data Pipeline

  • 数据管道
  • source
  • sink
  • 管道配置
  • Table ID
  • route
  • transform
  • 案例

数据管道

由于 Flink CDC 中的事件以管道方式从上游流向下游,因此整个 ETL 任务被称为数据管道。

管道对应于 Flink 中的一系列操作。

要描述数据管道,需要以下部分:

  • source
  • sink
  • pipeline

以下部分是可选的:

  • route
  • transform

source

数据源用于访问元数据并从外部系统读取更改的数据。

数据源可以同时从多个表读取数据。

sink

数据接收器用于应用架构更改并将更改数据写入外部系统。

数据接收器可以同时写入多个表。

管道配置

支持以下数据管道级别的配置选项:

参数含义可选/必需
name管道的名称,将作为作业名称提交给Flink集群。可选
parallelism管道的全局并行度。默认为 1。可选
local-time-zone本地时区定义当前会话时区id。可选
   pipeline:name: Sync MySQL Database to Dorisparallelism: 2user-defined-function:- name: addoneclasspath: com.example.functions.AddOneFunctionClass- name: formatclasspath: com.example.functions.FormatFunctionClass

Table ID

在连接外部系统时,需要与外部系统的存储对象建立映射关系,这就是 Table Id 所指的。

为了兼容大多数外部系统,Table Id 用三元组表示:(namespace, schemaName, tableName)。

连接器应该建立 Table Id 与外部系统中存储对象的映射。

下表列出了不同数据系统的 Table Id 中的部分:
在这里插入图片描述

route

Route 指定匹配一串 source-table 到 sink-table 的规则,最典型的场景是分库分表合并,将多个上游 source 表路由到同一张 sink 表。

transform

Transform模块帮助用户根据表中的数据列进行数据列的删除和扩展。
此外,它还可以帮助用户在同步过程中过滤一些不必要的数据。

案例

我们可以使用以下 yaml 文件来定义一个简洁的数据管道,描述将 MySQL app_db 数据库下的所有表同步到 Doris:

 source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""transform:- source-table: adb.web_order01projection: \*, UPPER(product_name) as product_namefilter: id > 10 AND order_id > 100description: project fields and filter- source-table: adb.web_order02projection: \*, UPPER(product_name) as product_namefilter: id > 20 AND order_id > 200description: project fields and filterroute:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2

我们可以使用以下 yaml 文件来定义一个复杂的数据管道,描述将 MySQL app_db 数据库下的所有表同步到 Doris,并给出特定的目标数据库名称 ods_db 和特定的目标表名称前缀 ods_ :

 source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""transform:- source-table: adb.web_order01projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 10 AND order_id > 100description: project fields and filter- source-table: adb.web_order02projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 20 AND order_id > 200description: project fields and filterroute:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2user-defined-function:- name: addoneclasspath: com.example.functions.AddOneFunctionClass- name: formatclasspath: com.example.functions.FormatFunctionClass

http://www.ppmy.cn/news/1543727.html

相关文章

【STM32+HAL】STM32CubeMX学习目录

一、基础配置篇 【STM32HAL】微秒级延时函数汇总-CSDN博客 【STM32HAL】CUBEMX初始化配置 【STM32HAL】定时器功能小记-CSDN博客 【STM32HAL】PWM呼吸灯实现 【STM32HAL】DACDMA输出波形实现-CSDN博客 【STM32HAL】ADCDMA采集(单通道多通道)-CSDN博客 【STM32HAL】三重A…

TS 项目中给常用的路径定义一个别名 tsconfig.json

TS 项目中给常用的路径定义一个别名 tsconfig.json 在 TS 项目中,可以定义一些自定义的别名,来取代经常需要引用的一些文件路径。 比如 Vue 项目中你可以需要经常从 /src 中取文件,在每个层级的文件中引用时的相对路径 ../../src ../src 都不…

std::optional与函数返回值的讨论

这个话题是因为,最近一段时间看到有人在问std::optional有什么用,以及不理解为什么要有这个类。所以打算简单介绍一下std::optional,重点讨论返回值相关的内容。 1 std::optional 类模板 std::optional 管理一个可选的包含值,即…

模拟算法 (算法详解+例题)

目录 一、什么是模拟二、模拟算法的特点和技巧三、模拟OJ题3.1、替换所有的问号3.2、提莫攻击3.3、N字形变换3.4、外观数列3.5、数青蛙 一、什么是模拟 模拟是对真实事物或者过程的虚拟。在编程时为了实现某个功能,可以用语言来模拟那个功能,模拟成功也…

分享几款开源好用的图片在线编辑,适合做快速应用嵌入

图片生成器是指一种工具或软件,用于自动生成图片或图像内容,通常依据用户设定的参数或模板进行操作。这种工具能够帮助用户快速创建视觉效果丰富的图像,而无需具备专业的设计技能。 在数字化时代,图片编辑已经成为日常工作和生活的…

【ChatGP】让ChatGPT解释和简化复杂的技术概念

让ChatGPT解释和简化复杂的技术概念 在科技迅速发展的今天,许多人面临着理解复杂技术概念的挑战。无论是初学者还是专业人士,能够轻松理解并运用这些概念都是至关重要的。ChatGPT作为一个强大的语言模型,可以帮助用户解释和简化复杂的技术概…

【深度学习基础】深入理解 卷积与卷积核

🌈 个人主页:十二月的猫-CSDN博客 🔥 系列专栏: 🏀深度学习_十二月的猫的博客-CSDN博客 💪🏻 十二月的寒冬阻挡不了春天的脚步,十二点的黑夜遮蔽不住黎明的曙光 目录 1. 卷积 1.1 …

Python 自动化运维:安全与合规最佳实践

Python 自动化运维:安全与合规最佳实践 目录 🔒 Python安全编程实践与最佳实践🔑 使用Hashlib与Cryptography进行数据加密📊 安全审计与合规检查的重要性🔍 处理敏感数据与隐私保护的方法 1. 🔒 Python安…