PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

ops/2025/2/4 5:33:49/

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

python">import json
from pyspark.sql import SparkSessiondef load_config(config_path):with open(config_path, 'r') as f:return json.load(f)def main(config_path, base_s3_path):# 初始化SparkSession,配置S3和Excel支持spark = SparkSession.builder \.appName("DataExportJob") \.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \.getOrCreate()# 配置S3访问(根据实际环境配置)spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")config = load_config(config_path)for template in config['templates']:label = template['label']sql_template = template['sql_template']parameters_list = template['parameters']for params in parameters_list:# 验证参数数量是否匹配placeholders = sql_template.count('{')if len(params) != placeholders:raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")# 替换SQL中的占位符formatted_sql = sql_template.format(*params)df = spark.sql(formatted_sql)# 生成文件名参数部分param_str = "_".join(params)base_filename = f"{label}_{param_str}"# 定义输出路径output_paths = {'parquet': f"{base_s3_path}/parquet/{base_filename}",'csv': f"{base_s3_path}/csv/{base_filename}",'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"}# 写入Parquetdf.write.mode('overwrite').parquet(output_paths['parquet'])# 写入CSV(自动生成header)df.write.mode('overwrite') \.option("header", "true") \.csv(output_paths['csv'])# 写入Excel(使用spark-excel包)df.write.format("com.crealytics.spark.excel") \.option("header", "true") \.option("inferSchema", "true") \.mode("overwrite") \.save(output_paths['excel'])spark.stop()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser()parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')args = parser.parse_args()main(args.config, args.s3_path)

配置文件示例(config.json)

{"templates": [{"label": "sales_report","sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'","parameters": [["202301", "north"],["202302", "south"]]},{"label": "user_activity","sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id","parameters": [["2023-01-01"],["2023-01-02"]]}]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel├── sales_report_202301_north.xlsx├── sales_report_202302_south.xlsx└── user_activity_2023-01-01.xlsx

http://www.ppmy.cn/ops/155486.html

相关文章

012-51单片机CLD1602显示万年历+闹钟+农历+整点报时

1. 硬件设计 硬件是我自己设计的一个通用的51单片机开发平台,可以根据需要自行焊接模块,这是用立创EDA画的一个双层PCB板,所以模块都是插针式,不是表贴的。电路原理图在文末的链接里,PCB图暂时不选择开源。 B站上传的…

java练习(5)

ps:题目来自力扣 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 请你将两个数相加,并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外,这…

单细胞-第四节 多样本数据分析,下游画图

文件在单细胞\5_GC_py\1_single_cell\2_plots.Rmd 1.细胞数量条形图 rm(list ls()) library(Seurat) load("seu.obj.Rdata")dat as.data.frame(table(Idents(seu.obj))) dat$label paste(dat$Var1,dat$Freq,sep ":") head(dat) library(ggplot2) lib…

科技快讯 | OpenAI首次向免费用户开放推理模型;特朗普与黄仁勋会面;雷军回应“10后小学生深情表白小米SU7”

不用开口:谷歌 AI 帮你致电商家,价格、预约一键搞定 谷歌在1月30日推出Search Labs中的“Ask for Me”实验性功能,用户可利用AI代替自己致电商家咨询价格和服务。该功能已与美汽车修理厂和美甲沙龙店合作,用户需加入Search Labs并…

wax到底是什么意思

在很久很久以前,人类还没有诞生文字之前,人类就产生了语言;在诞生文字之前,人类就已经使用了语言很久很久。 没有文字之前,人们的语言其实是相对比较简单的,因为人类的生产和生活水平非常低下,…

跟李沐学AI:视频生成类论文精读(Movie Gen、HunyuanVideo)

Movie Gen:A Cast of Media Foundation Models 简介 Movie Gen是Meta公司提出的一系列内容生成模型,包含了 3.2.1 预训练数据 Movie Gen采用大约 100M 的视频-文本对和 1B 的图片-文本对进行预训练。 图片-文本对的预训练流程与Meta提出的 Emu: Enh…

Git 常用命令汇总

# 推荐一个十分好用的git插件---->GitLens 其实很多命令操作完全界面化了&#xff0c;鼠标点点就可以实现但是命令是必要的&#xff0c;用多了你就知道了 Git 常用命令汇总 1. Git 基础操作 命令作用git init初始化本地仓库git clone <repo-url>克隆远程仓库到本地g…

使用 PyTorch 实现逻辑回归并评估模型性能

1. 逻辑回归简介 逻辑回归是一种用于解决二分类问题的算法。它通过一个逻辑函数&#xff08;Sigmoid 函数&#xff09;将线性回归的输出映射到 [0, 1] 区间内&#xff0c;从而将问题转化为概率预测问题。如果预测概率大于 0.5&#xff0c;则将样本分类为正类&#xff1b;否则分…