FlinkSQL实现实时同步和实时统计过程(MySQL TO MySQL)

news/2025/3/16 5:27:32/

实时同步

注意:MySQL 目标表的主键必须与 FlinkSQL 建表语句中声明的主键保持一致。

-- Real-time sync job: copies every row of MySQL table db_eugene_test.study_score
-- (read through the mysql-cdc connector) into db_eugene_test_01.study_score_cdc_sink
-- (written through the JDBC connector).

-- Job configuration. Option keys contain hyphens and must be written without
-- spaces around them, otherwise the key is not recognised.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;

-- CDC source table. The primary key must match the MySQL table's primary key.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    -- Optional settings, left disabled. Each one sits on its own line so the
    -- comment marker cannot swallow the options that follow it.
    -- 'server-time-zone' = 'UTC',
    -- 'scan.incremental.snapshot.enabled' = 'true',
    -- 'debezium.snapshot.mode' = 'initial',
    --     (alternatively use the key scan.startup.mode: initial = include
    --      historical data, latest-offset = skip historical data)
    -- 'debezium.datetime.format.date' = 'yyyy-MM-dd',
    -- 'debezium.datetime.format.time' = 'HH-mm-ss',
    -- 'debezium.datetime.format.datetime' = 'yyyy-MM-dd HH-mm-ss',
    -- 'debezium.datetime.format.timestamp' = 'yyyy-MM-dd HH-mm-ss',
    -- 'debezium.datetime.format.timestamp.zone' = 'UTC+8',
    'table-name' = 'study_score'
);

-- Recreate the sink table definition so that a stale definition from an earlier
-- run is not reused.
DROP TABLE IF EXISTS study_score_cdc_sink;

-- JDBC sink table. The primary key must match the MySQL target table's primary key.
CREATE TABLE IF NOT EXISTS study_score_cdc_sink (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
)
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'study_score_cdc_sink'
);

-- Continuously copy the source rows into the sink.
INSERT INTO study_score_cdc_sink
SELECT
    cid,
    sid,
    cls,
    score
FROM study_score_cdc;

实时统计(无历史记录)

mysql表:

-- MySQL target table for the per-class score totals; one row per cls.
CREATE TABLE `score_statistics` (
    `cls` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `score` int(11) DEFAULT NULL,
    PRIMARY KEY (`cls`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

注意:MySQL 目标表的主键必须与 FlinkSQL 建表语句中声明的主键保持一致。

-- Real-time statistics job (no history): keeps one row per cls in MySQL table
-- score_statistics holding the current SUM(score) of study_score.

-- Job configuration (same settings as the sync job, plus time settings).
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;
SET execution.time-characteristic = 'ProcessingTime';
-- NOTE(review): table.exec.timezone does not appear to be a documented Flink
-- option; the documented session time-zone option is table.local-time-zone.
-- Confirm against the Flink version / platform in use.
SET table.exec.timezone = 'Asia/Shanghai';

-- CDC source table (unchanged from the sync job).
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'study_score'
);

-- JDBC sink table. The primary key is the business key cls, matching the
-- primary key of the MySQL table score_statistics.
CREATE TABLE IF NOT EXISTS study_score_cdc_statistics_sink (
    cls STRING PRIMARY KEY NOT ENFORCED,
    score INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'score_statistics',
    -- Flush after every row; controls the write frequency.
    'sink.buffer-flush.max-rows' = '1'
);

-- Update logic.
INSERT INTO study_score_cdc_statistics_sink
WITH
-- Latest aggregate per class.
newa_records AS (
    SELECT
        cls,
        SUM(score) AS score
    FROM study_score_cdc
    GROUP BY cls
)
-- Emit the aggregate; the sink declares cls as its primary key, so each
-- change to a class total is written against that class's row.
SELECT
    cls,
    score
FROM newa_records;

拉链表

注意目标表的联合主键和FlinkSQL的保持一致

-- MySQL target table for the zipper (slowly changing) history of per-class
-- score totals. All time columns are stored as strings.
CREATE TABLE `dim_score_statistics` (
    `id` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `cls` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `score` int(11) DEFAULT NULL,
    `update_time` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
    `dw_start_time` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
    `dw_end_time` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `is_current` tinyint(1) NOT NULL,
    -- Pay special attention to the composite primary key below: it must be the
    -- same key that the Flink SQL sink table declares. The comment is kept on
    -- its own line so it cannot comment out the key definition.
    PRIMARY KEY (`cls`, `dw_end_time`, `is_current`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

实时统计(有历史记录,每次运行如果目标表有数据都会变成历史数据)

-- Real-time statistics job with history: writes the current SUM(score) per cls
-- as the "current" row and re-emits every row that was flagged current in the
-- target table as a closed (historical) row.

-- Job configuration (same settings as the previous jobs).
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;
SET execution.time-characteristic = 'ProcessingTime';
-- NOTE(review): table.exec.timezone does not appear to be a documented Flink
-- option; the documented session time-zone option is table.local-time-zone,
-- which would affect how CURRENT_TIMESTAMP is formatted below. Confirm against
-- the Flink version / platform in use.
SET table.exec.timezone = 'Asia/Shanghai';

-- CDC source table (unchanged).
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'study_score'
);

-- JDBC table over dim_score_statistics (columns ordered to follow the MySQL
-- primary key). It is used both as the sink and, in olda_records, as a source.
CREATE TABLE IF NOT EXISTS dim_score_statistics_sink (
    cls STRING,
    dw_end_time STRING,
    is_current BOOLEAN,
    id STRING,
    score INT,
    update_time STRING,
    dw_start_time STRING,
    -- The primary key must match the MySQL composite primary key.
    PRIMARY KEY (cls, dw_end_time, is_current) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'dim_score_statistics',
    -- Flush after every row; controls the write frequency.
    'sink.buffer-flush.max-rows' = '1'
);

-- Update logic.
INSERT INTO dim_score_statistics_sink
WITH
-- New records (currently valid): key is (cls, '9999-12-31 23:59:59', TRUE).
newa_records AS (
    SELECT
        cls,
        '9999-12-31 23:59:59' AS dw_end_time,
        TRUE AS is_current,
        UUID() AS id,
        SUM(score) AS score,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS update_time,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_start_time
    FROM study_score_cdc
    GROUP BY cls
),
-- All rows in the target table that are flagged as current.
olda_records AS (
    SELECT
        cls,
        dw_end_time,
        is_current,
        id,
        score,
        update_time,
        dw_start_time
    FROM dim_score_statistics_sink
    WHERE is_current = TRUE
),
-- Closed copies of the old rows: key becomes (cls, <current time>, FALSE).
closed_olda_records AS (
    SELECT
        olda.cls AS cls,
        -- The expiry time is set to the current time.
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_end_time,
        FALSE AS is_current,
        olda.id AS id,
        olda.score AS score,
        olda.update_time AS update_time,
        olda.dw_start_time AS dw_start_time
    FROM olda_records AS olda
)
-- Merge the new records with the closed old records.
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM newa_records
UNION ALL
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM closed_olda_records;

实时统计(有历史记录,只新增score有变动的)

-- Real-time statistics job with history, changed scores only: a new current row
-- (and a closed copy of the old row) is written only for classes whose
-- SUM(score) differs from the current row in the target table, or that have
-- no current row yet.

-- Job configuration (same settings as the previous jobs).
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;
SET execution.time-characteristic = 'ProcessingTime';
-- NOTE(review): table.exec.timezone does not appear to be a documented Flink
-- option; the documented session time-zone option is table.local-time-zone,
-- which would affect how CURRENT_TIMESTAMP is formatted below. Confirm against
-- the Flink version / platform in use.
SET table.exec.timezone = 'Asia/Shanghai';

-- CDC source table (unchanged).
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'study_score'
);

-- JDBC table over dim_score_statistics; primary key is
-- (cls, dw_end_time, is_current). Used as the sink and, in olda_current, as a source.
CREATE TABLE IF NOT EXISTS dim_score_statistics_sink (
    cls STRING,
    dw_end_time STRING,
    is_current BOOLEAN,
    id STRING,
    score INT,
    update_time STRING,
    dw_start_time STRING,
    PRIMARY KEY (cls, dw_end_time, is_current) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'dim_score_statistics',
    'sink.buffer-flush.max-rows' = '1'
);

-- Update logic.
INSERT INTO dim_score_statistics_sink
WITH
-- Newly computed score per class.
newa_score AS (
    SELECT
        cls,
        SUM(score) AS new_score
    FROM study_score_cdc
    GROUP BY cls
),
-- Rows flagged as current in the target table (all columns).
olda_current AS (
    SELECT
        cls,
        score AS old_score,
        dw_start_time,
        dw_end_time,
        is_current,
        id,
        update_time
    FROM dim_score_statistics_sink
    WHERE is_current = TRUE
),
-- Classes that need a new row: the score changed, or the class has no
-- current row yet (olda.cls IS NULL keeps the unmatched LEFT JOIN rows).
changed_cls AS (
    SELECT
        newa.cls,
        newa.new_score,
        olda.id AS old_id,
        olda.old_score,
        olda.dw_start_time AS old_dw_start_time,
        olda.dw_end_time AS old_dw_end_time,
        olda.update_time AS old_update_time
    FROM newa_score AS newa
    LEFT JOIN olda_current AS olda
        ON newa.cls = olda.cls
    WHERE newa.new_score <> olda.old_score
        OR olda.cls IS NULL
),
-- New current rows (only for classes that changed).
newa_records AS (
    SELECT
        cls,
        '9999-12-31 23:59:59' AS dw_end_time,
        TRUE AS is_current,
        UUID() AS id,
        new_score AS score,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS update_time,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_start_time
    FROM changed_cls
),
-- Closed copies of the old rows (only when the score changed).
closed_olda_records AS (
    SELECT
        olda.cls AS cls,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_end_time,
        FALSE AS is_current,
        olda.id AS id,
        olda.old_score AS score,
        olda.update_time AS update_time,
        -- dw_start_time is taken directly from olda_current.
        olda.dw_start_time AS dw_start_time
    FROM changed_cls AS changed
    INNER JOIN olda_current AS olda
        ON changed.cls = olda.cls
    WHERE changed.new_score <> olda.old_score
)
-- Merge the new records with the closed old records.
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM newa_records
UNION ALL
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM closed_olda_records;

http://www.ppmy.cn/news/1579503.html

相关文章

RocketMQ面试题:进阶部分

🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编…

Docker+Flask 实战:打造高并发微服务架构

Docker+Flask 实战:打造高并发微服务架构 今天我们要深入探讨一个非常热门且实用的主题:基于 Docker 部署 Python Flask 应用。Docker 作为当下最流行的容器化技术,已经广泛应用于各种开发和部署场景,尤其是在微服务架构中。而 Fl…

深入理解 HTML 链接:网页导航的核心元素

在网页开发的广袤领域中,HTML 链接无疑扮演着举足轻重的角色,它是实现网页之间无缝跳转、构建互联网络世界的核心部分。无论是引导用户在不同页面间穿梭,还是关联各类资源,HTML 链接都发挥着关键作用。 一、HTML 链接基础认知 HT…

Pycharm中脚本执行的3种模式——unittest框架、pytest框架及普通模式

一. Python 运行脚本的三种模式 a. unittest 框架 b. pytest 框架 c. 普通模式 二、PyCharm 默认使用 pytest 框架执行 unittest 框架的测试用例 三、如何修改Pycharm的脚本运行的模式? 方法1. 修改 PyCharm 默认的测试框架 方法2. 设置运行脚本时的默认框架 四、mai…

【每日学点HarmonyOS Next知识】拖拽调整列表顺序、tab回弹、自定义弹窗this、状态变量修饰枚举

1、HarmonyOS 功能实现(拖拽调整列表顺序)? 可参考: import curves from ohos.curves; import Curves from ohos.curvesEntry Component struct ListItemExample {State private arr: number[] [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]…

【openGauss】物理备份恢复

文章目录 1. gs_backup(1)备份(2)恢复(3)手动恢复的办法 2. gs_basebackup(1)备份(2)恢复① 伪造数据目录丢失② 恢复 3. gs_probackup(1…

MySQL隐式依赖引发的字段长度溢出:一次触发器事故的深度剖析

MySQL隐式依赖引发的字段长度溢出:一次触发器事故的深度剖析 场景还原:诡异的字段不存在报错 某日接到生产环境报警,发现核心业务表order_main(A表)的插入操作频繁报错,错误提示却显示ERROR 1406 (22001)…

极客天成 NVFile 并行文件存储:端到端无缓存新范式,为 AI 训练按下“快进键”

在人工智能的世界里,AI 训练就像一场“数据马拉松”。模型需要从海量数据中学习规律,而这些数据的读取速度往往决定了训练的效率。今天,我们就来聊聊一个有趣的话题:极客天成的 NVFile 并行文件存储,以及它的端到端无缓…