FlinkCDC基础篇章2-数据源 SqlServerCDC写入到ES中

news/2024/10/18 8:19:26/

接着 上期FlinkCDC基础篇章1-安装使用  

flink-和所需要的依赖包">下载 Flink 和所需要的依赖包 #

  1. 下载 Flink 1.17.0 并将其解压至目录 flink-1.17.0

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.17.0/lib/ 下:

    下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

    • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
    • flink-sql-connector-mysql-cdc-2.4.0.jar
    • flink-sql-connector-postgres-cdc-2.4.0.jar

首先,开启 checkpoint,每隔3秒做一次 checkpoint-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s
-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
CREATE TABLE t_source_sqlserver (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
) WITH ('connector' = 'sqlserver-cdc',  -- 使用SQL Server CDC连接器'hostname' = '10.194.183.120',  -- SQL Server主机名'port' = '30027',               -- SQL Server端口'username' = 'sa',              -- SQL Server用户名'password' = 'abc@123456',      -- SQL Server密码'database-name' = 'cdc_test',   -- 数据库名称'schema-name' = 'dbo',          -- 模式名称'table-name' = 'orders'         -- 要捕获更改的表名
);-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
CREATE TABLE table_sink_mysql (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,PRIMARY KEY (id) NOT ENFORCED  -- 主键定义(可选)
)
WITH ('connector' = 'jdbc',                        -- 使用JDBC连接器'url' = 'jdbc:mysql://10.194.183.120:30025/test',  -- MySQL的JDBC URL'username' = 'root',                        -- MySQL用户名'password' = 'root',                        -- MySQL密码'table-name' = 'orders'                     -- 要写入的MySQL表名
);-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
CREATE TABLE income_distribution (serviceCode STRING,accountPeriod STRING,subjectCode STRING,subjectName STRING,amt DECIMAL(13,2),PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://xxxx:9200','index' = 'income_distribution','sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL');

可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

参考文献:

使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC

flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客


http://www.ppmy.cn/news/1425392.html

相关文章

OpenCV杂记(2):图像拼接(hconcat, vconcat)

OpenCV杂记(1):绘制OSD(cv::getTextSize, cv::putText)https://blog.csdn.net/tecsai/article/details/137872058 1. 简述 做图像处理或计算机视觉技术的同学都知道,我们在工作中会经常遇到需要将两幅图像拼…

3、MYSQL-一条sql如何在MYSQL中执行的

MySQL的内部组件结构 大体来说,MySQL 可以分为 Server 层和存储引擎层两部分。 Server层 主要包括连接器、查询缓存、分析器、优化器、执行器等,涵盖 MySQL 的大多数核心服务功能,以及所有的内置函数(如日期、时间、数学和加密函…

【数据结构】图论(图的储存方式,图的遍历算法DFS和BFS、图的遍历算法的应用、图的连通性问题)

目录 图论一、 图的基本概念和术语二、图的存储结构1. 数组(邻接矩阵)存储表示无向图的数组(邻接矩阵)存储表示有向图的数组(邻接矩阵)存储表示 邻接表存储表示有向图的十字链表存储表示无向图的邻接多重表存储表示 三、图的遍历算法图的遍历——深度优先搜索(DFS&a…

Unity导出package

C#代码导出后为一个dll,原有的不同平台的库不变。 以下操作均在build PC 平台下操作。 1.在要导出的文件夹下建assembly definition (Any platform) 2.将项目文件夹下的\Library\ScriptAssemblies中的相应assembly definition的dll复制到要导出的文件夹下 3.在uni…

如何在 Linux 和 Mac 终端命令中添加别名

在本文中,我们将探讨一种简单的技巧,可以节省您在终端中输入重复命令的时间。 作为开发者,我们花费大量时间在终端上执行命令。无论是浏览目录、运行脚本、更改 Node.js 版本还是版本控制命令,手动输入每个命令都是一项耗时的任务…

docker安装并跑通QQ机器人实践(3)-bs-nonebot搭建

NoneBot2 是一个现代、跨平台、可扩展的 Python 聊天机器人框架(下称 NoneBot),它基于 Python 的类型注解和异步优先特性(兼容同步),能够为你的需求实现提供便捷灵活的支持。同时,NoneBot 拥有大…

解析OceanBase v4.2函数索引进行查询优化

一、如何通过函数索引来进行查询优化 函数索引是一种优化查询的技术,其主要作用在于提升包含函数调用的查询语句的执行速度。当查询语句中包含函数调用时,数据库系统需要逐行执行函数计算,这无疑会增加查询的复杂性,导致查询速度…

day22 java多线程 构造方法 常用方法 守护线程

目录 多线程构造方法 常用方法 守护线程 多线程构造方法 public Thread() :分配一个新的线程对象。 public Thread(String name) :分配一个指定名字的新线程对象。 public Thread(Runnable target) :分配一个带有指定目标新的线程对象。 public Thread(Runnable target,S…