目录
一个模板 概述:
create_tabel建表函数,定义日期分区
删除原有分区drop_partition函数
generate_data 数据处理函数,将相关数据写入定义的表中
添加分区函数add_partition
一个模板 概述:
table_name = 'name' # 要写入的目标表
date = '2023-01-21' # 取数据的日期
create_tabel(table_name) # 建表函数,表结构(要写入的数据表) ,建表时注意常用日期来分区
drop_partition(spark, table_name) #删除原有函数, 如果原来有相关分区数据则进行删除
generate_data(date, table_name) # 读取数据函数并写入目标表
add_partition(spark, table_name) # 调整写入的分区 ,完成
create_tabel建表函数,定义日期分区
def create_tabel(table_name) :
create_table_sql = """
CREATE TABLE IF NOT EXISTS DB_NAME.{table_name} (
column1 数据类型,
dt_test ,string , .....
count ,float
) PARTITIONED BY ( year string,
month string,
day string
) STORED AS ORC
LOCATION 'DB_PATH/{table_name}'
""".format(table_name = table_name)
spark.sql(create_table_sql) # 执行建表语句。注意上面的分区形式。
删除原有分区drop_partition函数
def drop_partition(spark, table_name):
alter_table_sql = '''
ALTER TABLE ${DB_NAME}.{table_name} DROP IF EXISTS
PARTITION (year = '{year}',month = '{month}', day = '{day}')
'''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)
spark.sql(alter_table_sql)
generate_data 数据处理函数,将相关数据写入定义的表中
def generate_data(date, table_name):
get_data_part = spark.sql("""
select
concat_ws('-', year, month, day) as dt_test ,
count ,...更多数据列 (可以是经过sql处理解析后后得到的数据列,如json可以直接字典解析param['city'] as city_id
from 已有old表A
where
各种限制条件或者过滤条件 ,如时间限制 concat_ws('-', year, month, day) = '{date}'
""".format(date= date )
).cache() # 缓存数据到内存,后期数据不用再反复执行,减少耗时
## 可以对Dataframe get_data_part再进行各种处理得到spark dataframe get_data_part_final 。
aim_columns = [ 'col1', 'col2' , 'col3'...] # 这里的数据column 一定要和建表时的数据一致,写入前select 后直接写入,以这种写法这样便可以减少对写入数据时列名对应不上等报错问题。另外要注意最后处理的得到的各列数据类型也一致。
## 写入数据
file_path = 'DB_PATH/{table_name}/{year}/{moth}/{day}'.format(year = date[:4],month=date[5:7],day=date[8:10],tabel_name= table_name) # 定义路径
#方式1 : 直接用spark Dataframe 的write来写入。
get_data_part_final.select(aim_columns).coalesce(5).write.orc(path=file_path,mode='overwrite')
上面写入语句中: .coalesce(5)是将数据文件写为指定个数的,这样可以减少数据倾斜现象。 mode= 'overwrite'会覆盖之前的数据,如果将overwrite改为'append'会追加到表中。
关于数据倾斜,可以见自己的另一个总结: xxx
# 方式2 可以用spark sql 的方式来写
get_data_part_final.createOrReplaceTempView("test_temp")
spark.sql("""insert overwrite table db_name.{table_name} partition(date={date}) select * from test_temp""".format(table_name= table_name,date=date )
此处参考:pyspark--写hive分区表覆盖指定分区数据
添加分区函数add_partition
def add_partition(spark, table_name):
alter_table_sql = '''
ALTER TABLE DB_NAME.{table_name} ADD IF NOT EXISTS
PARTITION (year = '{year}',month = '{month}', day = '{day}')
LOCATION 'DB_PATH/{table_name}/{year}/{month}/{day}'
'''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)
spark.sql(alter_table_sql)
这个如果在原来generate_data写入数据函数中有进行分区,其实可以不用再调用。
参考:
Hive/Spark小文件解决方案(企业级实战) - 腾讯云开发者社区-腾讯云