PyDeequ库在AWS EMR启动集群中数据质量检查功能的配置方法和实现代码

devtools/2025/2/2 8:52:59/

PyDeequ是一个基于Apache Spark的Python API,专门用于定义和执行“数据单元测试”,从而在大规模数据集中测量数据质量。
PyDeequ框架在PySpark代码中提供了全面的数据质量检查功能,能够帮助用户&有效地监控和提升大规模数据集的数据质量。它在PySpark代码中的数据质量检查功能主要包括以下几个方面:

核心组件

  1. 指标计算(Metrics Computation):利用分析器(Analyzers)对数据集的每一列进行分析,生成数据概要。

  2. 约束建议:自动提出基于不同分析组的验证约束,以确保数据的一致性。

  3. 约束验证:依据设定的标准对数据集进行实时或批量验证。

  4. 度量存储库:实现对验证历史的跟踪与存储,便于持续监控数据质量。

功能特性

  1. 数据剖析:PyDeequ可以对数据集的每一列进行深入的剖析,包括数据的完整性、空值情况、唯一性统计等关键指标。

  2. 约束定义与验证:用户可以定义各种数据质量约束,如数据的类型、范围、唯一性、非空性等,并使用PyDeequ对这些约束进行验证。验证结果会明确指出哪些数据不符合预设的约束条件。

  3. 灵活性与可扩展性:PyDeequ支持用户根据业务需求自定义约束条件和分析规则,灵活应对各种数据质量挑战。同时,它也易于集成到现有的PySpark工作流中。

  4. 报告与监控:PyDeequ可以生成详细的数据质量报告,帮助用户了解数据集的整体质量情况。此外,它还支持对验证历史的跟踪与存储,便于用户持续监控数据质量的变化趋势。

应用场景

  1. 数据湖管理:在AWS Glue、Athena等服务的支持下,PyDeequ可以帮助用户监控数据湖中的数据质量。

  2. 数据仓库:在数据仓库中,PyDeequ可以用于定期检测数据质量,防止数据质量问题影响业务决策。

  3. 实时数据处理:在实时数据处理系统中,PyDeequ可以用于实时监控数据流的质量。

一、AWS EMR 集群配置 PyDeequ 的具体步骤

1. 创建 Bootstrap Script (引导脚本)

PyDeequ 依赖 Java 库和 Python 包,需在 EMR 集群初始化时自动安装。

#!/bin/bash
# bootstrap.sh# 安装 Python 依赖
sudo pip3 install pydeequ# 下载 Deequ JAR 包到 Spark 类路径
aws s3 cp s3://deequ/jars/deequ-2.0.3-spark-3.1.jar /usr/lib/spark/jars/
2. 启动 EMR 集群时指定 Bootstrap 动作

通过 AWS CLI 或控制台启动集群时添加以下参数:

aws emr create-cluster \
--name "PyDeequ_Cluster" \
--release-label emr-6.9.0 \
--applications Name=Spark Name=Hadoop \
--instance-type m5.xlarge \
--instance-count 3 \
--bootstrap-actions Path="s3://your-bucket/bootstrap.sh" \
--use-default-roles
3. 关键验证点
  • 确保 JAR 文件路径正确:/usr/lib/spark/jars/deequ-*.jar
  • Python 环境需为 3.x,可通过 EMR 配置 emr-release-label >= 6.0

二、PyDeequ 数据质量检查核心代码示例

1. 初始化 SparkSession
python">from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("PyDeequ-Data-Quality") \.config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.3") \.config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") \.getOrCreate()
2. 指标计算(Metrics Computation)
python">from pydeequ.analyzers import *df = spark.read.parquet("s3://your-data-bucket/transactions")analysisResult = AnalysisRunner(spark) \.onData(df) \.addAnalyzer(Size()) \.addAnalyzer(Completeness("customer_id")) \.addAnalyzer(ApproxCountDistinct("order_id")) \.addAnalyzer(Mean("total_amount")) \.run()analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
3. 约束建议(Constraint Suggestion)
python">from pydeequ.suggestions import *suggestionResult = ConstraintSuggestionRunner(spark) \.onData(df) \.addConstraintRule(DEFAULT()) \.run()print("Suggested Constraints:")
for constraint in suggestionResult['constraint_suggestions']:print(f"- {constraint['description']}")
4. 约束验证(Constraint Verification)
python">from pydeequ.checks import *
from pydeequ.verification import *check = Check(spark, CheckLevel.Error, "DataQualityCheck")result = VerificationSuite(spark) \.onData(df) \.addCheck(check.hasSize(lambda x: x >= 1000) \.isComplete("customer_id") \.isUnique("order_id") \.isNonNegative("total_amount") \.hasPattern("email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$") \).run()result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)
5. 指标存储(Metric Repository)
python">from pydeequ.repository import *
from pydeequ.metrics import *metrics_repository = FileSystemMetricsRepository(spark, path="s3://quality-metrics-bucket/")
result_key = ResultKey(spark, datetime.strptime("2024-01-01", "%Y-%m-%d"))AnalysisRunner(spark) \.onData(df) \.useRepository(metrics_repository) \.saveOrAppendResult(result_key) \.addAnalyzer(Completeness("customer_id")) \.run()

三、关键配置说明

组件配置要点
JAR 依赖Deequ JAR 必须位于 Spark 的 jars 目录,版本需与 Spark 兼容
Python 版本EMR 6.x 默认使用 Python 3.7+,需通过 pip3 安装
权限配置EMR 角色需有权限访问 S3 存储桶(读取数据/写入指标)
优化参数调整 Spark 内存分配(spark.executor.memory)以处理大规模数据

四、高级应用场景扩展

1. 实时数据质量监控(Kafka 集成)
python">stream_df = spark.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "kafka-host:9092") \.option("subscribe", "transactions-topic") \.load()def quality_check_microbatch(df, epoch_id):VerificationSuite(spark).onData(df).addCheck(...).run()stream_df.writeStream \.foreachBatch(quality_check_microbatch) \.start()
2. 自定义分析规则
python">from pydeequ.analyzers import Analyzerclass CustomRangeAnalyzer(Analyzer):def __init__(self, column, min_val, max_val):super().__init__()self.column = columnself.min = min_valself.max = max_valdef to_metric(self, state):# 实现自定义指标计算逻辑passanalysisResult = AnalysisRunner(spark) \.addAnalyzer(CustomRangeAnalyzer("temperature", 0, 100)) \.run()

以上配置和代码实现了 PyDeequ 在 AWS EMR 的完整数据质量流水线。实际部署时需根据数据规模调整 Spark 资源配置(spark-submit 参数),并建议将质量报告存储至 DynamoDB 或 Amazon CloudWatch 实现可视化监控。


http://www.ppmy.cn/devtools/155392.html

相关文章

基于阿里云百炼大模型Sensevoice-1的语音识别与文本保存工具开发

基于阿里云百炼大模型Sensevoice-1的语音识别与文本保存工具开发 摘要 随着人工智能技术的不断发展,语音识别在会议记录、语音笔记等场景中得到了广泛应用。本文介绍了一个基于Python和阿里云百炼大模型的语音识别与文本保存工具的开发过程。该工具能够高效地识别东…

[JavaScript] ES6及以后版本的新特性

文章目录 箭头函数(Arrow Functions)为什么需要箭头函数?箭头函数的完整语法箭头函数中的 this实用场景 解构赋值(Destructuring Assignment)为什么需要解构赋值?数组解构赋值的完整用法对象解构赋值的完整…

帝国CMS8.0终极栏目转换或批量改顺序成功后不能返回地址的解决方案

帝国CMS8.0终极栏目转换或批量改顺序成功后不能返回地址问题的解决办法: 修改 /e/class/classfun.php 文件, 查找2处 “$cache_ecmstourlurlencode(EcmsGetReturnUrl());” 修改为 “$cache_ecmstourlurlencode($_POST[&topic_c449dd39-aa1d-41…

数据分析系列--②RapidMiner导入数据和存储过程

一、下载数据 二、导入数据 1. 在本地计算机中创建3个文件夹 2. 从本地选择.csv或.xlsx 三、界面说明 四、存储过程 1.保存 Congratulations, you are done. 一、下载数据 点击下载AssociationAnalysisData.xlsx数据集 二、导入数据 1. 在本地计算机中创建3个文件夹 2. 从…

跨境支付领域中常用的英文单词(持续更新)

### **1. 支付方式 (Payment Methods)** 1. Credit Card 2. Debit Card 3. Bank Transfer 4. Wire Transfer 5. PayPal 6. Alipay 7. WeChat Pay 8. Apple Pay 9. Google Pay 10. Cryptocurrency 11. Digital Wallet 12. Mobile Payment 13. Cash on D…

BGP协议

BGP作为一种外部网关动态路由协议,其基于TCP的179号端口,其联系不同自治系统间的通讯 关于BGP的版本,最早只有1,2,3这三个版本,后经过改进有了BGP4,目前普遍使用BGP4版本,其可兼容I…

用一个例子详细说明python单例模式

单例模式是一种设计模式,它确保一个类只有一个实例,并提供一个全局访问点来访问该实例。这在需要控制资源(如数据库连接、文件系统等)的访问时非常有用。 下面是一个使用Python实现单例模式的例子: class Singleton:…

4. 劲舞团python解法——2024年省赛蓝桥杯真题

问题描述:4.劲舞团 - 蓝桥云课 小蓝最近迷上了一款名为 “劲舞团” 的游戏,具体来说,只要按照游戏中给出的键位提示依次按出对应的键位,游戏人物便可以跟随节奏跳舞。对于连续的 K 次正确敲击,如果任意连续的两次敲击…