spark读取数据写入hive数据表

news/2024/10/30 13:38:12/

目录

一个模板 概述: 

create_tabel建表函数,定义日期分区

删除原有分区drop_partition函数

generate_data 数据处理函数,将相关数据写入定义的表中 

添加分区函数add_partition

一个模板 概述:

table_name = 'name'  # 要写入的目标表

date  = '2023-01-21'  # 取数据的日期

create_tabel(table_name) # 建表函数,表结构(要写入的数据表) ,建表时注意常用日期来分区

drop_partition(spark, table_name)  #删除原有函数, 如果原来有相关分区数据则进行删除 

generate_data(date, table_name)  # 读取数据函数并写入目标表 

add_partition(spark, table_name)  # 调整写入的分区 ,完成

create_tabel建表函数,定义日期分区

def create_tabel(table_name) : 

        create_table_sql = """ 

        CREATE TABLE IF NOT EXISTS  DB_NAME.{table_name} (

        column1  数据类型,

        dt_test ,string , .....

        count ,float 

        ) PARTITIONED BY ( year string,

                                            month string,

                                            day string

        )  STORED AS ORC 

        LOCATION  'DB_PATH/{table_name}'

        """.format(table_name = table_name) 

  spark.sql(create_table_sql)   # 执行建表语句。注意上面的分区形式。 

删除原有分区drop_partition函数

 def drop_partition(spark, table_name):

        alter_table_sql = '''

                ALTER TABLE ${DB_NAME}.{table_name} DROP IF EXISTS

                PARTITION (year = '{year}',month = '{month}', day = '{day}')

                '''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)

        spark.sql(alter_table_sql)

generate_data 数据处理函数,将相关数据写入定义的表中 

def generate_data(date, table_name): 

        get_data_part = spark.sql("""

        select 

                concat_ws('-', year, month, day) as dt_test ,

                count ,...更多数据列  (可以是经过sql处理解析后后得到的数据列,如json可以直接字典解析param['city'] as city_id 

        from  已有old表A 

        where 

                各种限制条件或者过滤条件 ,如时间限制 concat_ws('-', year, month, day) = '{date}'

        """.format(date= date )

).cache() # 缓存数据到内存,后期数据不用再反复执行,减少耗时

## 可以对Dataframe get_data_part再进行各种处理得到spark dataframe get_data_part_final  。

aim_columns = [ 'col1', 'col2' , 'col3'...]  # 这里的数据column 一定要和建表时的数据一致,写入前select 后直接写入,以这种写法这样便可以减少对写入数据时列名对应不上等报错问题。另外要注意最后处理的得到的各列数据类型也一致。 

## 写入数据

file_path = 'DB_PATH/{table_name}/{year}/{moth}/{day}'.format(year = date[:4],month=date[5:7],day=date[8:10],tabel_name= table_name) # 定义路径

#方式1 : 直接用spark  Dataframe 的write来写入。 

get_data_part_final.select(aim_columns).coalesce(5).write.orc(path=file_path,mode='overwrite')   

上面写入语句中: .coalesce(5)是将数据文件写为指定个数的,这样可以减少数据倾斜现象。 mode= 'overwrite'会覆盖之前的数据,如果将overwrite改为'append'会追加到表中。 

关于数据倾斜,可以见自己的另一个总结: xxx 

# 方式2  可以用spark sql 的方式来写

get_data_part_final.createOrReplaceTempView("test_temp")

spark.sql("""insert overwrite table db_name.{table_name}  partition(date={date}) select * from test_temp""".format(table_name= table_name,date=date )

此处参考:pyspark--写hive分区表覆盖指定分区数据

添加分区函数add_partition

def add_partition(spark, table_name):

        alter_table_sql = '''    

                ALTER TABLE DB_NAME.{table_name} ADD IF NOT EXISTS    

                PARTITION (year = '{year}',month = '{month}', day = '{day}')    

                LOCATION 'DB_PATH/{table_name}/{year}/{month}/{day}'    

                '''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)             

        spark.sql(alter_table_sql)

这个如果在原来generate_data写入数据函数中有进行分区,其实可以不用再调用。 

参考: 

Hive/Spark小文件解决方案(企业级实战) - 腾讯云开发者社区-腾讯云


http://www.ppmy.cn/news/21428.html

相关文章

STM32的最大中断频率

在查找资料后发现,在STM32F103(主频为72MHz)中最大中断频率为500kHz。从中断触发-压栈-中断向量表-进入中断处理函数是需要执行很多个指令的。 STM32F103的主频是72M的,按照T(arr1)*(PSC1)/Tck 其中TCK为时…

[HDCTF2019]Maze 题解

少欲则心静,心静则事简。 ——人民日报 1.查壳 是一个加了upx壳的32位EXE文件 2.使用Kali Linux脱壳 maze题目脱壳3.去除脏字节 没有找到主函数,发现这段汇编代码标红了,IDA分析崩溃,这是掺杂了花指令 这里jnz,不论判…

Nacos学习:二、配置中心

2. 配置中心 配置中心将配置从各应用中剥离出来,对配置进行统一管理,应用自身不需要自己去管理配置。 配置中心的服务流程如下: 1、用户在配置中心更新配置信息。 2、服务A和服务B及时得到配置更新通知,从配置中心获取配置。 …

揭榜|2022年度“博客之星新星”TOP 10已出炉

目录 「博客之星」TOP 10 「博客新星」TOP 10 评选说明 奖品说明 奖项及奖品 地址填写说明 奖品发放时间说明 2022 年度博客之星活动官网:https://www.csdn.net/blogstar2022 2022年度「博客之星」评选从2022年12月15日活动启动,历经海选报名、线…

RocketMQ源码-NameServer架构设计及启动流程

本文我们来分析NameServer相关代码,在正式分析源码前,我们先来回忆下NameServer的功能: NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能&…

Cesium 渐变长方体实现-Shader

position获取: 1.1 在cesium中,可通过vec4 p = czm_computePosition();获取 模型坐标中相对于眼睛的位置矩阵 1.2 vec4 eyePosition = czm_modelViewRelativeToEye * p; // position in eye coordinates 获取eyePosition 1.3 v_positionEC = czm_inverseModelView * eyePo…

用光盘怎样重装电脑系统

用光盘怎样重装电脑系统?重装系统,听起来好像很难的样子。其实没那么难,用光盘装还是比较容易的。下面一起看看如何用光盘重装系统吧。 工具/原料: 系统版本:win7 品牌型号:联想yoga13 方法/步骤&#xf…

【C++题解】NOIP2015提高组 - 跳石头

✍个人博客:https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 📚专栏地址:杂题讲解 📝原题地址:跳石头 📣专栏定位:在这里我将整理一些其他比赛或面试的题解~ ❤️如果有收获的话&…