Airflow和PySPARK实现带多组参数和标签的Amazon Redshift数据仓库批量数据导出程序

server/2025/3/6 5:39:04/

设计一个基于多个带标签SQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数,用Airflow进行调度,自动批量地将Amazon Redshift数据仓库的数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。PySpark程序需要异常处理,输出带时间戳和每个运行批次和每个导出文件作业运行状态的日志文件,每天单独一个带日期的和.log扩展名日志文件,放在logs子目录中,参数全部设置在json配置文件中。

PySpark解决方案,包含代码结构、配置文件和日志处理:

注意事项:

  1. 确保Spark集群有访问Redshift和S3的权限

  2. 根据实际环境调整Redshift JDBC驱动版本

  3. 测试不同文件格式的导出需求

  4. 监控S3临时目录的存储使用情况

  5. 定期清理日志文件和临时数据

  6. 根据数据量调整Spark资源配置

  7. 项目结构

redshift_exporter/
├── config/
│   └── config.json
├── sql_templates/
│   └── sales_report.sql
├── logs/
├── jobs/
│   └── redshift_exporter.py
└── airflow/└── dags/└── redshift_export_dag.py
  1. config.json 示例
{"redshift_conn": {"url": "jdbc:redshift://cluster:5439/db","user": "user","password": "password","tempdir": "s3a://temp-bucket/redshift_temp"},"s3_output": "s3a://output-bucket/reports","tasks": [{"label": "sales_report","sql_template": "sql_templates/sales_report.sql","parameters": ["${date}", "region1"],"formats": ["parquet", "csv", "xlsx"]}]
}
  1. PySpark程序 (redshift_exporter.py)
python">import json
import logging
import os
import sys
from datetime import datetime
from pyspark.sql import SparkSessionclass RedshiftExporter:def __init__(self, config_path):self.spark = SparkSession.builder \.appName("RedshiftExporter") \.config("spark.jars.packages", "com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1") \.getOrCreate()with open(config_path) as f:self.config = json.load(f)self.setup_logging()def setup_logging(self):log_dir = "logs"if not os.path.exists(log_dir):os.makedirs(log_dir)log_file = f"{log_dir}/{datetime.now().strftime('%Y-%m-%d')}.log"logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler(log_file),logging.StreamHandler()])self.logger = logging.getLogger(__name__)def read_sql_template(self, template_path):try:with open(template_path) as f:return f.read()except Exception as e:self.logger.error(f"Error reading SQL template: {str(e)}")raisedef export_data(self, df, output_path, format_type):try:writer = df.write.mode("overwrite")if format_type == "parquet":writer.parquet(f"{output_path}.parquet")elif format_type == "csv":writer.option("header", "true").csv(f"{output_path}.csv")elif format_type == "xlsx":df.write.format("com.crealytics.spark.excel") \.option("header", "true") \.mode("overwrite") \.save(f"{output_path}.xlsx")self.logger.info(f"Successfully exported {format_type} to {output_path}")except Exception as e:self.logger.error(f"Export failed for {format_type}: {str(e)}")raisedef process_task(self, task, params):try:sql = self.read_sql_template(task["sql_template"])formatted_sql = sql.replace("${date}", params[0])df = self.spark.read \.format("com.databricks.spark.redshift") \.option("url", self.config["redshift_conn"]["url"]) \.option("query", formatted_sql) \.option("user", self.config["redshift_conn"]["user"]) \.option("password", self.config["redshift_conn"]["password"]) \.option("tempdir", self.config["redshift_conn"]["tempdir"]) \.load()filename = f"{task['label']}_{'_'.join(params)}"output_base = f"{self.config['s3_output']}/{filename}"for fmt in task["formats"]:self.export_data(df, output_base, fmt)except Exception as e:self.logger.error(f"Task {task['label']} failed: {str(e)}")raisedef run(self, parameters):try:for task in self.config["tasks"]:self.logger.info(f"Processing task: {task['label']}")self.process_task(task, parameters)finally:self.spark.stop()if __name__ == "__main__":if len(sys.argv) < 2:print("Usage: redshift_exporter.py <config_path> <parameters>")sys.exit(1)config_path = sys.argv[1]parameters = sys.argv[2].split("_") if len(sys.argv) > 2 else []exporter = RedshiftExporter(config_path)exporter.run(parameters)
  1. Airflow DAG 示例 (redshift_export_dag.py)
python">from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperatordefault_args = {'owner': 'data_team','start_date': datetime(2023, 1, 1),'retries': 1
}with DAG('redshift_export',default_args=default_args,schedule_interval='@daily',catchup=False) as dag:export_task = BashOperator(task_id='run_redshift_export',bash_command='spark-submit --packages com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:3.3.1_0.18.5 /path/to/jobs/redshift_exporter.py /path/to/config/config.json "{{ ds_nodash }}_region1"')
  1. 实现说明
  • 配置文件管理:使用JSON配置文件管理Redshift连接参数、输出位置和任务配置
  • 参数化处理:支持动态参数替换SQL模板中的变量
  • 多格式导出:支持Parquet、CSV和Excel格式导出
  • 日志记录:每天生成独立的日志文件,包含详细的时间戳和操作状态
  • 异常处理:完善的错误捕获和日志记录机制
  • Spark优化:使用Redshift的Spark连接器进行高效数据传输
  • 文件覆盖:使用Spark的overwrite模式处理已存在文件
  • 依赖管理:通过–packages参数管理Spark依赖

运行方式:

spark-submit --packages com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:3.3.1_0.18.5 jobs/redshift_exporter.py config/config.json "20231001_region1"

http://www.ppmy.cn/server/172795.html

相关文章

密码学(哈希函数)

4.1 Hash函数与数据完整性 数据完整性&#xff1a; 检测传输消息&#xff08;加密或未加密&#xff09;的修改。 密码学Hash函数&#xff1a; 构建某些数据的简短“指纹”&#xff1b;如果数据被篡改&#xff0c;则该指纹&#xff08;以高概率&#xff09;不再有效。Hash函数…

确保初始化和销毁操作的线程安全-初始化和销毁

你想为代码中的每行加上注释解释,以下是详细的注释: // 定义初始化函数,接收一个 InitOptions 类型的参数 int initGBB(InitOptions _opts) {// 使用原子操作检查初始化/销毁计数器,并增加计数。如果当前是第一次初始化,执行以下操作if (initFiniCnt_.fetch_add(1, std

9道Dubbo面试题

Dubbo本身并不复杂&#xff0c;而且官方文档写的非常清楚详细&#xff0c;面试中dubbo的问题一般不会很多&#xff0c;从分层到工作原理、负载均衡策略、容错机制、SPI机制基本就差不多了&#xff0c;最大的一道大题一般就是怎么设计一个RPC框架了&#xff0c;但是如果你工作原…

【html期末作业网页设计】

html期末作业网页设计 作者有话说项目功能介绍 网站结构完整代码网站样图 作者有话说 目前&#xff0c;我们的项目已经搭建了各页面的基本框架&#xff0c;但内容填充还不完善&#xff0c;各页面之间的跳转逻辑也还需要进一步优化。 我们深知&#xff0c;一个好的项目需要不断…

[arXiv 2025]BP-GPT: Auditory Neural Decoding Using fMRI-prompted LLM

论文网址&#xff1a;BP-GPT: Auditory Neural Decoding Using fMRI-prompted LLM 论文代码&#xff1a;https://github.com/1994cxy/BP-GPT 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现…

基于Matlab的语音识别系统设计

摘要 在通信技术的不断进步发展下&#xff0c;语音识别技术也取得了令人瞩目的成就&#xff0c;人们对语音识别技术的性能要求也越来越高。语音识别技术是通常以人们说话的内容作为识别对象的一项技术&#xff0c;凭借其安全高效、价格低廉、易于实现等特点&#xff0c;能与其…

【Hudi-SQL DDL创建表语法】

CREATE TABLE 命令功能 CREATE TABLE命令通过指定带有表属性的字段列表来创建Hudi Table。 命令格式 CREATE TABLE [ IF NOT EXISTS] [database_name.]table_name[ (columnTypeList)]USING hudi[ COMMENT table_comment ][ LOCATION location_path ][ OPTIONS (options_lis…

软考中级-数据库-3.2 数据结构-数组和矩阵

数组 一维数组是长度固定的线性表&#xff0c;数组中的每个数据元素类型相同。n维数组是定长线性表在维数上的扩张&#xff0c;即线性表中的元素又是一个线性表。 例如一维数组a[5][a1,a2,a3,a4,a5] 二维数组a[2][3]是一个2行2列的数组 第一行[a11,a12,a13] 第二行[a21,a22,a23…