flink系列之:使用flink cdc3从mysql数据库同步数据到doris和starrocks

ops/2025/2/26 19:01:08/

flink系列之:使用flink cdc3从mysql数据库同步数据到dorisstarrocks

  • 一、下载部署flink
  • 二、下载部署flink cdc3
  • 三、下载mysql-connector-java到flink和flink cdc的lib目录
  • 四、flink设置checkpoint支持增量同步数据
  • 五、mysql到doris和starrocks的yaml配置文件
  • 六、启动flink和flink cdc
  • 七、查看flink cdc任务同步日志
  • 八、查看mysql表和starrocks
  • 九、flink cdc技术生产环境应用

一、下载部署flink

  • 下载flink

解压flink

tar -zxvf flink-1.19.1-bin-scala_2.12.tgz

修改flink配置文件config.yaml

taskmanager:bind-host: localhosthost: localhostnumberOfTaskSlots: 6memory:process:size: 1728mparallelism:default: 1
rest:address: 10.66.77.104# network interface, such as 0.0.0.0.bind-address: 10.66.77.104# port: 8081# # Port range for the REST and web server to bind to.# bind-port: 8080-8090

设置flink 环境变零

cd /etc/profile.d
cat flink.sh #export HADOOP_CLASSPATH=`hadoop classpath`
FLINK_HOME=/data/src/flink/flink-1.19.1
PATH=$PATH:$FLINK_HOME/bin:$FLINK_HOME/sbinexport PATH
export FLINK_HOME

启动flink

./start-cluster.sh

查看jps

jps
760234 StandaloneSessionClusterEntrypoint
390132 Jps
760880 TaskManagerRunner

查看flink web ui,{ip}:{port}
在这里插入图片描述

二、下载部署flink cdc3

  • https://github.com/apache/flink-cdc/releases
    在这里插入图片描述
    解压flink-cdc3
tar -zxvf flink-cdc-3.3.0-bin.tar.gz

下载Pipeline Connectors Jars和Source Connector Jars到lib目录

/data/src/flink/flink-cdc-3.3.0/lib   ls
flink-cdc-dist-3.3.0.jar                              flink-cdc-pipeline-connector-maxcompute-3.3.0.jar  flink-sql-connector-tidb-cdc-3.3.0.jar
flink-cdc-pipeline-connector-doris-3.3.0.jar          flink-cdc-pipeline-connector-mysql-3.3.0.jar       mysql-connector-java-8.0.28.jar
flink-cdc-pipeline-connector-elasticsearch-3.3.0.jar  flink-cdc-pipeline-connector-paimon-3.3.0.jar
flink-cdc-pipeline-connector-kafka-3.3.0.jar          flink-cdc-pipeline-connector-starrocks-3.3.0.jar

三、下载mysql-connector-java到flink和flink cdc的lib目录

https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28

在这里插入图片描述

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

四、flink设置checkpoint支持增量同步数据

  • execution.checkpointing.interval: 3000

参数说明

  • execution.checkpointing.interval: 这个参数用于指定 Flink 作业执行检查点的频率。检查点是 Flink 用于实现容错机制的一种机制,通过定期保存作业的状态,可以在发生故障时恢复到最近的一个检查点。
  • 3000: 这个值表示检查点的间隔时间,单位是毫秒(ms)。因此,3000 毫秒等于 3 秒。

starrocksyaml_100">五、mysql到doris和starrocks的yaml配置文件

放到任意目录下

mysql-to-doris.yaml

   source:type: mysqlhostname: ipport: 3306username: *********password: ************tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfoserver-id: 5400-5404server-time-zone: Asia/Shanghaisink:type: dorisfenodes: ip:8030username: ***********password: *************route:- source-table: data_entry_test.debeziumOfflineClusterInfosink-table: optics.debeziumOfflineClusterInfo- source-table: data_entry_test.debeziumRealtimeClusterInfosink-table: optics.debeziumRealtimeClusterInfopipeline:name: Sync MySQL Database to Dorisparallelism: 2

mysql-to-starrocks.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: ipport: 3306username: *********password: **********tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfoserver-id: 5400-5404server-time-zone: Asia/Shanghaisink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://ip:9030load-url: ip:8030username: ****************password: ****************
route:- source-table: data_entry_test.debeziumOfflineClusterInfosink-table: dd_test_starrocks.debeziumOfflineClusterInfo- source-table: data_entry_test.debeziumRealtimeClusterInfosink-table: dd_test_starrocks.debeziumRealtimeClusterInfo
pipeline:name: MySQL to StarRocks Pipelineparallelism: 6

六、启动flink和flink cdc

启动flink

./start-cluster.sh

启动flink cdc

/data/src/flink/flink-cdc-3.3.0/bin/flink-cdc.sh
/data/src/flink/flink-cdc-3.3.0/conf/mysql-to-starrocks.yaml

flink web ui查看任务
在这里插入图片描述

七、查看flink cdc任务同步日志

2025-02-18 13:48:49,973 INFO  com.starrocks.connector.flink.catalog.StarRocksCatalog       [] - Success to create table dd_test_starrocks.dd_test_starrocks, sql: CREATE TABLE IF NOT EXISTS dd_test_starrocks.debeziumOfflineClusterInfo (
id VARCHAR(21) NOT NULL,
servername VARCHAR(6168) NOT NULL,
connectorname VARCHAR(6168) NOT NULL,
databasename VARCHAR(6168) NOT NULL,
url VARCHAR(6168) NOT NULL,
topicname VARCHAR(6168) NOT NULL,
clustername VARCHAR(6168) NOT NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
2025-02-18 14:04:25,298 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (1/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:04:25,333 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (2/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_1_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:09:35,729 INFO  com.starrocks.data.load.stream.DefaultStreamLoader           [] - Stream load completed, label : flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97, database : dd_test_starrocks, table : debeziumOfflineClusterInfo, body : {"Status": "OK","Message": "","Label": "flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97","TxnId": 108875857,"LoadBytes": 133959,"StreamLoadPlanTimeMs": 0,"ReceivedDataTimeMs": 0
}

starrocks_209">八、查看mysql表和starrocks

mysql表

-- data_entry_test.debeziumOfflineClusterInfo definitionCREATE TABLE `debeziumOfflineClusterInfo` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',`servername` varchar(2056) NOT NULL COMMENT 'connector标识名',`connectorname` varchar(2056) NOT NULL COMMENT 'connector名称',`databasename` varchar(2056) NOT NULL COMMENT '数据库名',`url` varchar(2056) NOT NULL COMMENT '数据库名',`topicname` varchar(2056) NOT NULL COMMENT 'topic名称',`clustername` varchar(2056) NOT NULL COMMENT '集群名称',`database_server_id` varchar(256) NOT NULL COMMENT '集群名称',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=765 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

starrocks

-- dd_test_starrocks.debeziumOfflineClusterInfo definitionCREATE TABLE `debeziumOfflineClusterInfo` (`id` varchar(21) NOT NULL COMMENT "",`servername` varchar(6168) NOT NULL COMMENT "",`connectorname` varchar(6168) NOT NULL COMMENT "",`databasename` varchar(6168) NOT NULL COMMENT "",`url` varchar(6168) NOT NULL COMMENT "",`topicname` varchar(6168) NOT NULL COMMENT "",`clustername` varchar(6168) NOT NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
);

如上所示,成功在starrocks表中创建了表,并完成了历史数据和增量数据的同步

九、flink cdc技术生产环境应用

  • 阿里云基于 Flink CDC 的现代数据栈云上实践

细粒度变更策略控制:

  • 支持新增表、新增列、修改列名、修改列定义、删除列、删除表和清空表等操作
    在这里插入图片描述

当上游数据库新增表时,CDC YAML 能够自动识别并同步这些表的数据,而无需重新配置作业。此功能分为两种情况:

  • 历史数据同步:通过开启 scan.newly-added-table.enabled 选项,并通过 savepoint 重启作业来读取新增表的历史数据。
  • 增量数据同步:只需开启 scan.binlog.newly-added-table.enabled 选项,自动同步新增表的增量数据。

在这里插入图片描述


http://www.ppmy.cn/ops/161474.html

相关文章

眼见不一定为实之MySQL中的不可见字符

目录 前言 一、问题的由来 1、需求背景 2、数据表结构 二、定位问题 1、初步的问题 2、编码是否有问题 3、依然回到字符本身 三、深入字符本身 1、回归本质 2、数据库解决之道 3、代码层解决 四、总结 前言 在开始今天的博客内容之前,正在看博客的您先…

HarmonyOS Design 介绍

HarmonyOS Design 介绍 文章目录 HarmonyOS Design 介绍一、HarmonyOS Design 是什么?1. 设计系统(Design System)2. UI 框架的支持3. 设计工具和资源4. 开发指南5. 与其他设计系统的对比总结 二、HarmonyOS Design 特点 | 应用场景1. Harmon…

改进的Siddon算法与原算法的区别及具体改进

1. 算法原理 原Siddon算法: 基本原理:Siddon算法是一种射线驱动模型(RDM),用于计算射线通过像素或体素空间的精确路径。它通过计算射线与每个像素或体素的交点,来确定射线在每个像素或体素内的长度&#xf…

ElasticSearch公共方法封装

业务场景 1、RestClientBuilder初始化(同时支持单机与集群) 2、发送ES查询请求公共方法封装(支持sql、kql、代理访问、集群访问、鉴权支持) 3、判断ES索引是否存在(/_cat/indices/${indexName}) 4、判断ES…

前端+nodejs+mysql实现前后端联通

创建项目 目录结构 创建项目 npm init 初始化项目 一路回车即可 当有 package.json这个文件的时候就相当于已经构建完毕 3. 配置package.json文件 {"name": "yes","version": "1.0.0","description": "",&quo…

数据结构(陈越,何钦铭) 第四讲 树(中)

4.1 二叉搜索树 4.1.1 二叉搜索树及查找 Position Find(ElementTyoe X,BinTree BST){if(!BST){return NULL;}if(X>BST->Data){return Find(X,BST->Right)}else if(X<BST->Data){return Find(X,BST->Left)}else{return BST;} } Position IterFind(ElementTyp…

5G网络切片辨析(eMBB,mMTC,uRLLC)

URLLC有三大应用场景&#xff0c;分别是eMBB&#xff08;增强型移动宽带&#xff09;、uRLLC&#xff08;高可靠低延时通信&#xff09;和mMTC&#xff08;海量机器通信&#xff09;。 增强型移动宽带&#xff08;eMBB&#xff09;&#xff1a;需要关注峰值速率&#xff0c;容…

Docker 搭建 Nginx 服务器

系列文章目录 Docker 搭建 Nginx 服务器 系列文章目录前言一、准备工作二、设置 Nginx 容器的目录结构三、启动一个临时的 Nginx 容器来复制配置文件四、复制 Nginx 配置文件到本地目录五、删除临时 Nginx 容器六、创建并运行 Nginx 容器&#xff0c;挂载本地目录七、修改 ngin…