一、目的
数据清洗是数据治理的关键,也是提高数据质量的核心。除清洗本身之外,本文还涵盖错误数据留存、清洗记录、数据重复性统计、数据准确性统计、错误数据修复以及缺失数据补全等环节。
二、清洗步骤(以转向比数据为案例)
2.1 ODS层原始数据
-- ODS layer: external table holding the raw turn-ratio records, statically partitioned by day.
-- Per the column comments, volume_* columns are vehicle counts and speed_* columns are average
-- speeds for the reporting period, overall and per movement (left / straight / right / U-turn).
-- Rows are comma-delimited fields stored as SequenceFile.
create external table if not exists hurys_db.ods_turnratio
(
    device_no          string comment '设备编号',
    source_device_type string comment '设备类型',
    sn                 string comment '设备序列号 ',
    model              string comment '设备型号',
    create_time        string comment '创建时间',
    cycle              int    comment '转向比数据周期',
    volume_sum         int    comment '指定时间段内通过路口的车辆总数',
    speed_avg          float  comment '指定时间段内通过路口的所有车辆速度的平均值',
    volume_left        int    comment '指定时间段内通过路口的左转车辆总数',
    speed_left         float  comment '指定时间段内通过路口的左转车辆速度的平均值',
    volume_straight    int    comment '指定时间段内通过路口的直行车辆总数',
    speed_straight     float  comment '指定时间段内通过路口的直行车辆速度的平均值',
    volume_right       int    comment '指定时间段内通过路口的右转车辆总数',
    speed_right        float  comment '指定时间段内通过路口的右转车辆速度的平均值',
    volume_turn        int    comment '指定时间段内通过路口的掉头车辆总数',
    speed_turn         float  comment '指定时间段内通过路口的掉头车辆速度的平均值'
)
    comment '转向比数据外部表——静态分区'
    partitioned by (day string)
    row format delimited fields terminated by ','
    stored as SequenceFile
;
2.2 DWD层原始数据清洗
主要是数据格式、数值范围以及逻辑方面的清洗
2.2.1 建表语句
-- DWD layer: cleaned turn-ratio records, dynamically partitioned by day.
-- Same columns as the ODS table plus a generated id; speeds are stored as decimal(10,2).
-- Each explanatory comment sits on its own line so that it cannot swallow the clauses
-- (or the terminating semicolon) that follow it.
create table if not exists hurys_db.dwd_turnratio
(
    id                 string        comment '唯一ID',
    device_no          string        comment '设备编号',
    source_device_type string        comment '设备类型',
    sn                 string        comment '设备序列号 ',
    model              string        comment '设备型号',
    create_time        string        comment '创建时间',
    cycle              int           comment '转向比数据周期',
    volume_sum         int           comment '指定时间段内通过路口的车辆总数',
    speed_avg          decimal(10,2) comment '指定时间段内通过路口的所有车辆速度的平均值',
    volume_left        int           comment '指定时间段内通过路口的左转车辆总数',
    speed_left         decimal(10,2) comment '指定时间段内通过路口的左转车辆速度的平均值',
    volume_straight    int           comment '指定时间段内通过路口的直行车辆总数',
    speed_straight     decimal(10,2) comment '指定时间段内通过路口的直行车辆速度的平均值',
    volume_right       int           comment '指定时间段内通过路口的右转车辆总数',
    speed_right        decimal(10,2) comment '指定时间段内通过路口的右转车辆速度的平均值',
    volume_turn        int           comment '指定时间段内通过路口的掉头车辆总数',
    speed_turn         decimal(10,2) comment '指定时间段内通过路口的掉头车辆速度的平均值'
)
    comment '转向比数据表——动态分区'
    -- The partition column must not repeat a column already declared in the table body;
    -- it can be regarded as a pseudo-column of the table.
    partitioned by (day string)
    -- Table data is stored in ORC format.
    stored as orc
;
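2.2.2 清洗规则
(以下规则与 2.2.3 的清洗SQL一一对应)
1) 空值处理:volume_*、speed_* 字段为空时置为 0,speed_* 字段统一转换为 decimal(10,2)
2) 必填校验:device_no、create_time 不能为空
3) 数值范围:volume_sum、volume_left、volume_straight、volume_right 取值 0~1000,volume_turn 取值 0~100;speed_avg、speed_straight 取值 0~150,speed_left、speed_right、speed_turn 取值 0~100
4) 去重:所有字段完全相同的记录通过 group by 合并为一条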
2.2.3 清洗SQL
-- Cleans one ODS day of turn-ratio data into hurys_db.dwd_turnratio (dynamic partition on day).
--   t1:     replaces NULL volume/speed readings with 0, casts speeds to decimal(10,2) and
--           derives the target partition value from the first 10 characters of create_time.
--   insert: keeps rows whose device_no and create_time are present and whose readings fall
--           inside the accepted ranges; GROUP BY over every column collapses exact duplicates
--           before a fresh UUID is assigned to each surviving row.
-- NOTE(review): this is a dynamic-partition insert with no static partition key; the session
-- presumably needs hive.exec.dynamic.partition.mode=nonstrict - confirm for the target cluster.
with t1 as (
    select
        device_no,
        source_device_type,
        sn,
        model,
        create_time,
        cycle,
        case when volume_sum      is null then 0 else volume_sum                             end as volume_sum,
        case when speed_avg       is null then 0 else cast(speed_avg as decimal(10,2))      end as speed_avg,
        case when volume_left     is null then 0 else volume_left                            end as volume_left,
        case when speed_left      is null then 0 else cast(speed_left as decimal(10,2))     end as speed_left,
        case when volume_straight is null then 0 else volume_straight                        end as volume_straight,
        case when speed_straight  is null then 0 else cast(speed_straight as decimal(10,2)) end as speed_straight,
        case when volume_right    is null then 0 else volume_right                           end as volume_right,
        case when speed_right     is null then 0 else cast(speed_right as decimal(10,2))    end as speed_right,
        case when volume_turn     is null then 0 else volume_turn                            end as volume_turn,
        case when speed_turn      is null then 0 else cast(speed_turn as decimal(10,2))     end as speed_turn,
        substr(create_time, 1, 10) as day
    from hurys_db.ods_turnratio
    where day = '2024-09-10'
)
insert overwrite table hurys_db.dwd_turnratio partition (day)
select
    UUID() as id,
    device_no,
    source_device_type,
    sn,
    model,
    create_time,
    cycle,
    volume_sum,
    speed_avg,
    volume_left,
    speed_left,
    volume_straight,
    speed_straight,
    volume_right,
    speed_right,
    volume_turn,
    speed_turn,
    day
from t1
where day = '2024-09-10'
  and device_no is not null
  and create_time is not null
  and volume_sum      between 0 and 1000
  and speed_avg       between 0 and 150
  and volume_left     between 0 and 1000
  and speed_left      between 0 and 100
  and volume_straight between 0 and 1000
  and speed_straight  between 0 and 150
  and volume_right    between 0 and 1000
  and speed_right     between 0 and 100
  and volume_turn     between 0 and 100
  and speed_turn      between 0 and 100
group by
    device_no,
    source_device_type,
    sn,
    model,
    create_time,
    cycle,
    volume_sum,
    speed_avg,
    volume_left,
    speed_left,
    volume_straight,
    speed_straight,
    volume_right,
    speed_right,
    volume_turn,
    speed_turn,
    day
;
2.3 转向比错误数据表
2.3.1 建表语句
-- DWD layer: turn-ratio rows rejected by cleaning, dynamically partitioned by day.
-- Speeds keep the float type of the raw ODS table.
-- Each explanatory comment sits on its own line so that it cannot swallow the clauses
-- (or the terminating semicolon) that follow it.
create table if not exists hurys_db.dwd_turnratio_error
(
    id                 string comment '唯一ID',
    device_no          string comment '设备编号',
    source_device_type string comment '设备类型',
    sn                 string comment '设备序列号 ',
    model              string comment '设备型号',
    create_time        string comment '创建时间',
    cycle              int    comment '转向比数据周期',
    volume_sum         int    comment '指定时间段内通过路口的车辆总数',
    speed_avg          float  comment '指定时间段内通过路口的所有车辆速度的平均值',
    volume_left        int    comment '指定时间段内通过路口的左转车辆总数',
    speed_left         float  comment '指定时间段内通过路口的左转车辆速度的平均值',
    volume_straight    int    comment '指定时间段内通过路口的直行车辆总数',
    speed_straight     float  comment '指定时间段内通过路口的直行车辆速度的平均值',
    volume_right       int    comment '指定时间段内通过路口的右转车辆总数',
    speed_right        float  comment '指定时间段内通过路口的右转车辆速度的平均值',
    volume_turn        int    comment '指定时间段内通过路口的掉头车辆总数',
    speed_turn         float  comment '指定时间段内通过路口的掉头车辆速度的平均值'
)
    comment '转向比错误数据表——动态分区'
    -- The partition column must not repeat a column already declared in the table body;
    -- it can be regarded as a pseudo-column of the table.
    partitioned by (day string)
    -- Table data is stored in ORC format.
    stored as orc
;
2.3.2 SQL语句
-- Collects the rejected rows of one ODS day: every raw row that has no counterpart in
-- dwd_turnratio with the same (device_no, create_time) is copied, unmodified, into the
-- error table under a freshly generated UUID.
insert overwrite table hurys_db.dwd_turnratio_error partition (day)
select
    UUID() as id,
    src.device_no,
    src.source_device_type,
    src.sn,
    src.model,
    src.create_time,
    src.cycle,
    src.volume_sum,
    src.speed_avg,
    src.volume_left,
    src.speed_left,
    src.volume_straight,
    src.speed_straight,
    src.volume_right,
    src.speed_right,
    src.volume_turn,
    src.speed_turn,
    src.day
from hurys_db.ods_turnratio as src
left join hurys_db.dwd_turnratio as kept
    on  kept.device_no   = src.device_no
    and kept.create_time = src.create_time
-- Anti-join: only raw rows without a cleaned match survive.
where kept.device_no is null
  and kept.create_time is null
  and src.day = '2024-09-10'
;
2.4 转向比数据清洗记录表
2.4.1 建表语句
-- Cleaning record for turn-ratio data: one row per (record id, field name, field value),
-- partitioned by day and stored as ORC. data_type is a numeric code whose meanings are
-- listed in the column comment.
create table if not exists hurys_db.dwd_data_clean_record_turnratio
(
    id          string comment '唯一ID',
    data_type   int    comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no   string comment '设备编号',
    create_time string comment '创建时间',
    field_name  string comment '字段名',
    field_value string comment '字段值'
)
    comment '转向比数据清洗记录表'
    partitioned by (day string)
    stored as orc
;
2.4.2 SQL语句
-- Builds the cleaning record for one day of rejected turn-ratio rows: one output row per
-- offending field, carrying the field name and the offending value.
--   t3:     for every error row, tags each offending field as 'name:value' (NULL when the
--           field is acceptable) and joins the tags with ',' into kv_pairs.
--   insert: explodes kv_pairs into one row per tag and splits each tag on ':' into
--           field_name / field_value; rows without any tag are filtered out.
with t3 as (
    select
        id,
        device_no,
        case when device_no is null
             then CONCAT('device_no:', 'null') end as device_no_value,
        create_time,
        case when volume_sum < 0 or volume_sum > 1000
             then CONCAT('volume_sum:', CAST(volume_sum AS STRING)) end as volume_sum_value,
        case when speed_avg < 0 or speed_avg > 150
             then CONCAT('speed_avg:', CAST(speed_avg AS STRING)) end as speed_avg_value,
        case when volume_left < 0 or volume_left > 1000
             then CONCAT('volume_left:', CAST(volume_left AS STRING)) end as volume_left_value,
        case when speed_left < 0 or speed_left > 100
             then CONCAT('speed_left:', CAST(speed_left AS STRING)) end as speed_left_value,
        case when volume_straight < 0 or volume_straight > 1000
             then CONCAT('volume_straight:', CAST(volume_straight AS STRING)) end as volume_straight_value,
        case when speed_straight < 0 or speed_straight > 150
             then CONCAT('speed_straight:', CAST(speed_straight AS STRING)) end as speed_straight_value,
        case when volume_right < 0 or volume_right > 1000
             then CONCAT('volume_right:', CAST(volume_right AS STRING)) end as volume_right_value,
        case when speed_right < 0 or speed_right > 100
             then CONCAT('speed_right:', CAST(speed_right AS STRING)) end as speed_right_value,
        case when volume_turn < 0 or volume_turn > 100
             then CONCAT('volume_turn:', CAST(volume_turn AS STRING)) end as volume_turn_value,
        case when speed_turn < 0 or speed_turn > 100
             then CONCAT('speed_turn:', CAST(speed_turn AS STRING)) end as speed_turn_value,
        -- Same tags as above, concatenated so they can be exploded into rows later.
        concat_ws(',',
            case when device_no is null
                 then CONCAT('device_no:', 'null') end,
            case when volume_sum < 0 or volume_sum > 1000
                 then CONCAT('volume_sum:', CAST(volume_sum AS STRING)) end,
            case when speed_avg < 0 or speed_avg > 150
                 then CONCAT('speed_avg:', CAST(speed_avg AS STRING)) end,
            case when volume_left < 0 or volume_left > 1000
                 then CONCAT('volume_left:', CAST(volume_left AS STRING)) end,
            case when speed_left < 0 or speed_left > 100
                 then CONCAT('speed_left:', CAST(speed_left AS STRING)) end,
            case when volume_straight < 0 or volume_straight > 1000
                 then CONCAT('volume_straight:', CAST(volume_straight AS STRING)) end,
            case when speed_straight < 0 or speed_straight > 150
                 then CONCAT('speed_straight:', CAST(speed_straight AS STRING)) end,
            case when volume_right < 0 or volume_right > 1000
                 then CONCAT('volume_right:', CAST(volume_right AS STRING)) end,
            case when speed_right < 0 or speed_right > 100
                 then CONCAT('speed_right:', CAST(speed_right AS STRING)) end,
            case when volume_turn < 0 or volume_turn > 100
                 then CONCAT('volume_turn:', CAST(volume_turn AS STRING)) end,
            case when speed_turn < 0 or speed_turn > 100
                 then CONCAT('speed_turn:', CAST(speed_turn AS STRING)) end
        ) as kv_pairs,
        day
    from hurys_db.dwd_turnratio_error
    where day = '2024-09-10'
)
insert overwrite table hurys_db.dwd_data_clean_record_turnratio partition (day)
select
    id,
    '1' as data_type,                       -- string literal written to the int data_type column
    t3.device_no,
    create_time,
    split(pair, ':')[0] as field_name,
    split(pair, ':')[1] as field_value,
    day
from t3
lateral view explode(split(t3.kv_pairs, ',')) exploded_table as pair
-- Keep only error rows for which at least one field was tagged.
where device_no_value       is not null
   or volume_sum_value      is not null
   or volume_left_value     is not null
   or volume_right_value    is not null
   or volume_straight_value is not null
   or volume_turn_value     is not null
   or speed_avg_value       is not null
   or speed_left_value      is not null
   or speed_right_value     is not null
   or speed_straight_value  is not null
   or speed_turn_value      is not null
;
2.5 数据重复性统计表
2.5.1 建表语句
-- Duplicate-rate statistics: one duplicate ratio per data type and device,
-- partitioned by day and stored as ORC.
create table if not exists hurys_db.dwd_data_duplicate
(
    data_type      int    comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no      string comment '设备编号',
    data_duplicate float  comment '数据重复率'
)
    comment '数据重复性统计表'
    partitioned by (day string)
    stored as orc
;
2.5.2 SQL语句
-- Duplicate rate per device for one ODS day, rounded to two decimals:
--   (rows sharing their (device_no, create_time) with at least one other row)
--   / (all rows of that device on that day).
--   t2: attaches the per-device daily row count (count_num) to every raw row.
--   t3: keeps only the (device_no, create_time) groups that occur more than once,
--       with num = size of the group.
insert overwrite table hurys_db.dwd_data_duplicate partition (day)
select
    '1' as data_type,                       -- string literal written to the int data_type column
    device_no,
    round(sum(num) / count_num, 2) as data_duplicate,
    day
from (
    select
        device_no,
        create_time,
        count(1) as num,
        count_num,
        day
    from (
        select
            device_no,
            create_time,
            count(device_no) over (partition by device_no, day) as count_num,
            day
        from hurys_db.ods_turnratio
        where day = '2024-09-04'
    ) as t2
    group by device_no, create_time, count_num, day
    having count(1) > 1
) as t3
group by device_no, count_num, day
;
2.6 数据准确性统计表
2.6.1 建表语句
-- Accuracy statistics: per data type, device and field, the share of unreasonable values
-- and the share of empty values; partitioned by day and stored as ORC.
create table if not exists hurys_db.dwd_data_accuracy
(
    data_type              int    comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no              string comment '设备编号',
    field_name             string comment '字段名',
    data_unreasonable_rate float  comment '数据不合理率',
    data_null_rate         float  comment '数据空值率'
)
    comment '数据准确性统计表'
    partitioned by (day string)
    stored as orc
;
2.6.2 SQL语句
-- Accuracy statistics per device and field for one day:
--   data_unreasonable_rate = cleaning records with a non-NULL field_value / raw rows of the device
--   data_null_rate         = cleaning records with a NULL field_value     / raw rows of the device
-- t2 supplies the per-device raw row count of the day; the filter on count_device_all
-- drops cleaning records whose device/day has no raw rows in t2.
-- NOTE(review): the record-building query shown earlier in this document writes the literal
-- string 'null' (never SQL NULL) into field_value, so data_null_rate as written is presumably
-- always 0 - confirm whether a comparison with 'null' was intended.
insert overwrite table hurys_db.dwd_data_accuracy partition (day)
select
    t1.data_type,
    t1.device_no,
    t1.field_name,
    round((sum(case when t1.field_value is not null then 1 else 0 end) / t2.count_device_all), 2) as data_unreasonable_rate,
    round((sum(case when t1.field_value is null     then 1 else 0 end) / t2.count_device_all), 2) as data_null_rate,
    t1.day
from hurys_db.dwd_data_clean_record_turnratio as t1
left join (
    select
        device_no,
        day,
        count(device_no) as count_device_all
    from hurys_db.ods_turnratio
    where day = '2024-09-04'
    group by device_no, day
) as t2
    on  t2.device_no = t1.device_no
    and t2.day       = t1.day
where t2.count_device_all is not null
group by
    t1.data_type,
    t1.device_no,
    t1.field_name,
    t2.count_device_all,
    t1.day
;
2.7 转向比字段数据修复
2.7.1 修复策略
使用前三周同期数据取平均进行修复
2.7.2 SQL语句
-- Field-level repair: re-inserts the rejected rows of one day into dwd_turnratio, replacing
-- every reading that is outside its accepted range with the average of the cleaned readings
-- recorded by the same device at the same time of day 7, 14 and 21 days earlier.
--   a1: for each error row (t2), the cleaned rows (t1) found 7 / 14 / 21 days before it.
--   a2: per error row (device_no, create_time), the averages over the reference rows found.
--   a3: the error rows of the target day; rows without any reference data are not repaired.
-- Inner joins are used throughout: the filters on the nullable side of the outer joins they
-- replace already discarded the unmatched rows.
insert into table hurys_db.dwd_turnratio partition (day)
select
    a3.id,
    a3.device_no,
    a3.source_device_type,
    a3.sn,
    a3.model,
    a3.create_time,
    a3.cycle,
    case when a3.volume_sum      between 0 and 1000 then a3.volume_sum      else a2.avg_volume_sum      end as volume_sum,
    case when a3.speed_avg       between 0 and 150  then a3.speed_avg       else a2.avg_speed_avg       end as speed_avg,
    case when a3.volume_left     between 0 and 1000 then a3.volume_left     else a2.avg_volume_left     end as volume_left,
    case when a3.speed_left      between 0 and 100  then a3.speed_left      else a2.avg_speed_left      end as speed_left,
    case when a3.volume_straight between 0 and 1000 then a3.volume_straight else a2.avg_volume_straight end as volume_straight,
    case when a3.speed_straight  between 0 and 150  then a3.speed_straight  else a2.avg_speed_straight  end as speed_straight,
    case when a3.volume_right    between 0 and 1000 then a3.volume_right    else a2.avg_volume_right    end as volume_right,
    case when a3.speed_right     between 0 and 100  then a3.speed_right     else a2.avg_speed_right     end as speed_right,
    case when a3.volume_turn     between 0 and 100  then a3.volume_turn     else a2.avg_volume_turn     end as volume_turn,
    case when a3.speed_turn      between 0 and 100  then a3.speed_turn      else a2.avg_speed_turn      end as speed_turn,
    a3.day
from hurys_db.dwd_turnratio_error as a3
inner join (
    select
        a1.device_no,
        a1.create_time,
        round(avg(a1.volume_sum), 0)      as avg_volume_sum,
        round(avg(a1.speed_avg), 2)       as avg_speed_avg,
        round(avg(a1.volume_left), 0)     as avg_volume_left,
        round(avg(a1.speed_left), 2)      as avg_speed_left,
        round(avg(a1.volume_straight), 0) as avg_volume_straight,
        round(avg(a1.speed_straight), 2)  as avg_speed_straight,
        round(avg(a1.volume_right), 0)    as avg_volume_right,
        round(avg(a1.speed_right), 2)     as avg_speed_right,
        round(avg(a1.volume_turn), 0)     as avg_volume_turn,
        round(avg(a1.speed_turn), 2)      as avg_speed_turn
    from (
        -- Reference rows one week before the error row.
        select
            t1.device_no, t1.create_time as start_time, t2.create_time,
            t1.volume_sum, t1.speed_avg, t1.volume_left, t1.speed_left,
            t1.volume_straight, t1.speed_straight, t1.volume_right, t1.speed_right,
            t1.volume_turn, t1.speed_turn
        from hurys_db.dwd_turnratio as t1
        inner join hurys_db.dwd_turnratio_error as t2
            on  t2.device_no = t1.device_no
            and concat(date_sub(t2.create_time, 7), substr(t2.create_time, 11, 10)) = t1.create_time
        union all
        -- Reference rows two weeks before the error row.
        select
            t1.device_no, t1.create_time as start_time, t2.create_time,
            t1.volume_sum, t1.speed_avg, t1.volume_left, t1.speed_left,
            t1.volume_straight, t1.speed_straight, t1.volume_right, t1.speed_right,
            t1.volume_turn, t1.speed_turn
        from hurys_db.dwd_turnratio as t1
        inner join hurys_db.dwd_turnratio_error as t2
            on  t2.device_no = t1.device_no
            and concat(date_sub(t2.create_time, 14), substr(t2.create_time, 11, 10)) = t1.create_time
        union all
        -- Reference rows three weeks before the error row.
        select
            t1.device_no, t1.create_time as start_time, t2.create_time,
            t1.volume_sum, t1.speed_avg, t1.volume_left, t1.speed_left,
            t1.volume_straight, t1.speed_straight, t1.volume_right, t1.speed_right,
            t1.volume_turn, t1.speed_turn
        from hurys_db.dwd_turnratio as t1
        inner join hurys_db.dwd_turnratio_error as t2
            on  t2.device_no = t1.device_no
            and concat(date_sub(t2.create_time, 21), substr(t2.create_time, 11, 10)) = t1.create_time
    ) as a1
    group by a1.device_no, a1.create_time
) as a2
    on  a3.device_no   = a2.device_no
    and a3.create_time = a2.create_time
where a3.day = '2024-09-04'
;
2.8 转向比整条数据补全
2.8.1 补全策略
使用前三周同期数据取平均进行补全
2.8.2 SQL语句
-- Row-level completion: for every 5-minute slot of the target day that a device is missing
-- in dwd_turnratio, inserts a synthetic row whose readings are the averages of the rows the
-- same device recorded at the same time of day 7, 14 and 21 days earlier.
--   a1: every (device, expected slot time) of the day, built by crossing the devices seen
--       that day with the '5分钟' entries of hurys_db.dwd_frequency_time.
--   a2: the expected slots that have no row in dwd_turnratio (the missing slots).
--   a3: for each missing slot, the reference rows found 7 / 14 / 21 days before it.
-- The WITH clause is placed in front of the INSERT because Hive does not accept a CTE
-- inside a subquery block. Inner joins replace the left joins whose not-null filter on the
-- joined table already discarded unmatched rows.
with a2 as (
    select
        a1.device_no,
        a1.day,
        a1.all_time as miss_time,
        a1.source_device_type,
        a1.sn,
        a1.model
    from (
        select
            t1.device_no,
            t1.day,
            t1.source_device_type,
            t1.sn,
            t1.model,
            -- First 11 characters of create_time (date part plus separator) + slot time.
            concat(substr(t1.create_time, 1, 11), t2.frequency_time) as all_time
        from hurys_db.dwd_turnratio as t1
        cross join hurys_db.dwd_frequency_time as t2
        where t1.day = '2024-09-04'
          and t2.frequency_rate = '5分钟'
        group by
            t1.device_no,
            t1.day,
            t1.source_device_type,
            t1.sn,
            t1.model,
            concat(substr(t1.create_time, 1, 11), t2.frequency_time)
    ) as a1
    left join hurys_db.dwd_turnratio as t3
        on  a1.device_no = t3.device_no
        and a1.all_time  = t3.create_time
        and t3.day       = '2024-09-04'
    -- Anti-join: keep only the slots for which no row exists.
    where t3.create_time is null
)
insert into table hurys_db.dwd_turnratio partition (day)
select
    UUID() as id,
    a3.device_no,
    a3.source_device_type,
    a3.sn,
    a3.model,
    a3.miss_time as create_time,
    round(avg(a3.cycle), 0)           as cycle,
    round(avg(a3.volume_sum), 0)      as volume_sum,
    round(avg(a3.speed_avg), 2)       as speed_avg,
    round(avg(a3.volume_left), 0)     as volume_left,
    round(avg(a3.speed_left), 2)      as speed_left,
    round(avg(a3.volume_straight), 0) as volume_straight,
    round(avg(a3.speed_straight), 2)  as speed_straight,
    round(avg(a3.volume_right), 0)    as volume_right,
    round(avg(a3.speed_right), 2)     as speed_right,
    round(avg(a3.volume_turn), 0)     as volume_turn,
    round(avg(a3.speed_turn), 2)      as speed_turn,
    a3.day
from (
    -- Reference rows one week before the missing slot.
    select
        a2.device_no, a2.source_device_type, a2.sn, a2.model, a2.miss_time,
        t4.create_time, t4.cycle, t4.volume_sum, t4.speed_avg, t4.volume_left, t4.speed_left,
        t4.volume_straight, t4.speed_straight, t4.volume_right, t4.speed_right,
        t4.volume_turn, t4.speed_turn,
        a2.day
    from a2
    inner join hurys_db.dwd_turnratio as t4
        on  a2.device_no = t4.device_no
        and concat(date_sub(a2.miss_time, 7), substr(a2.miss_time, 11, 10)) = t4.create_time
    union all
    -- Reference rows two weeks before the missing slot.
    select
        a2.device_no, a2.source_device_type, a2.sn, a2.model, a2.miss_time,
        t4.create_time, t4.cycle, t4.volume_sum, t4.speed_avg, t4.volume_left, t4.speed_left,
        t4.volume_straight, t4.speed_straight, t4.volume_right, t4.speed_right,
        t4.volume_turn, t4.speed_turn,
        a2.day
    from a2
    inner join hurys_db.dwd_turnratio as t4
        on  a2.device_no = t4.device_no
        and concat(date_sub(a2.miss_time, 14), substr(a2.miss_time, 11, 10)) = t4.create_time
    union all
    -- Reference rows three weeks before the missing slot.
    select
        a2.device_no, a2.source_device_type, a2.sn, a2.model, a2.miss_time,
        t4.create_time, t4.cycle, t4.volume_sum, t4.speed_avg, t4.volume_left, t4.speed_left,
        t4.volume_straight, t4.speed_straight, t4.volume_right, t4.speed_right,
        t4.volume_turn, t4.speed_turn,
        a2.day
    from a2
    inner join hurys_db.dwd_turnratio as t4
        on  a2.device_no = t4.device_no
        and concat(date_sub(a2.miss_time, 21), substr(a2.miss_time, 11, 10)) = t4.create_time
) as a3
group by
    a3.device_no,
    a3.source_device_type,
    a3.sn,
    a3.model,
    a3.miss_time,
    a3.day
;
2.9 数据补全以及数据修复记录表
2.9.1 建表语句
-- Audit table for corrected data: one row per record that was either completed
-- (record_type 0) or repaired (record_type 1), as listed in the column comments.
-- Partitioned by day and stored as ORC.
create table if not exists hurys_db.dwd_data_correction_record
(
    data_type   int       comment '数据类型 1:转向比,2:统计,3:评价,4:区域,6:静态排队,7:动态排队',
    device_no   string    comment '设备编号',
    id          string    comment '唯一ID',
    create_time timestamp comment '创建时间',
    record_type int       comment '记录类型 0:补全,1:修复'
)
    comment '数据补全以及数据修复记录表'
    partitioned by (day string)
    stored as orc
;
2.9.2 SQL语句
2.9.2.1 转向比数据修复记录
-- Repair records: logs, with record_type '1', every error row of the day whose id and
-- device_no also exist in dwd_turnratio.
-- Written as an inner join: the not-null filter on the error-table id in the outer-join
-- form removed every unmatched dwd_turnratio row anyway.
insert into table hurys_db.dwd_data_correction_record partition (day)
select
    '1' as data_type,
    err.device_no,
    err.id,
    err.create_time,
    '1' as record_type,
    err.day
from hurys_db.dwd_turnratio as fixed
inner join hurys_db.dwd_turnratio_error as err
    on  err.id        = fixed.id
    and err.device_no = fixed.device_no
where err.id is not null
  and err.day = '2024-09-04'
;
2.9.2.2 转向比补全记录
-- Completion records: logs, with record_type '0', every dwd_turnratio row of the day that
-- has no raw ODS row with the same (device_no, create_time).
insert into table hurys_db.dwd_data_correction_record partition (day)
select
    '1' as data_type,
    t2.device_no,
    t2.id,
    t2.create_time,
    '0' as record_type,
    t2.day
from hurys_db.dwd_turnratio as t2
left join (
    select
        device_no,
        create_time,
        day
    from hurys_db.ods_turnratio
    where day = '2024-09-04'
) as t3
    on  t3.device_no   = t2.device_no
    and t3.create_time = t2.create_time
-- Anti-join: only rows without a raw counterpart are logged.
where t3.device_no is null
  and t3.create_time is null
  and t2.day = '2024-09-04'
;
搞定!目前数据清洗这一层就这么多,后面继续完善!