FlinkCDC基础篇章2-数据源 SqlServerCDC写入到ES中

server/2025/1/16 2:35:07/

接着 上期FlinkCDC基础篇章1-安装使用  

flink-和所需要的依赖包">下载 Flink 和所需要的依赖包 #

  1. 下载 Flink 1.17.0 并将其解压至目录 flink-1.17.0

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.17.0/lib/ 下:

    下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

    • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
    • flink-sql-connector-mysql-cdc-2.4.0.jar
    • flink-sql-connector-postgres-cdc-2.4.0.jar

首先,开启 checkpoint,每隔3秒做一次 checkpoint-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s
-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
CREATE TABLE t_source_sqlserver (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
) WITH ('connector' = 'sqlserver-cdc',  -- 使用SQL Server CDC连接器'hostname' = '10.194.183.120',  -- SQL Server主机名'port' = '30027',               -- SQL Server端口'username' = 'sa',              -- SQL Server用户名'password' = 'abc@123456',      -- SQL Server密码'database-name' = 'cdc_test',   -- 数据库名称'schema-name' = 'dbo',          -- 模式名称'table-name' = 'orders'         -- 要捕获更改的表名
);-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
CREATE TABLE table_sink_mysql (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,PRIMARY KEY (id) NOT ENFORCED  -- 主键定义(可选)
)
WITH ('connector' = 'jdbc',                        -- 使用JDBC连接器'url' = 'jdbc:mysql://10.194.183.120:30025/test',  -- MySQL的JDBC URL'username' = 'root',                        -- MySQL用户名'password' = 'root',                        -- MySQL密码'table-name' = 'orders'                     -- 要写入的MySQL表名
);-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
CREATE TABLE income_distribution (serviceCode STRING,accountPeriod STRING,subjectCode STRING,subjectName STRING,amt DECIMAL(13,2),PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://xxxx:9200','index' = 'income_distribution','sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL');

可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

参考文献:

使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC

flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客


http://www.ppmy.cn/server/10230.html

相关文章

docker 报错 error adding seccomp filter rule for syscall clone3

网上有一些说法,例如重新安装docker 但是我自己尝试,用 –security-opt seccompunconfined 就可以,但是需要把这个命令放到紧挨着run的位置,如果放到偏后的位置,可能不起作用。 以下命令是其他网友启动是的命令&…

Stable Diffusion是什么

稳定扩散(Stable Diffusion)是一种数学模型和随机过程,用于描述不同粒子之间的随机运动和扩散过程。它是从随机漫步(Random Walk)发展而来,并具有一些特定的性质。 在稳定扩散中,粒子的运动是随…

【行为型模式】模板方法模式

一、模板方法模式概述 模板方法模式定义:在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。(类对象型模式) 模板方法中的基本方法是实现算法的各个步骤,是模板方法的…

Kotlin语法快速入门-区间(3)

Kotlin语法快速入门-区间(3) 文章目录 Kotlin语法快速入门-区间(3)三、区间1、语法2、遍历3、查找是否在区间内4、字符区间 三、区间 1、语法 fun main() {// 1-10的闭区间1..10// 1-10的开区间1 until 10// 10-1的倒序区间10 d…

面试复习基础题目-c#相关

面试复习基础题目 c#相关问题 delegate和event的区别是什么? Delegate用来声明委托类型,event用来声明委托对象; 事件是委托的一种应用,事件是带有event关键词的委托对象,对委托对象进行了封装,本质就是委…

Frida入门笔记

Frida入门笔记 1.背景1.1 概述1.2 主要功能1.3 应用场景1.4 使用方法 1.背景 Frida 是一款强大的动态代码插桩工具,它允许开发者在运行时对目标应用程序进行实时操作和分析。以下是对 Frida 的详细介绍: 1.1 概述 Frida 是一个跨平台的动态代码插桩框…

Linux搭建Discuz论坛

搭建一个论坛 —接上篇博客 改名/etc/httpd/conf.d/vhosts.conf 》/etc/httpd/conf.d/vhosts.conf.bak [rootlocalhost conf.d]# mv /etc/httpd/conf.d/vhosts.conf /etc/httpd/conf.d/vhosts.conf.bak此时的vhosts.conf是一个新创建的文件,之前的vhosts.conf已经…

STM32 串口接收定长,不定长数据

本文为大家介绍如何使用 串口 接收定长 和 不定长 的数据。 文章目录 前言一、串口接收定长数据1. 函数介绍2.代码实现 二、串口接收不定长数据1.函数介绍2. 代码实现 三,两者回调函数的区别比较四,空闲中断的介绍总结 前言 一、串口接收定长数据 1. 函…