使用Flinkcdc 采集mysql数据

ops/2024/12/19 5:17:19/

1.下载 Flink CDC 连接器

(1)登录官网下载
https://github.com/apache/flink-cdc/releases
(1)或者虚拟机在线下载
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar

(2)将下载的 JAR 文件放到 Flink 集群的 lib/ 目录 中,重启flink集群。

[root@node1 ~]# cd /export/server/flink
[root@node1 flink]# bin/start-cluster.sh

在这里插入图片描述

2.创建mysql表和数据

Drop database if exists test;
Create database test character set utf8;
Use test;
--建表语句:
-- 建表
-- 学生表
CREATE TABLE `Student`(`s_id` VARCHAR(20),`s_name` VARCHAR(20) NOT NULL DEFAULT '',`s_birth` VARCHAR(20) NOT NULL DEFAULT '',`s_sex` VARCHAR(10) NOT NULL DEFAULT '',PRIMARY KEY(`s_id`)
);
-- 成绩表
CREATE TABLE `Score`(`s_id` VARCHAR(20),`c_id` VARCHAR(20),`s_score` INT(3),PRIMARY KEY(`s_id`,`c_id`)
);
-- 插入学生表测试数据
insert into Student values('01' , '赵雷' , '1990-01-01' , '男');
insert into Student values('02' , '钱电' , '1990-12-21' , '男');
insert into Student values('03' , '孙风' , '1990-05-20' , '男');
insert into Student values('04' , '李云' , '1990-08-06' , '男');
insert into Student values('05' , '周梅' , '1991-12-01' , '女');
insert into Student values('06' , '吴兰' , '1992-03-01' , '女');
insert into Student values('07' , '郑竹' , '1989-07-01' , '女');
insert into Student values('08' , '王菊' , '1990-01-20' , '女');
-- 成绩表测试数据
insert into Score values('01' , '01' , 80);
insert into Score values('01' , '02' , 90);
insert into Score values('01' , '03' , 99);
insert into Score values('02' , '01' , 70);
insert into Score values('02' , '02' , 60);
insert into Score values('02' , '03' , 80);
insert into Score values('03' , '01' , 80);
insert into Score values('03' , '02' , 80);
insert into Score values('03' , '03' , 80);
insert into Score values('04' , '01' , 50);
insert into Score values('04' , '02' , 30);
insert into Score values('04' , '03' , 20);
insert into Score values('05' , '01' , 76);
insert into Score values('05' , '02' , 87);
insert into Score values('06' , '01' , 31);
insert into Score values('06' , '03' , 34);
insert into Score values('07' , '02' , 89);
insert into Score values('07' , '03' , 98);

3.使用Flink cdc 采集mysql

!!!注意:开启 binlog 日志功能,如果已开启忽略如下对mysql的配置操作,直接编辑flink sql即可

验证 MySQL 配置: Flink CDC 依赖 MySQL 的 binlog 功能进行数据采集。确保 binlog 已开启,执行以下命令检查:

[root@node1 ~]# mysql -uroot -p在mysql里面执行,如果返回值为 OFF,需要开启 binlog 功能;
SHOW VARIABLES LIKE 'log_bin';

开启方式:

vi /etc/my.cnf
在[mysqld]下面增加如下代码:
server_id=1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

解释:
server_id=1 # MySQL 实例唯一标识符,必须是唯一的
log_bin = mysql-bin # 开启 binlog 功能,文件名为 mysql-bin
binlog_format = ROW # 设置 binlog 格式为 ROW,必需
expire_logs_days = 30 # binlog 日志保留天数,自动清理超过30天的日志

然后重启 MySQL 服务:

systemctl restart mysqld

====================================

启动flink sql客户端

sql-client.sh

在FlinkSQL-Client,执行创建表 mysql_cdc_to_test_Student

CREATE TABLE if not exists mysql_cdc_to_test_Student (s_id     STRING,s_name   STRING,s_birth  STRING,s_sex    STRING,PRIMARY KEY (`s_id`) NOT ENFORCED
) WITH ('connector'= 'mysql-cdc','hostname'= '192.168.77.161','port'= '3306','username'= 'root','password'='123456','server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'test','table-name'= 'Student'
);#设置以表形式查看
SET sql-client.execution.result-mode = tableau;select * from mysql_cdc_to_test_Student;

在这里插入图片描述


http://www.ppmy.cn/ops/143086.html

相关文章

基于Vue的乐器教学平台的设计与实现

一、前言 随着互联网技术的飞速发展,在线教育逐渐成为一种重要的教育方式。乐器教学作为艺术教育的重要组成部分,也迎来了新的机遇与挑战。传统的乐器教学主要依赖于面对面授课,受时间、空间和师资资源的限制较大。而开发一个基于 Vue 的乐器…

机器学习基础环境安装与使用

目录 A Neural Network Playground 1、库的安装 2、Jupyter Notebook使用 2.1、快捷键操作 2.2、markdown语法 2.3、安装jupyter_contrib_nbextension库 A Neural Network Playground 1、库的安装 整个机器学习基础阶段会用到Matplotlib、Numpy、Pandas等等,为了统一版…

const和修饰指针的几种用法

昨天闲着没事去面试了一个C岗位,问了很多基础的东西都没答上来。主要原因是这些知识在硬件资源丰富的pc端用的不多,二来确实很久没温习之前的C相关的知识了。在面试官问了几次类似的问题没有答好的情况下(还喜欢问你确不确定)&…

opencv礼帽和黑帽运算

礼帽 原始输入 - 开运算结果,留存的以白色毛刺为主 黑帽 闭运算 - 原始输入,保留的更多是原始轮廓 # 导入OpenCV库,用于图像处理 import cv2 import numpy as np # 从matplotlib库中导入pyplot模块,用于绘制图像 from …

包子凑数(2017年蓝桥杯试题H)

【问题描述】 小明几乎每天早晨都会在一家包子铺吃早餐,他发现这家包子铺有N种蒸笼,其中第i种蒸笼恰好能放Ai(i为下标)个包子。每种蒸笼都有非常多个,可以认为是无限笼。 每当有顾客想买X个包子。卖包子的大叔就会迅速选出若干笼包子&#xf…

【C++】sophus : common.hpp 丰富的字符串格式化、日志记录和数学常量处理功能 (七)...

这段C代码实现了一个名为Sophus的库,提供了各种实用功能。主要内容包括: 宏定义和条件编译: 使用条件编译定义了一些宏,用于在不同编译器下处理函数名称等。定义了用于格式化字符串和打印日志的宏。定义了用于运行时断言的宏&…

Python爬虫获取商品销量详情

在这个数据驱动的时代,获取商品销量详情已经不再是简单的点击和浏览。我们需要的是速度、效率,还有一点点的...偷偷摸摸。没错,今天我们要聊的是如何使用Python爬虫来“偷窥”商品销量详情。别担心,我们保证一切都是合法合规的&am…

revit转gltf,revit转3dtiles,如何将Revit模型转为3DTiles格式并在Cesiumjs中高效可视化

Revit模型导出gltf、glb与3dtiles有多种方式,但一般的商业工具收费普遍较高:Cesiumlab导出3dTile格式数据,Cesiumlab暂时可试用3天,会员版收费每年800;BimAngleEngine导出3dTile格式数据BimAngleEngine暂时可试用30天&…