Flink实时数仓同步:拉链表实战详解

news/2024/12/23 5:32:53/

一、背景

在大数据领域,业务数据通常最初存储在关系型数据库,例如MySQL。然而,为了满足日常分析和报表等需求,大数据平台会采用多种不同的存储方式来容纳这些业务数据。这些存储方式包括离线仓库、实时仓库等,根据不同的业务需求和数据特性进行选择。

举例来说,假设业务部门需要在大数据平台中查看历史某一天的表数据,如下:

  1. [Mysql] 业务数据 - 用户表全量数据:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql] 2023-06-02 业务数据新增了一名用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更新/新增数据

  1. [大数据平台] 2023-06-02日业务人员在大数据平台中查看用户表实时数据,期望数据和Mysql业务数据一致,如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. [大数据平台] 2023-06-03 日业务人员在大数据平台中查看2023-06-02日用户表的历史数据,期望数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00

根据以上需求,业务人员希望既能够查看当天的实时数据,又希望查看以天为粒度的历史数据。这类需求比较常见,通常可以采用两种解决方式:

  1. Lambda架构
  2. 实时同步 + 拉链表架构

二、Lambda架构

实时领域的Lambda架构是一种大数据架构模式,旨在处理实时数据流和历史数据批处理,以满足同时满足实时查询和历史数据分析的需求。Lambda架构的核心思想是将数据分成两个独立的流程:实时流程和批处理流程,并在最终层将它们合并,以提供一致的查询结果,如下:

在这里插入图片描述

  1. 实时流程(Real-time Layer):实时流程负责处理实时产生的数据流。它通常包括以下关键组件:

    • 数据源:实时数据源,如binlog日志等。
    • 实时引擎:用于实时数据的处理和转换,例如Apache Kafka、Apache Flink等。
    • 存储层:用于存储实时数据,特点是插入快,支持OLAP查询。
  2. 离线处理流程(Batch Layer):离线处理流程用于处理历史数据,通常以 T+1 凌晨跑批方式运行,主要包括以下组件:

    • 数据仓库:批处理数据存储,通常使用分布式数据仓库,如Apache Hadoop HDFS、Apache Hive等。
    • 批处理作业:用于处理历史数据的定期批处理作业,例如数据清洗、转换和聚合。
  3. 合并层(Serving Layer):合并层负责将实时和历史数据合并以提供一致的查询接口:

    • 数据服务:根据用户查询内容选择性调用不同存储服务,用于将实时数据和批处理数据合并以生成一致的视图。
  4. Lambda架构的主要优点包括:

    • 实时性:能够提供近实时的数据处理和反馈,适用于需要快速决策和实时监控的场景。

    • 容错性:通过将数据存储在持久性存储中,保证了数据的可靠性和可恢复性。

    • 灵活性:可以应对多种不同的数据类型和查询需求,适用于各种大数据应用。

注意:尽管Lambda架构可以满足业务人员查看用户的实时或历史数据的需求,但离线数据仓库通常采用T+1批处理方式运行,因此在需要高度一致性的场景下会出现数据不一致问题。故本文未采用Lambda架构;

若想详细了解一致性问题的情况,请参考笔者另一篇文章:深入数仓离线数据同步:问题分析与优化措施

三、实时同步+拉链表架构

为了满足业务人员对实时或历史数据的高度一致性需求,并且为了简化架构,这里采用了实时+拉链表的技术方案。在这个架构中,只使用了一种计算引擎,具体的技术组件为 Flink-cdc-2.x + Doris。以下是我们架构的设计概述:

在这里插入图片描述

此架构的关键在于实时同步逻辑及拉链表设计这两块的实现。

3.1、拉链表设计

拉链表是一种维护历史状态以及最新状态数据的表,与快照表类似,算是在快照表的基础上去除了重复状态的数据;使用拉链表在更新频率和比例不是很大的情况下会十分节省存储。

3.1.1、示例

  1. 我们以背景需求为例,[Mysql]业务数据用户表如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Doris]此时实时同步到Doris的拉链表数据为:
idnamephonegendercreate_timeupdate_timeexpirestart dateend date
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31

可以看到拉链表多了expire,start date,end date 三个字段,用于表示该条数据是否过期、开始时间及有效时间,下面会有说明

  1. [Mysql] 2023-06-02 业务数据新增了一名用户,更新了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更改/新增数据

  1. [Doris]此时实时同步到Doris的拉链表数据为:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date备注
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01(由9999-12-31改为2023-06-01)
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31(新增一条拉链数据)
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-029999-12-31(新增一条最新用户数据)

由于tom的手机号被修改,根据拉链表特性此时会新增一条最新的tom数据,且过期时间为9999-12-31,旧数据不会删除而是将有效时间end date改为2023-06-01

  1. [Mysql] 2023-06-03 当天多次更新业务数据jason用户的手机号,sql及表数据如下:
UPDATE `user` SET `phone`='333', `update_time`='2023-06-03 10:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='444', `update_time`='2023-06-03 12:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='555', `update_time`='2023-06-03 14:00:00' WHERE `id`=2;
idnamephonegendercreate_timeupdate_time备注
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason5552023-06-01 13:00:002023-06-03 14:00:00(jason手机号从222 -> 333 -> 444 -> 555更改了三次)
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. [Doris]此时实时同步到Doris的拉链表数据为:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date备注
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-02(由9999-12-31改为2023-06-02)
2jason3332023-06-01 13:00:002023-06-03 10:00:0012023-06-032023-06-03(当天更新多次的过期数据)
2jason4442023-06-01 13:00:002023-06-03 12:00:0012023-06-032023-06-03(当天更新多次的过期数据)
2jason5552023-06-01 13:00:002023-06-03 14:00:0002023-06-039999-12-31(新增一条拉链数据)
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-029999-12-31

使用 expire 字段来表示记录是否过期,下面会说明

  1. 说明
  • start_date表示该条记录的生命周期开始时间【第一次全量同步时为系统时间,增量同步时为update_time时间】,end_date表示该条记录的生命周期结束时间

  • end_date = '9999-12-31’表示该条记录为最新数据

  • end_date = '2023-06-02’表示该条记录仅在2023-06-02当日有效

  • expire字段用于标识记录的状态,1表示记录已过期,0表示记录有效。该字段目的是用于过滤那些在一天之内多次更新的数据

  • 如果查询当前的最新记录,sql为:select * from user where end_date = ‘9999-12-31’

  • 如果查询2023-06-02的历史快照,sql为:select * from user where start_date <= ‘2023-06-02’ and end_date >= ‘2023-06-02’ and expire = 0(此处是拉链表比较重要的一块)

  • 解释上一条sql:需求是要查2023-06-02的历史快照,故start_date <= ‘2023-06-02’;而end_date = '2023-06-02’表示该条记录在2023-06-02当日是有效的,又因为end_date = '9999-12-31’表示目前一直处于有效状态【有可能从2023-06-02到目前一直有效的数据】,所以end_date >= ‘2023-06-02’

  • 示例:查询2023-06-01日历史数据:select * from user where start_date <= ‘2023-06-01’ and end_date >= ‘2023-06-01’ and expore = 0

idnamephonegendercreate_timeupdate_timestart dateend date
1jack1112023-06-01 13:00:002023-06-01 13:00:002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:002023-06-012023-06-02
3tom3332023-06-01 13:00:002023-06-01 13:00:002023-06-012023-06-01
  • 示例:查询2023-06-02日历史数据:select * from user where start_date <= ‘2023-06-02’ and end_date >= ‘2023-06-02’ and expore = 0
idnamephonegendercreate_timeupdate_timestart dateend date
1jack1112023-06-01 13:00:002023-06-01 13:00:002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:002023-06-012023-06-02
3tom4442023-06-01 13:00:002023-06-02 09:00:002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:002023-06-029999-12-31
  • 示例:查询最新实时数据:select * from user where end_date = ‘9999-12-31’
idnamephonegendercreate_timeupdate_timestart dateend date
1jack1112023-06-01 13:00:002023-06-01 13:00:002023-06-019999-12-31
2jason5552023-06-01 13:00:002023-06-03 14:00:002023-06-039999-12-31
3tom4442023-06-01 13:00:002023-06-02 09:00:002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:002023-06-029999-12-31

3.1.2、建表设计

在Doris中的表设计中,采用了Unique数据模型,这个决策的背后有一个关键因素,即利用唯一Key来处理Flink作业崩溃和重新启动时的数据覆盖操作,以及通过下游的幂等性来确保端到端的数据一致性。

唯一Key的选择在这里起到了至关重要的作用。在拉链表中,由于用户ID可能重复出现的情况【例如2023-06-02号tom就有两条数据】,故选择了一个组合Key: UNIQUE KEY(id, update_time) 来确保数据的唯一性。这种设计使得无论在什么情况下,我们都能够通过这个唯一Key来维护数据的一致性,即使在处理实时数据时发生了异常情况或重新启动作业时也不会出现问题。

  • 以上述为例建表语句如下:
CREATE TABLE IF NOT EXISTS example_user_zip
(`id` LARGEINT NOT NULL COMMENT "用户id",`update_time` DATETIME COMMENT "用户更新时间",`create_time` DATETIME COMMENT "用户注册时间",`name` VARCHAR(50) NOT NULL COMMENT "用户昵称",`phone` LARGEINT COMMENT "手机号",`gender` VARCHAR(5) COMMENT "用户性别",`expire` TINYINT DEFAULT '0' COMMENT "数据是否过期:0为有效,1为过期",`start_date` DATE COMMENT "开始时间",`end_date` DATE COMMENT "有效时间"
)
UNIQUE KEY(`id`, `update_time`) -- UNIQUE模型
COMMENT "用户拉链表"
DISTRIBUTED BY HASH(`id`) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

关于doris更多数据模型可参考官网

3.2、实时同步逻辑【重要】

为了更清晰地解释拉链表的同步逻辑,我将以场景的方式逐步说明,如下:

  1. 全量更新

  2. 增量更新

    1. 新增数据
    2. 跨天更新数据
    3. 某条数据当天多次更新
    4. 删除数据
  3. 并发更新

3.2.1、全量更新

  1. 需先明确一点:拉链表的历史数据查询范围是从实时任务同步的那天开始,因为只有在实时任务开始同步的那一天之后,拉链表才正式形成,之前的历史数据是不可查询的。因此,当进行第一次全量同步时,我们会将 start_date 设置为当前系统日期。

  2. 另外,由于实时拉链表同步需要明确区分全量和增量更新,以及后续对 binlog 数据进行解析及判断增量更新操作类型,因此,Flink CDC SQL 方式的表建立不再满足我们的要求。为了更好地实现这一功能,我们需要采用 API 方式来构建解决方案,代码如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;public class MySqlSourceExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*"..tableList("yourDatabaseName.yourTableName") // 设置捕获的表.username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 3s 的 checkpoint 间隔env.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// 设置 source 节点的并行度为 4.setParallelism(4).print().setParallelism(1); // 设置 sink 节点并行度为 1 env.execute("Print MySQL Snapshot + Binlog");}
}

代码摘自mysql-cdc-connector官网示例

  1. 这里我们仍以2023-06-01的[Mysql]业务数据为例:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. 此时Flink应用获取到的数据如下:仅展示一条
{"before": null,"after": {		 # 实际数据"id": 1,"name": "jack","phone": "111","gender": "男","create_time": "2023-06-01T05:00:00Z",  # 该日期是UTC时间,只需增加8小时即可转化为北京时间"update_time": "2023-06-01T05:00:00Z"	# 该日期是UTC时间,只需增加8小时即可转化为北京时间},"source": {		 # 元数据"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 0,"snapshot": "false","db": "yushu_dds","sequence": null,"table": "user","server_id": 0,"gtid": null,"file": "","pos": 0,"row": 0,"thread": null,"query": null},"op": "r",  	 # 记录每条数据的操作类型[重要]"ts_ms": 1705471382867,"transaction": null
}
  1. 在我们使用 Flink CDC MySQL 同步数据时,默认采用 initial 模式,这意味着首先进行全量同步,然后再进行增量同步。因此,在区分全量和增量同步时,关键在于观察获取到的数据中的 op 字段。op 字段是用来记录每条数据的操作类型的标志。具体的操作类型如下:
    • op=d 代表删除操作
    • op=u 代表更新操作
    • op=c 代表新增操作
    • op=r 代表全量读取,而不是来自 binlog 的增量读取
  2. 在 Flink 程序中,只需要通过 op=r 即可筛选出全量数据。在全量数据同步阶段,Doris 拉链表的 start_date 字段设置为系统当前日期,而 end_date 字段则设置为 ‘9999-12-31’。导入语句如下:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(1, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'jack', 111, '男', 0, '2023-06-01', '9999-12-31'),
(2, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'jason', 222, '男', 0, '2023-06-01', '9999-12-31'),
(3, '2023-06-01 13:00:00', '2023-06-01 13:00:00', 'tom', 333, '男', 0, '2023-06-01', '9999-12-31');
  1. 此时doris拉链表数据如下所示:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend date
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31

3.2.2、增量更新

当全量更新结束后即为增量更新,请注意以下内容:

  1. 在增量更新时,Doris 拉链表中的 start_date 字段【即开始时间】不再使用系统时间,而是业务数据的 update_time 截取后的日期。例如,update_time 为 “2023-06-02 13:00:00”,则对应的 start_date 为 “2023-06-02”。这么做的目的是为了确保使用事件时间来划分数据的开始时间,而不是系统时间。
  2. 举例来说,如果采用系统时间,假设实时同步任务某一天宕机并且没有重启,等到隔天再重启,那么 start_date 就会变成隔天日期,从而导致昨天的数据丢失。
  3. 为什么不使用业务数据的 create_time 作为拉链表的 start_date 呢?这是因为在业务数据更改时,通常只会更新 update_time。例如,2023-06-02 日更新了 “Tom” 的手机号码,此时同步到 Doris 新增的拉链数据如果使用 create_time,那么 start_date 仍然会是 “2023-06-01”,而实际上该条数据应该从 “2023-06-02” 日开始生效。因此,使用 update_time 更加合理,确保拉链表中的数据始终按照业务数据的更新时间来进行正确的版本管理。

接下来,我们将逐一讲解以下四个场景:新增更新、跨天更新、某条数据当天多次更新以及删除更新。

3.2.2.1、新增更新
  1. 我们仍以最初的示例为例:[Mysql] 2023-06-02 业务数据新增了一名用户,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更新/新增数据

  1. 此时Flink应用获取到的数据如下所示:
{"before": null,"after": {"id": 4,"name": "tony","phone": "555","gender": "男","create_time": "2023-06-02T02:00:00Z","update_time": "2023-06-02T02:00:00Z"},"source": {# 此处元数据省略},"op": "c","ts_ms": 1705477497504,"transaction": null
}
  1. 可以看到op=c 代表新增操作,对于新增操作doris拉链表的start_end为业务数据的update_time,而end_date均设置为9999-12-31,导入语句如下:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-02 10:00:00', '2023-06-02 10:00:00', 'tony', 555, '男', 0, '2023-06-02', '9999-12-31');
  1. 此时doris拉链表内容如下所示:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-029999-12-31

加粗表示更新/新增数据

3.2.2.2、跨天更新

首先,解释一下为何要需要区分两种不同的更新场景:跨天更新和当天多次更新。这涉及到拉链表的历史数据粒度,拉链表通常以天为单位。如果一条数据在同一天内多次更新,那么每次更新后的数据的生存时间将只有几小时甚至几分钟。在这种情况下,我们希望在拉链表中将这种多次更新的临时数据设为过期数据;细节在后续会有讲解,先来解释跨天更新场景。

  1. 我们仍以最初的示例为例:[Mysql] 2023-06-02 业务数据更新了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更新/新增数据

  1. 此时Flink应用获取到的数据如下所示:
{"before": {	 # 更新前的数据"id": 3,"name": "tom","phone": "333","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-01T05:00:00Z"},"after": {	# 更新后的数据"id": 3,"name": "tom","phone": "444",  # 手机号更新"gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-02T01:00:00Z"  # 更新时间更新},"source": {# 此处元数据省略},"op": "u","ts_ms": 1705479637926,"transaction": null
}
  1. 当我们在Flink应用中遇到op=u(代表更新操作)时,首先需要检查beforeafter字段中的update_time是否跨越了天粒度。可能跨越一天,也可能跨越多天,我们将在Doris拉链表中执行两条SQL语句:一条更新语句和一条插入语句。
    1. 对于更新语句,我们将更新拉链表中旧数据id的end_date字段,将其设置为after字段中update_time的前一天2023-06-01
    2. 对于插入语句,我们将插入after字段中的新数据,将start_date设置为update_time的日期,end_date设置9999-12-31),以确保该数据在拉链表中一直有效。
    3. sql如下所示:
-- 更新语句:
UPDATE example_user_zip SET end_date = '2023-06-01' WHERE `id`=3 AND `update_time`='2023-06-01 13:00:00';-- 插入语句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(3, '2023-06-02 09:00:00', '2023-06-01 13:00:00', 'tom', 444, '男', 0, '2023-06-02', '9999-12-31');
  1. 此时doris拉链表内容如下所示:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-029999-12-31

加粗表示更新/新增数据

  1. 此时若要查看2023-06-01历史数据只需执行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-01' AND end_date >= '2023-06-01' AND expire = 0;
idnamephonegendercreate_timeupdate_timeexpirestart dateend date
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3.2.2.3、某条数据当天多次更新

在我们的拉链表中,数据的粒度是以天为单位。如果一条数据在同一天内多次更新,我们的处理策略是取最后一次更新为有效数据,而将之前的更新标记为过期数据。为了标记数据是否过期,我们会将过期数据的expire字段设置为1。

  1. 我们仍以最初的示例为例:[Mysql] 2023-06-03 当天多次更新业务数据jason用户的手机号,sql及表数据如下:
UPDATE `user` SET `phone`='333', `update_time`='2023-06-03 10:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='444', `update_time`='2023-06-03 12:00:00' WHERE `id`=2;
UPDATE `user` SET `phone`='555', `update_time`='2023-06-03 14:00:00' WHERE `id`=2;
idnamephonegendercreate_timeupdate_time备注
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason5552023-06-01 13:00:002023-06-03 14:00:00(jason手机号从222 -> 333 -> 444 -> 555更改了三次)
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. 此时Flink应用获取到的数据如下所示:
{"before": {"id": 2,"name": "jason","phone": "222","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-01T05:00:00Z"},"after": {"id": 2,"name": "jason","phone": "333","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-03T02:00:00Z"},"source": {# 元数据忽略		},"op": "u","ts_ms": 1705548298335,"transaction": null
},
{"before": {"id": 2,"name": "jason","phone": "333","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-03T02:00:00Z"},"after": {"id": 2,"name": "jason","phone": "444","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-03T04:00:00Z"},"source": {# 元数据忽略		},"op": "u","ts_ms": 1705548298392,"transaction": null
},
{"before": {"id": 2,"name": "jason","phone": "444","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-03T04:00:00Z"},"after": {"id": 2,"name": "jason","phone": "555","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-03T06:00:00Z"},"source": {# 元数据忽略},"op": "u","ts_ms": 1705548298484,"transaction": null
}
  1. 当我们在Flink应用中遇到op=u(代表更新操作),且检查beforeafter字段中的update_time属于同一天,我们将在Doris拉链表中执行两条SQL语句:一条更新语句和一条插入语句。

    1. 对于更新语句,我们将更新拉链表中旧数据id的expire字段设置为1,将其设置为end_date字段值设置为update_time的当天日期2023-06-03
    2. 对于插入语句,我们将插入after字段中的新数据,将start_date设置为update_time的当天日期,end_date设置9999-12-31),以确保该数据在拉链表中一直有效。
    3. sql如下所示:
    -- 222 -> 333 跨天更新语句:
    UPDATE example_user_zip SET end_date = '2023-06-02' WHERE `id`=2 AND `update_time`='2023-06-01 13:00:00';-- 222 -> 333 跨天插入语句:
    INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
    VALUES 
    (2, '2023-06-03 10:00:00', '2023-06-01 13:00:00', 'jason', 333, '男', 0, '2023-06-03', '9999-12-31');-- 333 -> 444 同一天更新语句:
    UPDATE example_user_zip SET expire = 1, end_date = '2023-06-03' WHERE `id`=2 AND `update_time`='2023-06-03 10:00:00';-- 333 -> 444 同一天插入语句:
    INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
    VALUES 
    (2, '2023-06-03 12:00:00', '2023-06-01 13:00:00', 'jason', 444, '男', 0, '2023-06-03', '9999-12-31');-- 444 -> 555 同一天更新语句:
    UPDATE example_user_zip SET expire = 1, end_date = '2023-06-03' WHERE `id`=2 AND `update_time`='2023-06-03 12:00:00';-- 444 -> 555 同一天插入语句:
    INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
    VALUES 
    (2, '2023-06-03 14:00:00', '2023-06-01 13:00:00', 'jason', 555, '男', 0, '2023-06-03', '9999-12-31');
    
  2. 此时doris拉链表内容如下所示:

idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date备注
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-02(由9999-12-31改为2023-06-02)
2jason3332023-06-01 13:00:002023-06-03 10:00:0012023-06-032023-06-03(当天更新多次的过期数据)
2jason4442023-06-01 13:00:002023-06-03 12:00:0012023-06-032023-06-03(当天更新多次的过期数据)
2jason5552023-06-01 13:00:002023-06-03 14:00:0002023-06-039999-12-31(新增一条最新数据)
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-029999-12-31
  1. 此时若要查看2023-06-03历史数据只需执行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-03' AND end_date >= '2023-06-03' AND expire = 0;
idnamephonegendercreate_timeupdate_timestart_dateend_date
1jack1112023-06-01 13:00:002023-06-01 13:00:002023-06-019999-12-31
2jason2222023-06-01 13:00:002023-06-01 13:00:002023-06-012023-06-02
3tom4442023-06-01 13:00:002023-06-02 09:00:002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:002023-06-029999-12-31
3.2.2.4、删除更新

由于[Mysql]业务数据都具备唯一键,故业务数据的删除同步至拉链表无需判断是否跨天,只需更新删除数据的end_date日期为前一天即可。

  1. [Mysql] 2023-06-04 当天删除业务数据jack,表数据如下:
idnamephonegendercreate_timeupdate_time备注
2jason5552023-06-01 13:00:002023-06-03 14:00:00(jason手机号从222 -> 333 -> 444 -> 555更改了三次)
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. 此时Flink应用获取到的数据如下所示:
{"before": {"id": 1,"name": "jack","phone": "111","gender": "男","create_time": "2023-06-01T05:00:00Z","update_time": "2023-06-01T05:00:00Z"},"after": null,"source": {# 忽略元数据},"op": "d", 	# 操作类型"ts_ms": 1705561813650,"transaction": null
}
  1. 可以看到op=d 代表删除操作,对于删除操作doris拉链表只需将before数据的date_date日期更新为前一日2023-06-03,导入语句如下:
-- 更新语句
UPDATE example_user_zip SET end_date = '2023-06-03' WHERE `id`=1 AND `update_time`='2023-06-01 13:00:00';
  1. 此时doris拉链表内容如下所示:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date备注
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-03(由9999-12-31改为2023-06-03)
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-02
2jason3332023-06-01 13:00:002023-06-03 10:00:0012023-06-032023-06-03
2jason4442023-06-01 13:00:002023-06-03 12:00:0012023-06-032023-06-03
2jason5552023-06-01 13:00:002023-06-03 14:00:0002023-06-039999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-029999-12-31
  1. 此时若要查看2023-06-04数据只需执行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-04' AND end_date >= '2023-06-04' AND expire = 0;
idnamephonegendercreate_timeupdate_timestart_dateend_date
2jason2222023-06-01 13:00:002023-06-01 13:00:002023-06-012023-06-02
3tom4442023-06-01 13:00:002023-06-02 09:00:002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:002023-06-029999-12-31

3.2.3、并发更新

这里单独强调并发更新场景是因为在关系型数据库中,例如MySQL,通常使用timestamp类型来表示update_time,而该数据类型的最细粒度是秒。因此,当多个并发操作同时更新同一条数据时,update_time的值只会发生一次变化,但会产生多条binlog日志。由于Doris的拉链表以id + update_time作为唯一键,这种情况下会导致同一条数据多次更新。因此,这里单独讲解并发更新的情况。

需要注意的是,并发问题只存在于更新操作,删除和创建操作不会出现上述问题。

  1. [Mysql] 2023-06-05 当天 15:00:00 并发更新业务数据tony的手机号,表数据如下:
idnamephonegendercreate_timeupdate_time备注
2jason5552023-06-01 13:00:002023-06-03 14:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony7772023-06-02 10:00:002023-06-05 15:00:00(tony手机号从555-> 666-> 777 并发更改两次)
  1. 此时Flink应用获取到的数据如下所示:
{"before": {"id": 4,"name": "tony","phone": "555","gender": "男","create_time": "2023-06-02T02:00:00Z","update_time": "2023-06-02T02:00:00Z"},"after": {"id": 4,"name": "tony","phone": "666","gender": "男","create_time": "2023-06-02T02:00:00Z","update_time": "2023-06-05T07:00:00Z"},"source": {# 元数据忽略},"op": "u","ts_ms": 1705564093414,"transaction": null
},
{"before": {"id": 4,"name": "tony","phone": "666","gender": "男","create_time": "2023-06-02T02:00:00Z","update_time": "2023-06-05T07:00:00Z"},"after": {"id": 4,"name": "tony","phone": "777","gender": "男","create_time": "2023-06-02T02:00:00Z","update_time": "2023-06-05T07:00:00Z"},"source": {# 元数据忽略},"op": "u","ts_ms": 1705564093478,"transaction": null
}
  1. 可以看到op=u 代表更新操作,这里我们仍沿用增量更新的逻辑,第一条日志中业务数据555->666属于跨天更新,第二条日志中业务数据666->777属于一条数据当天多次更新,DorisSql如下所示:
-- 555 -> 666 跨天更新语句:
UPDATE example_user_zip SET end_date = '2023-06-04' WHERE `id`=4 AND `update_time`='2023-06-02 10:00:00';-- 555 -> 666 跨天插入语句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 666, '男', 0, '2023-06-05', '9999-12-31');-- 666 -> 777 同一天更新语句:
UPDATE example_user_zip SET expire = 1, end_date = '2023-06-05' WHERE `id`=4 AND `update_time`='2023-06-05 15:00:00';-- 666 -> 777 同一天插入语句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 777, '男', 0, '2023-06-05', '9999-12-31');
  1. 此时doris拉链表内容如下所示:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date备注
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-03
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-02
2jason3332023-06-01 13:00:002023-06-03 10:00:0012023-06-032023-06-03
2jason4442023-06-01 13:00:002023-06-03 12:00:0012023-06-032023-06-03
2jason5552023-06-01 13:00:002023-06-03 14:00:0002023-06-039999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-022023-06-04(由9999-12-31改为2023-06-04)
4tony7772023-06-02 10:00:002023-06-05 15:00:0002023-06-059999-12-31(新增一条最新数据)

此时可以看到新增了一条Tony的数据,有些人可能注意到少了一条姓名为Tony、手机号为666、expire字段为1的数据。这是因为最后的更新和插入语句中的id + update_time完全一致,触发了Doris的replace替换操作。因此,最后一条插入语句覆盖了前一条更新语句,即"Tony, 666, expire=1"的数据被覆盖掉了。而这种替换操作反而变相解决了并发更新的问题。

  1. 此时若要查看2023-06-04数据只需执行:
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-05' AND end_date >= '2023-06-05' AND expire = 0;
idnamephonegendercreate_timeupdate_timestart_dateend_date
2jason2222023-06-01 13:00:002023-06-01 13:00:002023-06-012023-06-02
3tom4442023-06-01 13:00:002023-06-02 09:00:002023-06-029999-12-31
4tony7772023-06-02 10:00:002023-06-05 15:00:002023-06-059999-12-31
  1. 此时有些同学会提出问题,即这个情况和上文中的跨天更新以及当天多次更新的逻辑有何不同? 似乎没有特殊的操作逻辑。确实,从逻辑上看,这两种情况是一致的。这是因为我们先执行更新操作,然后再执行新增操作。如果我们反过来,先执行新增操作,然后再执行更新操作,就会导致数据丢失。接下来,让我们看一下如果先执行新增操作再执行更新操作会发生什么情况。首先,我们将Doris拉链表恢复到前一天,如下所示:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date备注
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-03(由9999-12-31改为2023-06-03)
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-02
2jason3332023-06-01 13:00:002023-06-03 10:00:0012023-06-032023-06-03
2jason4442023-06-01 13:00:002023-06-03 12:00:0012023-06-032023-06-03
2jason5552023-06-01 13:00:002023-06-03 14:00:0002023-06-039999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-029999-12-31
  1. 接下来我们将更新插入操作调换顺序,sql如下所示:
-- 555 -> 666 跨天插入语句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 666, '男', 0, '2023-06-05', '9999-12-31');-- 555 -> 666 跨天更新语句:
UPDATE example_user_zip SET end_date = '2023-06-04' WHERE `id`=4 AND `update_time`='2023-06-02 10:00:00';-- 666 -> 777 同一天插入语句:
INSERT INTO example_user_zip (id, update_time, create_time, name, phone, gender, expire, start_date, end_date)
VALUES 
(4, '2023-06-05 15:00:00', '2023-06-02 10:00:00', 'tony', 777, '男', 0, '2023-06-05', '9999-12-31');-- 666 -> 777 同一天更新语句:
UPDATE example_user_zip SET expire = 1, end_date = '2023-06-05' WHERE `id`=4 AND `update_time`='2023-06-05 15:00:00';
  1. 此时doris拉链表内容如下所示:
idnamephonegendercreate_timeupdate_timeexpirestart_dateend_date备注
1jack1112023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-03
2jason2222023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-02
2jason3332023-06-01 13:00:002023-06-03 10:00:0012023-06-032023-06-03
2jason4442023-06-01 13:00:002023-06-03 12:00:0012023-06-032023-06-03
2jason5552023-06-01 13:00:002023-06-03 14:00:0002023-06-039999-12-31
3tom3332023-06-01 13:00:002023-06-01 13:00:0002023-06-012023-06-01
3tom4442023-06-01 13:00:002023-06-02 09:00:0002023-06-029999-12-31
4tony5552023-06-02 10:00:002023-06-02 10:00:0002023-06-022023-06-04(由9999-12-31改为2023-06-04)
4tony7772023-06-02 10:00:002023-06-05 15:00:0012023-06-052023-06-05(更新拉链数据)

可以看到已经没有tony的最新数据了。

  1. 此时查看2023-06-05数据执行:只有两条数据
SELECT * FROM example_user_zip WHERE start_date <= '2023-06-05' AND end_date >= '2023-06-05' AND expire = 0;
idnamephonegendercreate_timeupdate_timestart_dateend_date
2jason2222023-06-01 13:00:002023-06-01 13:00:002023-06-012023-06-02
3tom4442023-06-01 13:00:002023-06-02 09:00:002023-06-029999-12-31

根据以上的测试结果,我们可以得出以下结论:当涉及到更新操作时,最好的做法是先执行更新,然后再执行插入操作。这种顺序可以有效避免并发更新问题。

此外,在实时引擎中处理数据通常涉及到分布式计算,因此需要特别注意确保相同ID的数据只在一个线程中按顺序执行,而不是让执行器01执行Tony的更新操作,而执行器02执行Tony的插入操作。相反,应该将具有相同ID的数据放置在同一个执行器中执行,以确保顺序性和一致性。这对于处理并发更新场景非常重要。

四、总结

本文我们深入探讨了如何使用Apache Flink实现实时数据仓库中拉链表的同步。拉链表是一种重要的数据模型,用于跟踪数据的历史变化,以便在分析和报告中提供准确的历史视图。我们介绍了如何借助Flink以及其他相关技术构建一个强大的实时同步引擎,以应对多种数据同步场景。

我们首先介绍了传统Lambda架构到实时同步+拉链表单引擎架构它们之间的区别。随后,我们深入讨论了Flink CDC(Change Data Capture)和Doris数据库的结合使用,以实现实时数据同步的基础架构。我们详细讨论了全量同步和增量同步两种关键同步模式,以及如何应对不同的更新场景。

在全文中,强调了以下关键点:

  • 实时同步+拉链表单引擎架构的设计和实施。
  • 全量同步和增量同步是实时数据仓库同步的两种关键模式,详细介绍了它们的实现逻辑。
  • 跨天更新和当天多次更新是需要特别注意的场景,提供了解决方案以确保数据的完整性。
  • 并发更新可能导致数据重复,需要采取适当的措施来应对。

通过深入了解实时同步和拉链表的实现细节,读者可以更好地理解如何构建强大的实时数据仓库,并满足不断变化的业务需求。

五、相关资料

  • Doris 数据模型
  • MySQL CDC Connector
  • 深入数仓离线数据同步:问题分析与优化措施

http://www.ppmy.cn/news/1326433.html

相关文章

代码随想录算法训练营第四十三天| 1049.最后一块石头的重量 II、494.目标和、474.一和零

代码随想录算法训练营第四十三天| 1049.最后一块石头的重量 II、494.目标和、474.一和零 题目 1049.最后一块石头的重量 II 有一堆石头&#xff0c;用整数数组 stones 表示。其中 stones[i] 表示第 i 块石头的重量。 每一回合&#xff0c;从中选出任意两块石头&#xff0c;…

你真的知道如何查看 Elasticsearch 的 Debug 日志吗?!

当我们遇到问题或者需要深入了解 Elasticsearch 的运行机制时&#xff0c;调整日志等级&#xff08; logging level &#xff09;到更详细的级别&#xff0c;比如 DEBUG、TRACE &#xff0c;会是一个有效且必须要掌握的方法。 Elasticsearch 提供了如下的接口来支持动态变更 l…

LeetCode、2336. 无限集中的最小数字(中等,小顶堆)

文章目录 前言LeetCode、2336. 无限集中的最小数字题目链接及类型思路代码题解 前言 博主所有博客文件目录索引&#xff1a;博客目录索引(持续更新) LeetCode、2336. 无限集中的最小数字 题目链接及类型 题目链接&#xff1a;2336. 无限集中的最小数字 类型&#xff1a;数据…

.NET 8 中引入新的 IHostedLifecycleService 接口 实现定时任务

在这篇文章中&#xff0c;我们将了解 .NET 8 中为托管服务引入的一些新生命周期事件。请注意&#xff0c;这篇文章与 .NET 8 相关&#xff0c;在撰写本文时&#xff0c;.NET 8 目前处于预览状态。在 11 月 .NET 8 最终版本发布之前&#xff0c;类型和实现可能会发生变化。要继续…

docker安装 mysql 8.0.32

首先下载 mysql 其次如果虚拟机以前安过mysql 需要把mysql关闭 命令 永久关闭mysql 但是当前不生效 需要重启虚拟机 systemctl enable mysqld 如果不想重启虚拟机 可以执行 systemctl stop mysqld //指定版本 docker pull mysql:8.0.32 // 拉取最新的…

自学Python,需要注意哪些?

为什么要学习Python&#xff1f; 在学习Python之前&#xff0c;你不要担心自己没基础或“脑子笨”&#xff0c;我始终认为&#xff0c;只要你想学并为之努力&#xff0c;就能学好&#xff0c;就能用Python去做很多事情。在这个喧嚣的时代&#xff0c;很多技术或概念会不断兴起…

Mysql:重点且常用的操作和理论知识整理 ^_^

目录 1 基础的命令操作 2 DDL 数据库定义语言 2.1 数据库操作 2.2 数据表操作 2.2.1 创建数据表 2.2.2 修改和删除数据表 2.2.3 添加外键 3 DML 数据库操作语言 3.1 插入语句(INSERT) 3.2 修改语句(UPDATE) 3.3 删除语句 3.3.1 DELETE命令 3.3.2 TRUNCATE命令 4 …

【Java JVM】栈帧

执行引擎是 Java 虚拟机核心的组成部分之一。 在《Java虚拟机规范》中制定了 Java 虚拟机字节码执行引擎的概念模型, 这个概念模型成为各大发行商的 Java 虚拟机执行引擎的统一外观 (Facade)。 不同的虚拟机的实现中, 通常会有 解释执行 (通过解释器执行)编译执行 (通过即时编…