Flink CDC入门案例

news/2024/11/20 7:28:32/

由于Flink CDC是基于日志的方式,因此需要开启MySQL的binlog日志。

开启binlog日志的配置如下

#1.编辑MySQL的配置文件
vim /etc/my.cnf

#添加如下内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

#重启MySQL服务
systemctl restart mysqld

MySQL开启binlog后,使用下列查询语句来验证MySQL的binlog是否开启成功:

show variables like '%log_bin%';

数据准备:在MySQL中创建库、表、插入数据

--创建数据库
Create database test character set utf8;--切换数据库
Use test;-- 建表
-- 学生表
CREATE TABLE `Student`(`s_id` VARCHAR(20),`s_name` VARCHAR(20) NOT NULL DEFAULT '',`s_birth` VARCHAR(20) NOT NULL DEFAULT '',`s_sex` VARCHAR(10) NOT NULL DEFAULT '',PRIMARY KEY(`s_id`)
);
-- 成绩表
CREATE TABLE `Score`(`s_id` VARCHAR(20),`c_id` VARCHAR(20),`s_score` INT(3),PRIMARY KEY(`s_id`,`c_id`)
);-- 插入学生表测试数据
insert into Student values('01' , '赵雷' , '1990-01-01' , '男');
insert into Student values('02' , '钱电' , '1990-12-21' , '男');
insert into Student values('03' , '孙风' , '1990-05-20' , '男');
insert into Student values('04' , '李云' , '1990-08-06' , '男');
insert into Student values('05' , '周梅' , '1991-12-01' , '女');
insert into Student values('06' , '吴兰' , '1992-03-01' , '女');
insert into Student values('07' , '郑竹' , '1989-07-01' , '女');
insert into Student values('08' , '王菊' , '1990-01-20' , '女');
-- 成绩表测试数据
insert into Score values('01' , '01' , 80);
insert into Score values('01' , '02' , 90);
insert into Score values('01' , '03' , 99);
insert into Score values('02' , '01' , 70);
insert into Score values('02' , '02' , 60);
insert into Score values('02' , '03' , 80);
insert into Score values('03' , '01' , 80);
insert into Score values('03' , '02' , 80);
insert into Score values('03' , '03' , 80);
insert into Score values('04' , '01' , 50);
insert into Score values('04' , '02' , 30);
insert into Score values('04' , '03' , 20);
insert into Score values('05' , '01' , 76);
insert into Score values('05' , '02' , 87);
insert into Score values('06' , '01' , 31);
insert into Score values('06' , '03' , 34);
insert into Score values('07' , '02' , 89);
insert into Score values('07' , '03' , 98);

启动集群

#0.准备工作
cd $FLINK_HOME/lib,查看是否有如下3个jar包,没有的话需要拷贝进来
commons-cli-1.4.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
flink-sql-parquet_2.12-1.14.5.jar
有的话,表示Flink CDC已经集成了。接下来可以正常登录FlinkSQL客户端。#1.启动HDFS
start-dfs.sh#2.启动Flink集群
start-cluster.sh#3.进入SQL-Client
sql-client.sh

Flink SQL-Client操作

在FlinkSQL中创建映射表

--在FlinkSQL中创建MySQL中Student表的映射表
CREATE TABLE if not exists mysql_cdc_to_test_Student (s_id     STRING,s_name   STRING,s_birth  STRING,s_sex    STRING,PRIMARY KEY (`s_id`) NOT ENFORCED
) WITH ('connector'= 'mysql-cdc','hostname'= '192.168.88.161','port'= '3306','username'= 'root','password'='密码','server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'test','table-name'= 'Student'
);
--解释
'connector'='mysql-cdc',指定采用Flink CDC的方式来捕获数据变更'server-time-zone'= 'Asia/Shanghai',更换时区为上海时区
'debezium.snapshot.mode'='initial',指定消费模式为initial,表示从历史数据开始消费---查询语句
select * from mysql_cdc_to_test_Student;


http://www.ppmy.cn/news/40911.html

相关文章

dubbo2.7升级到dubbo3--dubbo2.7升级到dubbo3系列

最近在做老系统升级(springboot2dubbo2.7.1zookeepernacos-config),去掉zookeeper的注册中心,替换成nacos2.1版本(阿里云已经不支持1.X版本了)-对应的需要升级springboot和dubbo3。最终升级完成了,其中遇到的诸多问题&…

zabbix创建自定义监控模板

目录 第一章先行配置zabbix 第二章配置自定义 2.1.案列:自定义监控客户端服务器登录的人数需求:限制登录人数不超过 3 个,超过 3 个就发出报警信息 2.2.在 Web 页面创建自定义监控项模板 2.3.zabbix 自动发现与自动注册 总结 自定义监控…

Python与c语言的区别与联系

Python与c语言都是一种机器学习语言,进过长时间的学习和总结,我将Python与c语言的一些特点总结成以下几点,不全面还望多多指正。 1、因为C语言是编译型语言,python是解释型语言,所以python的执行速度没有C语言那么快。…

深度分析台积电的投资价值:伟大的公司,伟大的投资

来源:猛兽财经 作者:猛兽财经 公司介绍 台积电(TSM)是一家在1987年成立于台湾的半导体公司,并在全球范围内率先实施了“商业晶圆厂”代工模式。该公司为部分或全部外包生产的半导体生产商提供晶圆代工服务。台积电的产…

【SpringMVC】第一个springmvc项目

需求: 用户在页面发起一个请求, 请求交给springmvc的控制器对象,并显示请求的处理结果(在结果页面显示一个欢迎语句)。 实现步骤: 新建web maven工程 加入依赖 spring-webmvc依赖,间接把spri…

Python 进阶指南(编程轻松进阶):二、环境配置和命令行

原文:http://inventwithpython.com/beyond/chapter2.html 环境配置是配置你的计算机环境,以便你写代码的过程。这包括安装任何必要的工具,配置它们,以及处理安装过程中的任何问题。没有一键配置这种傻瓜式操作过程,因为…

Matlab进阶绘图第16期—三维填充折线图

三维填充折线图是在三维折线图的基础上,对其与XOY平面之间的部分进行颜色填充,从而能够更好地刻画细节变化。 由于Matlab中未收录三维填充折线图的绘制函数,因此需要大家自行设法解决。 本文使用自制的FilledPlot3小工具进行三维填充折线图…

Spring Boot 监控

目录 1.概述 2.使用 2.1.依赖 2.2.配置 2.2.1.默认 2.2.2.暴露端点 2.3.常用端点 2.3.1.health 2.3.2.metrics 2.3.3.loggers 2.3.4.beans 2.4.自定端点 1.概述 Spring Boot Actuator提供了对Spring Boot应用进行监控的能力,其提供了4个方面的监控能力…