编写AWS EMR上的高性能PySpark代码,实现用SQL从Snowflake上下载数据到S3里的parquet文件,并导入Redshift表。
步骤一:配置EMR集群
首先确保您已经在AWS EMR上正确地设置了包含适当权限的角色和安全组的集群。该角色应允许访问Snowflake数据库、S3桶以及Redshift集群。
步骤二:连接至Snowflake并提取数据
接下来,我们将使用snowflake-jdbc-driver.jar
驱动程序来连接Snowflake数据库并将查询结果加载到Dataframe中。请根据实际情况替换占位符值(例如用户名、密码、账户名称等)。
python">from pyspark.sql import SparkSession# 创建Spark会话对象
spark = SparkSession.builder.appName("LoadFromSnowflakeToS3").getOrCreate()options_snowflake = {"sfURL": "<SNOWFLAKE_ACCOUNT>.snowflakecomputing.com","sfUser": "<USERNAME>","sfPassword": "<PASSWORD>","sfDatabase": "<DATABASE_NAME>","sfSchema": "<SCHEMA_NAME>",
}query = "(SELECT * FROM <TABLE>)"data_df = spark.read.format("net.snowflake.spark.SNOWFLAKE_SOURCE_JDBC")\.option(**options_snowflake)\.option("dbtable", query).load()
注意事项: 您可能还需要添加其他选项,比如SSL证书路径或代理设置,具体取决于您的网络环境和安全性需求。
步骤三:保存为Parquet格式于S3
一旦我们有了所需的DataFrame (data_df
) ,就可以轻松地将其序列化成高效的Parquet格式并存储在指定位置的S3 bucket里。
python">output_path = f"s3a://{bucket_name}/{folder}/"
data_df.write.mode('overwrite').format('parquet').save(output_path)
这里的 mode='overwrite'
表示每次运行都会覆盖已存在的同名目录;如果您希望保留旧版记录而不被新版本替代的话,请改为 'append'
。另外别忘了提供正确的IAM策略给EC2节点以便它们能够向目标Bucket写入数据。
步骤四:将Parquet文件载入Redshift
现在我们需要借助COPY命令将上述生成的Parquet文件批量复制进Redshift的一个表格里面。为此目的,通常我们会采用UNLOAD方法配合Manifest文件的方式来进行更复杂的场景管理,但在这里为了简化流程,我仅展示基本语法供参考:
方法A: 使用JDBC Driver 直接插入 Redshift (适用于小规模数据集)
这种方法并不推荐用于大规模作业,因为它依赖单个Executor线程逐条推送数据进去,效率低下且容易导致超时错误。
INSERT INTO redshift_table SELECT * FROM snowflake_view;
但是由于我们在之前已经完成了ETL过程并且得到了优化后的Parquet输出,所以建议采取第二种方案:
方法B: 利用 COPY 命令高效传输大容量 Parquet 文件
这是最佳实践方式,因为它是多线程化的,并能充分利用Redshift内部压缩机制提高吞吐率。
准备阶段:
- 创建外部分区表: 定义一个临时外表指向刚才存放在 S3 上的所有
.parqeut
文件片段。 - 授权 IAM 角色: 确保关联到 Redshift 的 EC2 实例拥有足够的权限去读取相应 Bucket 内容。
示例脚本:
CREATE EXTERNAL SCHEMA IF NOT EXISTS ext_schema_from_parquets
FROM DATA CATALOG DATABASE '<glue_database>'
REGION AS '<region>';GRANT USAGE ON SCHEMA ext_schema_from_parquets TO GROUP your_group;-- 如果没有Glue Catalog可用,则手动定义EXTERNAL TABLE如下所示:
/*
CREATE EXTERNAL TABLE schema.ext_table (column1 datatype,...
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://path/to/output/';
*/-- 执行实际COPY操作
COPY target_redshift_table
FROM '{manifest_file}'
CREDENTIALS 'aws_access_key_id=<access-key-id>;aws_secret_access_key=<secret-access-key>' -- or better practice with iam role arn instead!
MANIFEST GZIP REGION '<region>';
请注意替换 {manifest_file}
为您所拥有的清单文档的实际地址,这个JSON文件应该列出所有的.parquet
分片及其大小信息。此外,出于安全原因强烈建议不要硬编码凭证字符串而是改用预设的服务账号身份验证途径。