实时同步
注意:MySQL 表的主键要和 FlinkSQL 中声明的主键保持一致。
-- Real-time sync job: copies every row of the MySQL table `study_score`
-- (read through the mysql-cdc connector) into the JDBC table
-- `study_score_cdc_sink`.

-- Job configuration. Option keys are written without spaces around '-';
-- a key such as "checkpoints - after - tasks - finish" is not a valid option name.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;

-- CDC source table.
-- NOTE(review): credentials are stored in plain text in this script; consider
-- moving them out of version-controlled SQL.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    -- Optional settings, currently disabled (each on its own line so the
    -- comment marker cannot swallow the options that follow):
    -- 'server-time-zone' = 'UTC',
    -- 'scan.incremental.snapshot.enabled' = 'true',
    -- 'debezium.snapshot.mode' = 'initial',
    --     (alternatively use the key scan.startup.mode: 'initial' loads the
    --      historical data, 'latest-offset' skips it)
    -- 'debezium.datetime.format.date' = 'yyyy-MM-dd',
    -- 'debezium.datetime.format.time' = 'HH-mm-ss',
    -- 'debezium.datetime.format.datetime' = 'yyyy-MM-dd HH-mm-ss',
    -- 'debezium.datetime.format.timestamp' = 'yyyy-MM-dd HH-mm-ss',
    -- 'debezium.datetime.format.timestamp.zone' = 'UTC+8',
    'table-name' = 'study_score'
);

-- Recreate the sink table definition from scratch.
DROP TABLE IF EXISTS study_score_cdc_sink;

-- JDBC sink table; its primary key matches the source table's primary key.
CREATE TABLE IF NOT EXISTS study_score_cdc_sink (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
)
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'study_score_cdc_sink'
);

-- Continuously copy all source columns into the sink.
INSERT INTO study_score_cdc_sink
SELECT
    cid,
    sid,
    cls,
    score
FROM study_score_cdc;
实时统计(无历史记录)
MySQL 表:
-- MySQL target table for the per-class score totals.
-- One row per `cls`; `cls` is the primary key.
CREATE TABLE `score_statistics` (
    `cls` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `score` int(11) DEFAULT NULL,
    PRIMARY KEY (`cls`) USING BTREE
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4
  COLLATE = utf8mb4_unicode_ci;
注意:MySQL 表的主键要和 FlinkSQL 中声明的主键保持一致。
-- Real-time statistics job without history: keeps one row per class in the
-- MySQL table `score_statistics`, holding the running SUM(score) for that class.

-- Job configuration (same settings as the sync job, plus time settings).
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;
-- NOTE(review): execution.time-characteristic does not appear to be a current
-- Flink option key; kept as-is, confirm whether it is still needed.
SET execution.time-characteristic = 'ProcessingTime';
-- table.local-time-zone is the Flink option for the session time zone
-- (table.exec.timezone is not a recognized option).
SET table.local-time-zone = 'Asia/Shanghai';

-- CDC source table.
-- NOTE(review): credentials are stored in plain text in this script.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'study_score'
);

-- JDBC sink table mapped onto the MySQL table `score_statistics`.
CREATE TABLE IF NOT EXISTS study_score_cdc_statistics_sink (
    -- Primary key is the business key cls, the same as in the MySQL table.
    cls STRING PRIMARY KEY NOT ENFORCED,
    score INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'score_statistics',
    -- Controls the write frequency: flush once 1 row is buffered.
    'sink.buffer-flush.max-rows' = '1'
);

-- Write the aggregate to the sink.
INSERT INTO study_score_cdc_statistics_sink
WITH
-- Latest aggregate per class.
newa_records AS (
    SELECT
        cls,
        SUM(score) AS score
    FROM study_score_cdc
    GROUP BY cls
)
-- Emit the aggregated rows; the sink declares cls as its primary key, so a
-- changed total replaces the existing row for that class.
SELECT
    cls,
    score
FROM newa_records;
拉链表
注意:目标表的联合主键要和 FlinkSQL 中声明的主键保持一致。
-- MySQL target table for the slowly-changing (zipper) history of per-class
-- score totals.
CREATE TABLE `dim_score_statistics` (
    `id` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `cls` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `score` int(11) DEFAULT NULL,
    `update_time` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
    `dw_start_time` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
    `dw_end_time` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
    `is_current` tinyint(1) NOT NULL,
    -- Pay special attention to the following: the composite primary key must be
    -- the same as the primary key declared on the Flink SQL sink table.
    PRIMARY KEY (`cls`, `dw_end_time`, `is_current`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
实时统计(保留历史记录:每次运行时,目标表中已有的当前数据都会变成历史数据)
-- Real-time statistics job with history: writes the running SUM(score) per
-- class as the current row and, for every row that is current in the target
-- table when the job reads it, writes a closed (historical) copy.

-- Job configuration.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;
-- NOTE(review): execution.time-characteristic does not appear to be a current
-- Flink option key; kept as-is, confirm whether it is still needed.
SET execution.time-characteristic = 'ProcessingTime';
-- table.local-time-zone is the Flink option for the session time zone used when
-- CURRENT_TIMESTAMP is formatted below (table.exec.timezone is not a
-- recognized option).
SET table.local-time-zone = 'Asia/Shanghai';

-- CDC source table.
-- NOTE(review): credentials are stored in plain text in this script.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'study_score'
);

-- Sink table; the primary-key columns are listed first.
CREATE TABLE IF NOT EXISTS dim_score_statistics_sink (
    cls STRING,
    dw_end_time STRING,
    is_current BOOLEAN,
    id STRING,
    score INT,
    update_time STRING,
    dw_start_time STRING,
    -- The primary key must be the same as the composite primary key in MySQL.
    PRIMARY KEY (cls, dw_end_time, is_current) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'dim_score_statistics',
    -- Controls the write frequency: flush once 1 row is buffered.
    'sink.buffer-flush.max-rows' = '1'
);

-- Update logic.
INSERT INTO dim_score_statistics_sink
WITH
-- New records (currently valid), keyed (cls, '9999-12-31 23:59:59', TRUE).
newa_records AS (
    SELECT
        cls,
        '9999-12-31 23:59:59' AS dw_end_time,
        TRUE AS is_current,
        UUID() AS id,
        SUM(score) AS score,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS update_time,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_start_time
    FROM study_score_cdc
    GROUP BY cls
),
-- All old records that are currently valid in the target table.
olda_records AS (
    SELECT
        cls,
        dw_end_time,
        is_current,
        id,
        score,
        update_time,
        dw_start_time
    FROM dim_score_statistics_sink
    WHERE is_current = TRUE
),
-- Closed copies of the old records; their key becomes (cls, current time, FALSE).
closed_olda_records AS (
    SELECT
        olda.cls AS cls,
        -- The expiry time is set to the current time.
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_end_time,
        FALSE AS is_current,
        olda.id AS id,
        olda.score AS score,
        olda.update_time AS update_time,
        olda.dw_start_time AS dw_start_time
    FROM olda_records AS olda
)
-- Combine the new records with the closed old records.
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM newa_records
UNION ALL
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM closed_olda_records;
实时统计(保留历史记录:仅当 score 发生变动时才新增记录)
-- Real-time statistics job with history, change-driven: a new current row is
-- written only for classes whose SUM(score) differs from the current row in the
-- target table (or that have no current row); the superseded row is written
-- again as a closed (historical) record.

-- Job configuration.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = true;
SET pipeline.operator-chaining = false;
SET state.backend.type = rocksdb;
SET execution.checkpointing.interval = 8000;
SET state.checkpoints.num-retained = 10;
SET cluster.evenly-spread-out-slots = true;
-- NOTE(review): execution.time-characteristic does not appear to be a current
-- Flink option key; kept as-is, confirm whether it is still needed.
SET execution.time-characteristic = 'ProcessingTime';
-- table.local-time-zone is the Flink option for the session time zone used when
-- CURRENT_TIMESTAMP is formatted below (table.exec.timezone is not a
-- recognized option).
SET table.local-time-zone = 'Asia/Shanghai';

-- CDC source table.
-- NOTE(review): credentials are stored in plain text in this script.
CREATE TABLE IF NOT EXISTS study_score_cdc (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.20.23',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ccphl123!',
    'database-name' = 'db_eugene_test',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'study_score'
);

-- Sink table; primary key is (cls, dw_end_time, is_current).
CREATE TABLE IF NOT EXISTS dim_score_statistics_sink (
    cls STRING,
    dw_end_time STRING,
    is_current BOOLEAN,
    id STRING,
    score INT,
    update_time STRING,
    dw_start_time STRING,
    PRIMARY KEY (cls, dw_end_time, is_current) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.21.201:3308/db_eugene_test_01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'username' = 'root',
    'password' = 'redrcd@123',
    'table-name' = 'dim_score_statistics',
    'sink.buffer-flush.max-rows' = '1'
);

-- Update logic.
INSERT INTO dim_score_statistics_sink
WITH
-- Newly computed score per class.
newa_score AS (
    SELECT
        cls,
        SUM(score) AS new_score
    FROM study_score_cdc
    GROUP BY cls
),
-- Currently valid old records, with all of their columns.
olda_current AS (
    SELECT
        cls,
        score AS old_score,
        dw_start_time,
        dw_end_time,
        is_current,
        id,
        update_time
    FROM dim_score_statistics_sink
    WHERE is_current = TRUE
),
-- Classes that need a new row: the score changed, or the class is new.
-- IS DISTINCT FROM is used instead of <> so that a change between NULL and a
-- non-NULL score is also detected (<> evaluates to UNKNOWN when either side
-- is NULL and the row would be filtered out).
changed_cls AS (
    SELECT
        newa.cls,
        newa.new_score,
        olda.id AS old_id,
        olda.old_score,
        olda.dw_start_time AS old_dw_start_time,
        olda.dw_end_time AS old_dw_end_time,
        olda.update_time AS old_update_time
    FROM newa_score AS newa
    LEFT JOIN olda_current AS olda
        ON newa.cls = olda.cls
    WHERE newa.new_score IS DISTINCT FROM olda.old_score
       OR olda.cls IS NULL
),
-- New current records (only for classes that need a change).
newa_records AS (
    SELECT
        cls,
        '9999-12-31 23:59:59' AS dw_end_time,
        TRUE AS is_current,
        UUID() AS id,
        new_score AS score,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS update_time,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_start_time
    FROM changed_cls
),
-- Closed old records (only when the score changed).
closed_olda_records AS (
    SELECT
        olda.cls AS cls,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS dw_end_time,
        FALSE AS is_current,
        olda.id AS id,
        olda.old_score AS score,
        olda.update_time AS update_time,
        -- dw_start_time is taken directly from olda_current.
        olda.dw_start_time AS dw_start_time
    FROM changed_cls AS changed
    INNER JOIN olda_current AS olda
        ON changed.cls = olda.cls
    WHERE changed.new_score IS DISTINCT FROM olda.old_score
)
-- Combine the new records with the closed old records.
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM newa_records
UNION ALL
SELECT
    cls,
    dw_end_time,
    is_current,
    id,
    score,
    update_time,
    dw_start_time
FROM closed_olda_records;