【数据仓库】Hive 拉链表实践

ops/2024/11/15 6:08:52/

背景

        拉链表是一种数据模型,主要是针对数据仓库设计中表存储数据的方式而定义的;顾名思义,所谓拉链表,就是记录历史。记录一个事务从开始一直到当前状态的所有变化的信息。

        拉链表可以避免按每一天存储所有记录造成的海量存储问题,同时也是处理缓慢变化数据(SCD2)的一种常见方式。

应用场景

        现假设有如下场景:一个企业拥有5000万会员信息,每天有20万会员资料变更,需要在数仓中记录会员表的历史变化以备分析使用,即每天都要保留一个快照供查询,反映历史数据的情况。

        在此场景中,需要反映5000万会员的历史变化,如果保留快照,存储两年就需要2X365X5000W条数据存储空间,数据量为365亿,如果存储更长时间,则无法估计需要的存储空间。而利用拉链算法存储,每日只向历史表中添加新增和变化的数据,每日不过20万条,存储4年也只需要3亿存储空间。

实现步骤

        在拉链表中,每一条数据都有一个生效日期(effective_date)和失效日期(expire_date)。假设在一个用户表中,在2019年11月8日新增了两个用户,如下表所示,则这两条记录的生效时间为当天,由于到2019年11月8日为止,这两条就还没有被修改过,所以失效时间为一个给定的比较大的值,比如:3000-12-31  

member_idphonenocreate_timeupdate_time
10001133000000012019-11-083000-12-31
10002135000000022019-11-083000-12-31

        第二天(2019-11-09),用户10001被删除了,用户10002的电话号码被修改成13600000002.为了保留历史状态,用户10001的失效时间被修改为2019-11-09,用户10002则变成了两条记录,如下表所示: 

member_idphonenocreate_timeupdate_time
10001133000000012019-11-082019-11-09
10002135000000022019-11-082019-11-09
10002136000000022019-11-093000-12-31

        第三天(2019-11-10),又新增了用户10003,则用户表数据如小表所示: 

member_idphonenocreate_timeupdate_time
10001133000000012019-11-082019-11-09
10002135000000022019-11-082019-11-09
10002136000000022019-11-093000-12-31
10003133000000062019-11-103000-12-31

        如果要查询最新的数据,那么只要查询失效时间为3000-12-31的数据即可,如果要查11月8号的历史数据,则筛选生效时间<= 2019-11-08并且失效时间>2019-11-08的数据即可。如果查询11月9号的数据,那么筛选条件则是生效时间<=2019-11-09并且失效时间>2019-11-09

表结构

  • MySQL源member表

CREATE TABLE member(member_id VARCHAR ( 64 ),phoneno VARCHAR ( 20 ),create_time datetime,update_time datetime );

  • ODS层增量表member_delta,每天一个分区

CREATE TABLE member_delta(member_id string,phoneno string,create_time string,update_time string)
PARTITIONED BY (DAY string);
  • 临时表

CREATE TABLE member_his_tmp(member_id string,phoneno string,effective_date date,expire_date date);
  • DW层历史拉链表

CREATE TABLE member_his(member_id string,phoneno string,effective_date date,expire_date date);

Demo数据准备

2019-11-08的数据为: 

member_idphonenocreate_timeupdate_time
10001135000000012019-11-08 14:47:552019-11-08 14:47:55
10002135000000022019-11-08 14:48:332019-11-08 14:48:33
10003135000000032019-11-08 14:48:532019-11-08 14:48:53
10004135000000042019-11-08 14:49:022019-11-08 14:49:02

2019-11-09的数据为:其中蓝色代表新增数据,红色代表修改的数据

member_idphonenocreate_timeupdate_time
10001135000000012019-11-08 14:47:552019-11-08 14:47:55
10002136000000022019-11-08 14:48:332019-11-09 14:48:33
10003135000000032019-11-08 14:48:532019-11-08 14:48:53
10004135000000042019-11-08 14:49:022019-11-08 14:49:02
10005135000000052019-11-09 08:54:032019-11-09 08:54:03
10006135000000062019-11-09 09:54:252019-11-09 09:54:25

2019-11-10的数据:其中蓝色代表新增数据,红色代表修改的数据  

member_idphonenocreate_timeupdate_time
10001135000000012019-11-08 14:47:552019-11-08 14:47:55
10002136000000022019-11-08 14:48:332019-11-09 14:48:33
10003135000000032019-11-08 14:48:532019-11-08 14:48:53
10004136000000042019-11-08 14:49:022019-11-10 14:49:02
10005135000000052019-11-09 08:54:032019-11-09 08:54:03
10006135000000062019-11-09 09:54:252019-11-09 09:54:25
10007135000000072019-11-10 17:41:492019-11-10 17:41:49

全量初始装载

        在启用拉链表时,先对其进行初始装载,比如以2019-11-08为开始时间,那么将MySQL源表全量抽取到ODS层member_delta表的2018-11-08的分区中,然后初始装载DW层的拉链表member_his

INSERT overwrite TABLE member_his
SELECTmember_id,phoneno,to_date ( create_time ) AS effective_date,'3000-12-31'
FROM
member_delta
WHERE
DAY = '2019-11-08'

        查询初始的历史拉链表数据

图片

增量抽取数据

        每天,从源系统member表中,将前一天的增量数据抽取到ODS层的增量数据表member_delta对应的分区中。这里的增量需要通过member表中的创建时间和修改时间来确定,或者使用sqoop job监控update时间来进行增联抽取。比如,本案例中2019-11-09和2019-11-10为两个分区,分别存储了2019-11-09和2019-11-10日的增量数据。2019-11-09分区的数据为:

图片

        2019-11-10分区的数据为:

图片

增量刷新历史拉链数据

  • 2019-11-09增量刷新历史拉链表将数据放进临时表

INSERT overwrite TABLE member_his_tmp
SELECT *
FROM(
-- 2019-11-09增量数据,代表最新的状态,该数据的生效时间是2019-11-09,过期时间为3000-12-31
-- 这些增量的数据需要被全部加载到历史拉链表中
SELECT member_id,phoneno,'2019-11-09' effective_date,'3000-12-31' expire_dateFROM member_deltaWHERE DAY='2019-11-09'UNION ALL 
-- 用当前为生效状态的拉链数据,去left join 增量数据,
-- 如果匹配得上,则表示该数据已发生了更新,
-- 此时,需要将发生更新的数据的过期时间更改为当前时间.
-- 如果匹配不上,则表明该数据没有发生更新,此时过期时间不变
SELECT a.member_id,a.phoneno,a.effective_date,if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_dateFROM(SELECT *FROM member_hisWHERE expire_date='3000-12-31') aLEFT JOIN(SELECT *FROM member_deltaWHERE DAY='2019-11-09') b ON a.member_id=b.member_id)his

        将数据覆盖到历史拉链表

INSERT overwrite TABLE member_his
SELECT *
FROM member_his_tmp

        查看历史拉链表

图片

  • 2019-11-10增量刷新历史拉链表

                将数据放进临时表

INSERT overwrite TABLE member_his_tmp
SELECT *
FROM
(
-- 2019-11-10增量数据,代表最新的状态,该数据的生效时间是2019-11-10,过期时间为3000-12-31
-- 这些增量的数据需要被全部加载到历史拉链表中
SELECT member_id,phoneno,'2019-11-10' effective_date,'3000-12-31' expire_dateFROM member_deltaWHERE DAY='2019-11-10'UNION ALL
-- 用当前为生效状态的拉链数据,去left join 增量数据,
-- 如果匹配得上,则表示该数据已发生了更新,
-- 此时,需要将发生更新的数据的过期时间更改为当前时间.
-- 如果匹配不上,则表明该数据没有发生更新,此时过期时间不变
SELECT a.member_id,a.phoneno,a.effective_date,if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_dateFROM(SELECT *FROM member_hisWHERE expire_date='3000-12-31') aLEFT JOIN(SELECT *FROM member_deltaWHERE DAY='2019-11-10') b ON a.member_id=b.member_id)his

查看历史拉链表

图片

将以上脚本封装成shell调度的脚本

#!/bin/bash

#如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="

INSERT overwrite TABLE member_his_tmp
SELECT *
FROM
  (
-- 2019-11-10增量数据,代表最新的状态,该数据的生效时间是2019-11-10,过期时间为3000-12-31
-- 这些增量的数据需要被全部加载到历史拉链表中
SELECT member_id,
       phoneno,
       '$do_date' effective_date,
       '3000-12-31' expire_date
   FROM member_delta
   WHERE DAY='$do_date'
   UNION ALL
-- 用当前为生效状态的拉链数据,去left join 增量数据,
-- 如果匹配得上,则表示该数据已发生了更新,
-- 此时,需要将发生更新的数据的过期时间更改为当前时间.
-- 如果匹配不上,则表明该数据没有发生更新,此时过期时间不变
SELECT a.member_id,
       a.phoneno,
       a.effective_date,
       if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_date
   FROM
     (SELECT *
      FROM member_his
      WHERE expire_date='3000-12-31') a
   LEFT JOIN
     (SELECT *
      FROM member_delta
      WHERE DAY='$do_date') b ON a.member_id=b.member_id)his;
"

$hive -e "$sql"

如需获取更多资料,您可以下载知识星球app,并搜索加入‘数据要素X’。


http://www.ppmy.cn/ops/132301.html

相关文章

【刷题17】最小栈、栈的压入弹出、逆波兰表达式

目录 一、最小栈二、栈的压入弹出三、逆波兰表达式求值 一、最小栈 题目&#xff1a; 思路&#xff1a; 定义两个栈&#xff0c;一个表示普通的栈st&#xff0c;另一个表示最小栈minstpush&#xff1a;入栈st&#xff1b;如果minst为空或者要进入的数据小于等于minst的栈顶元…

linux 定时备份mysql数据库

1. 创建数据库备份专用账号 CREATE USER ‘mysqldump’‘localhost’ IDENTIFIED BY ‘oBAjb8UiGGas#Ky’; GRANT SELECT, LOCK TABLES, SHOW VIEW, PROCESS ON my_database.* TO ‘mysqldump’‘localhost’; FLUSH PRIVILEGES; 2. 编写backup_db.sh 脚本 #!/bin/bash# 设置…

python 爬虫 入门 六、Selenium

Selenium本来是一个自动测试工具&#xff0c;用于模拟用户对网站进行操作。在爬虫领域也有其用处。 一、下载安装Selenium及附属插件 pip install Selenium 安装完成后还需要安装一个浏览器驱动&#xff0c;来让python能启动浏览器。 如果是Edge或者其他基于Chromium的浏览器…

【提高篇】3.1 GPIO(二,结构与工作模式介绍)

目录 一,GPIO的基本结构 1.1 保护二极管 1.2 上拉、下拉电阻 1.3 施密特触发器 1.4 P-MOS 管和 N-MOS 管 P-MOS管和N-MOS管的区别 1.5 片上外设 1.6 IDR,ODR,BSRR寄存器 1.6.1 IDR(Input Data Register) 1.6.2 ODR(Output Data Register) 1.6.3 BSRR(Bit Se…

C#中的同步和异步回调

什么是C#中的异步和同步回调&#xff1f;何时以及在何种情况下可以使用同步和异步回调方法&#xff1f; 在C#中&#xff0c;同步回调和异步回调都是用于处理任务或事件完成的机制。回调允许我们指定在某个操作完成时要执行的函数或委托。它们通常用于处理可能需要大量时间才能…

Hive-testbench套件使用文档

Hive-testbench套件使用文档 hive-testbench 是hortonworks的一个开源项目,用于测试和基准测试 Apache Hive 的工具集。它提供了一系列的测试数据集和查询样例,用于评估和比较 Hive 在不同配置和环境下的性能。hive-testbench 的主要目标是模拟真实的大规模数据集和复杂查询…

Starrocks Compaction的分析

背景 本文基于 Starrocks 3.1.7 结论 Starrocks 会启动一个线程周期性的去进行Compaction&#xff0c;该周期间隔为 200 MS, 该Compaction以table的partition为切入点&#xff0c;tablet(也就是bucket)为粒度进行task的创建。 分析 CompactionMgr start 方法会启动一个Com…

革命性AI搜索引擎!ChatGPT最新功能发布,无广告更智能!

文章目录 零、前言一、ChatGPT最新AI搜索引擎功能操作指导实战1:搜索新闻实战2:搜索天气实战3:搜索体育消息 二、感受 零、前言 大人&#xff0c;时代变了。 最强 AI 助力下的无广告搜索引擎终于问世。我们期待已久的这一刻终于到来了&#xff0c;从今天起&#xff0c;ChatGPT…