MySQL到Doris的StreamingETL实现(Flink CDC 3.0)
1 环境准备
1)安装FlinkCDC
[root@hadoop1 software]$ tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/
2)拖入MySQL以及Doris依赖包
将flink-cdc-pipeline-connector-doris-3.0.0.jar以及flink-cdc-pipeline-connector-mysql-3.0.0.jar防止在FlinkCDC的lib目录下
2 同步变更
2)编写MySQL到Doris的同步变更配置文件
在FlinkCDC目录下创建job文件夹,并在该目录下创建同步变更配置文件
[root@hadoop1 flink-cdc-3.0.0]$ mkdir job/[root@hadoop1 flink-cdc-3.0.0]$ cd job[root@hadoop1 job]$ vim mysql-to-doris.yamlsource:type: mysqlhostname: hadoop3port: 3306username: rootpassword: "123456"tables: test.\.server-id: 5400-5404server-time-zone: UTC+8sink:type: dorisfenodes: hadoop1:7030username: rootpassword: "123456"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 1
3)启动任务并测试
(1)开启Flink集群,注意:在Flink配置信息中打开CheckPoint
[root@hadoop3 flink-1.18.0]$ vim conf/flink-conf.yaml
添加如下配置信息
execution.checkpointing.interval: 5000
分发该文件至其他Flink机器并启动Flink集群
[root@hadoop3 flink-1.18.0]$ bin/start-cluster.sh
(2)开启Doris FE
[root@hadoop1 fe]$ bin/start_fe.sh
(3)开启Doris BE
[root@hadoop1 be]$ bin/start_be.sh
(4)在Doris中创建test数据库
[root@hadoop3 doris]$ mysql -uroot -p123456 -P9030 -hhadoop1mysql: [Warning] Using a password on the command line interface can be insecure.Welcome to the MySQL monitor. Commands end with ; or g.Your MySQL connection id is 0Server version: 5.7.99 Doris version doris-1.2.4-1-UnknownCopyright (c) 2000, 2022, Oracle and/or its affiliates.Oracle is a registered trademark of Oracle Corporation and/or itsaffiliates. Other names may be trademarks of their respectiveowners.Type 'help;' or 'h' for help. Type 'c' to clear the current input statement.mysql> create database test;Query OK, 1 row affected (0.00 sec)mysql> create database doris_test_route;Query OK, 1 row affected (0.00 sec)
(5)启动FlinkCDC同步变更任务
[root@hadoop3 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml
(6)刷新Doris中test数据库观察结果
(7)在MySQL的test数据中对应的几张表进行新增、修改数据以及新增列操作,并刷新Doris中test数据库观察结果
3 路由变更
1)编写MySQL到Doris的路由变更配置文件
source:type: mysqlhostname: hadoop3port: 3306username: rootpassword: "123456"tables: test_route.\.server-id: 5400-5404server-time-zone: UTC+8sink:type: dorisfenodes: hadoop1:7030benodes: hadoop1:7040username: rootpassword: "123456"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: test_route.t1sink-table: doris_test_route.doris_t1- source-table: test_route.t2sink-table: doris_test_route.doris_t1- source-table: test_route.t3sink-table: doris_test_route.doris_t3pipeline:name: Sync MySQL Database to Dorisparallelism: 1
2)启动任务并测试
[root@hadoop3 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris-route.yaml
3)刷新Doris中test_route数据库观察结果
4)在MySQL的test_route数据中对应的几张表进行新增、修改数据操作,并刷新Doris中doris_test_route数据库观察结果