PySpark实现Snowflake数据导出到Amazon Redshift

server/2025/2/27 13:40:39/

编写AWS EMR上的高性能PySpark代码,实现用SQL从Snowflake上下载数据到S3里的parquet文件,并导入Redshift表。

步骤一:配置EMR集群

首先确保您已经在AWS EMR上正确地设置了包含适当权限的角色和安全组的集群。该角色应允许访问Snowflake数据库、S3桶以及Redshift集群。

步骤二:连接至Snowflake并提取数据

接下来,我们将使用snowflake-jdbc-driver.jar驱动程序来连接Snowflake数据库并将查询结果加载到Dataframe中。请根据实际情况替换占位符值(例如用户名、密码、账户名称等)。

python">from pyspark.sql import SparkSession# 创建Spark会话对象
spark = SparkSession.builder.appName("LoadFromSnowflakeToS3").getOrCreate()options_snowflake = {"sfURL": "<SNOWFLAKE_ACCOUNT>.snowflakecomputing.com","sfUser": "<USERNAME>","sfPassword": "<PASSWORD>","sfDatabase": "<DATABASE_NAME>","sfSchema": "<SCHEMA_NAME>",  
}query = "(SELECT * FROM <TABLE>)"data_df = spark.read.format("net.snowflake.spark.SNOWFLAKE_SOURCE_JDBC")\.option(**options_snowflake)\.option("dbtable", query).load()

注意事项: 您可能还需要添加其他选项,比如SSL证书路径或代理设置,具体取决于您的网络环境和安全性需求。

步骤三:保存为Parquet格式于S3

一旦我们有了所需的DataFrame (data_df) ,就可以轻松地将其序列化成高效的Parquet格式并存储在指定位置的S3 bucket里。

python">output_path = f"s3a://{bucket_name}/{folder}/"
data_df.write.mode('overwrite').format('parquet').save(output_path)

这里的 mode='overwrite' 表示每次运行都会覆盖已存在的同名目录;如果您希望保留旧版记录而不被新版本替代的话,请改为 'append' 。另外别忘了提供正确的IAM策略给EC2节点以便它们能够向目标Bucket写入数据。

步骤四:将Parquet文件载入Redshift

现在我们需要借助COPY命令将上述生成的Parquet文件批量复制进Redshift的一个表格里面。为此目的,通常我们会采用UNLOAD方法配合Manifest文件的方式来进行更复杂的场景管理,但在这里为了简化流程,我仅展示基本语法供参考:

方法A: 使用JDBC Driver 直接插入 Redshift (适用于小规模数据集)

这种方法并不推荐用于大规模作业,因为它依赖单个Executor线程逐条推送数据进去,效率低下且容易导致超时错误。

INSERT INTO redshift_table SELECT * FROM snowflake_view;

但是由于我们在之前已经完成了ETL过程并且得到了优化后的Parquet输出,所以建议采取第二种方案:

方法B: 利用 COPY 命令高效传输大容量 Parquet 文件

这是最佳实践方式,因为它是多线程化的,并能充分利用Redshift内部压缩机制提高吞吐率。

准备阶段:
  • 创建外部分区表: 定义一个临时外表指向刚才存放在 S3 上的所有 .parqeut 文件片段。
  • 授权 IAM 角色: 确保关联到 Redshift 的 EC2 实例拥有足够的权限去读取相应 Bucket 内容。
示例脚本:
CREATE EXTERNAL SCHEMA IF NOT EXISTS ext_schema_from_parquets
FROM DATA CATALOG DATABASE '<glue_database>'
REGION AS '<region>';GRANT USAGE ON SCHEMA ext_schema_from_parquets TO GROUP your_group;-- 如果没有Glue Catalog可用,则手动定义EXTERNAL TABLE如下所示:
/*
CREATE EXTERNAL TABLE schema.ext_table (column1 datatype,...
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://path/to/output/';
*/-- 执行实际COPY操作
COPY target_redshift_table
FROM '{manifest_file}'
CREDENTIALS 'aws_access_key_id=<access-key-id>;aws_secret_access_key=<secret-access-key>' -- or better practice with iam role arn instead!
MANIFEST GZIP REGION '<region>';

请注意替换 {manifest_file} 为您所拥有的清单文档的实际地址,这个JSON文件应该列出所有的.parquet分片及其大小信息。此外,出于安全原因强烈建议不要硬编码凭证字符串而是改用预设的服务账号身份验证途径。


http://www.ppmy.cn/server/171047.html

相关文章

Python 基本语法的详细解释

目录 &#xff08;1&#xff09;注释 &#xff08;2&#xff09;缩进 &#xff08;3&#xff09;变量和数据类型 变量定义 数据类型 &#xff08;4&#xff09;输入和输出 输出&#xff1a;print() 函数 输入&#xff1a;input() 函数 &#xff08;1&#xff09;注释 注…

如何最小化Spark中的Shuffle开销

在Spark中&#xff0c;Shuffle是指数据在不同节点之间重新分配的过程&#xff0c;这个过程通常会涉及大量的数据传输和磁盘读写&#xff0c;消耗大量的计算和网络资源&#xff0c;因此是Spark作业中的一个性能瓶颈。为了最小化Spark中的Shuffle开销&#xff0c;可以采取以下策略…

智慧物流小程序(论文源码调试讲解)

第4章 系统设计 一个成功设计的系统在内容上必定是丰富的&#xff0c;在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值&#xff0c;吸引更多的访问者访问系统&#xff0c;以及让来访用户可以花费更多时间停留在系统上&#xff0c;则表明该系统设计得比较专…

Mellanox的LAG全称是什么?网卡的创建机制如何?(Link Aggregation Group 链路聚合组)

背景 对于双端口的网卡&#xff0c;有时候有将链路聚合的需求。在Mellanox网卡上通过LAG提供。对于RoCE的报文在Mellanox上也可以通过LAG来完成报文收发&#xff0c;叫做RoCE over LAG。但是仅仅适用于双端口卡。 关键点 LAG&#xff1a; Link Aggregation Group (LAG) 链路…

Spring Boot 与@Bean注解搭配场景

在Spring Boot中&#xff0c;Bean注解通常与其他注解一起使用&#xff0c;以实现更灵活的Bean管理、依赖注入和配置。以下是一些常见的搭配使用场景&#xff1a; 1. Bean与Configuration Bean注解通常用于配置类&#xff08;带有Configuration注解的类&#xff09;中&#xf…

APP自动化实战

APP自动化能做什么&#xff1f; 请看示例&#xff08;实现批量的视频&#xff0c;封面功能复用能力&#xff08;实现效果参考抖音号&#xff1a;71403700901&#xff09; APP自动化实战&#xff0d;操作剪映APP PO模式 1. PO模式介绍 PO&#xff08;Page Object&#xff09;…

Flutter 介绍及安装使用

Flutter 安装 1. 镜像的配置 Flutter 源站在国内可能不太稳定&#xff0c;因此谷歌中国开发者社区(GDG)专门搭建了临时镜像&#xff0c;使得我们的 Flutter 命令行工具可以到该镜像站点下载所需资源。 使用方法 Flutter SDK 默认从 Github 获取更新&#xff0c;如您访问 Github…

esp8266 rtos sdk开发环境搭建

1. 安装必要的工具 1.1 安装 Git Git 用于从远程仓库克隆代码&#xff0c;你可以从Git 官方网站下载 Windows 版本的安装程序。安装过程中可保持默认设置&#xff0c;安装完成后&#xff0c;在命令提示符&#xff08;CMD&#xff09;或 PowerShell 中输入git --version&#…