为数据复制或数据迁移构建 ELT 数据管道的一个重要部分是能够在出现错误时进行监视并获得通知。如果您不知道错误,您的数据将包含不一致之处,并且您的报告将不准确。由于使用的工具数量众多,大多数管道的复杂性使得设置监视和警报系统更具挑战性。
在本文中,我将分享为什么为 ELT 数据管道设置监视和警报系统很重要。我解释了要监视的关键指标,以及设置监视和警报系统时将遇到的常见挑战。我进一步强调了不同的监控和警报工具,并展示了如何使用Google的数据验证工具(DVT)和数据构建工具(dbt)实现典型的监视/警报系统。
为什么要监视数据管道?
好吧,问题应该是,“为什么不监视数据管道”?。这是因为让您的管道作为黑匣子运行对业务来说可能非常昂贵。让我分享一个个人故事来解释这一点。我公司用于数据复制的初始管道是使用 AWS 数据迁移服务 (DMS) 设计的,将数据从 RDS (PostgreSQL) 副本复制到 S3 存储桶。然后,我们让 Snowpipe(Snowflake 的 ELT 工具)从 S3 存储桶获取新数据,并将这些数据通过管道传输到 Snowflake(我们的数据仓库)。
这种架构有效,但它是一个完整的黑匣子,因为我们很少或根本看不到引擎盖下发生的事情。没有适当的警报或通知系统来通知我们管道故障。只有当我们在一天或更长时间后看到数据不一致或下游分析不准确时,我们才会知道管道故障。但这对业务有何影响?这对我们的影响之一是客户流失率的提高。由于我们的数据到达较晚且不一致,我们无法及时检测到客户何时遇到 KYC 验证问题。
以下是应监视管道的主要原因:
全面鸟瞰数据运行状况。
防止数据传输不一致。
获取持续的数据测试方法。
及早发现数据质量和数据完整性问题。
跟踪数据处理成本、元数据和整体系统性能。
提供反馈以优化管道性能。
应监视哪些指标?
虽然设置这些监视/警报系统以了解管道非常重要,但确定要衡量的关键指标以及要使用哪些工具并不总是一项简单的任务。这是因为要衡量的指标在很大程度上取决于数据管道的用例和其他几个因素。
例如,如果其中一个管道实时提供用于跟踪应用程序服务器停机时间的关键数据,那么您的首要任务是根据组织或团队定义的 SLA 监控数据到达的延迟。
以下是要监控的不同类别的指标,适用于您或您的组织可能拥有的任何用例:
数据质量监控
通过数据质量监控,建立一个监控系统,以持续验证管道不同阶段的数据质量。首先,在提取加载 (EL) 步骤中,在加载作业完成后,根据目标中的数据验证源中的数据质量。此处监视的关键指标包括源-目标记录计数匹配、源-目标列计数匹配、数据格式错误、数据卷错误、列名更改、引用完整性等。其次,在转换作业运行后的转换步骤中监视数据的质量。此步骤监控的关键指标包括:数据类型错误、空值等。
管道可靠性监测
在这里,监控侧重于管道的端到端可靠性。在管道的不同步骤中监视错误:
提取加载 (EL) 步骤:此步骤由 Airbyte 等 ELT 工具处理。此处会出现错误,例如身份验证问题导致的同步失败、规范化错误、同步期间加载新列 (SCD) 时出错、JSON 架构验证程序错误等。受到监控。
转换步骤:此步骤由 dbt 等转换工具处理。此处会出现转换作业运行失败、数据传输延迟(运行持续时间长于预期)、数据沿袭或数据丢失问题等错误。受到监控。
业务指标监控
这种类型的监视发生在管道的转换阶段之后。在这里,监视转换后的数据,以根据特定的业务需求识别异常。例如,货币价格贬值或升值、基于市场价值的交易损失等。当这些指标达到特定阈值时,将触发警报。
监视数据管道有哪些挑战?
ELT 数据管道是使用多种工具组合构建的,包括 dbt、Airflow、Airbyte、SQL、云服务、数据库、数据仓库和数据湖。这种工具的多样性有利于可扩展性,以及在数据堆栈的每一层使用最有效的工具。但是,这会导致管道中有许多移动部件。这可能会使监视或全面了解数据管道成为一场噩梦。
在设置管道监视和警报系统之前,我强烈建议先简化管道中要监视的进程数。具有多个可能故障点的复杂管道将需要在这些不同层设置监视/警报系统。这将使事情变得非常难以跟踪和管理。
为了简化我上面给出的示例中公司管道的复杂性,我们首先引入了 Airbyte——一个开源数据摄取工具来处理我们的数据复制。Airbyte 帮助我们减少了管道中可能的故障点数量。我们没有首先使用数据迁移服务 (DMS) 将数据复制到 S3 存储桶,而是使用 Airbyte 将数据直接复制到我们的仓库(Snowflake)。借助此架构,我们无需在数据流的三个不同级别进行监控:RDS-DMS 级别、DMS-S3 存储桶级别和 S3-Snowpipe 级别。现在,我们只在仓库级别监控我们的管道。
简化监视进程的数量后,让我们讨论数据管道监视工具以及如何为数据管道设置典型的监视/警报系统。
监控管道中的数据质量指标
市场上有很多工具可用于监视和触发数据管道中的警报。但是,这些工具的功能不同。虽然一些工具专注于监控云基础设施、日志和应用程序安全性,但其他工具则专注于监控数据质量、数据验证和数据沿袭。此外,一些工具是专有的基于云的解决方案,而另一些则是开源的。
一些专有的基于云的解决方案包括Monte Carlo,Databand,Datadog,Datafold,Accel Data。开源替代方案包括普罗米修斯、洛基、远大期望、数据验证工具 (DVT)、dbt 测试、Datafold 的数据差异等......需要注意的重要一点是,您可能需要组合其中两个或多个工具来实现您的目标。
在下一节中,我将介绍如何使用两个开源工具设置这些监视/警报系统:数据验证工具 (DVT) 和数据构建工具 (dbt)。
使用数据验证工具 (DVT) 进行数据验证监控
数据验证是在将数据用于业务运营之前检查数据的完整性、准确性和结构的做法。
数据验证是构建数据管道的关键步骤,因为它提供了在将数据用于下游分析之前检查数据有效性的层。
数据验证工具 (DVT) 是一种开源 Python CLI 工具,可将异构数据源表与多级验证函数进行比较。在数据加载过程完成后,您可以运行 DVT 进程来验证源表和目标表是否匹配且正确。DVT 支持列、行、自定义查询、架构、列数据类型验证以及许多数据仓库和数据库的连接。
Datafold 还提供了一个开源数据差异项目,用于有效地比较数据库和数据仓库之间的表。要了解有关使用 data-diff 的更多信息,请阅读有关验证从 Postgres 到 Snowflake 的数据复制管道的教程。
将 DVT 与 BigQuery 结合使用
您可以在任何云平台中的虚拟机上设置和运行 DVT。您还可以选择在 Docker 容器中运行 DVT。按照此处的说明在本地计算机或云环境中安装和设置 DVT。本节中的代码演练是在 Google Cloud 上的云外壳会话上运行的。DVT 提供了一个命令行界面 (CLI),用于在安装后执行 dvt 命令。
若要根据目标表验证源表,请先创建源连接和目标连接。我们通过 CLI 运行以下代码来做到这一点。
# Create MYSQL connection as Source connectiondata-validation connections add--connection-nameMYSQL_CONN MySQL--hostHOST_IP--portPORT--user-nameUSER-NAME--passwordPASSWORD # Create BigQuery connection as target connectiondata-validation connections add--connection-name$BigQuery_CONNBigQuery--project-id$MY_GCP_PROJECT
上面的代码片段将创建一个 MySQL 连接作为源连接,创建一个 BigQuery 连接作为目标连接。
列验证
列验证对源和目标都运行计数 (*)。这将计算源表中的列数,并验证它是否与目标表上的计数匹配。要运行列验证,请通过 CLI 执行数据验证运行命令。以下是 MySQL 源表和 BigQuery 目标表之间的列验证的外观:
data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
上面的代码片段将执行源表的列计数,并根据目标表的列计数进行验证。--bq-result-handler 标志将有助于将验证结果输出到中间 BigQuery 表。默认情况下,如果没有 --bq-result-handler 标志,验证结果将输出到控制台。
对于启用了表规范化的 Airbyte 同步,您需要指定要在列验证中验证的列的名称。这是为了排除 Airbyte 在同步期间添加的其他元数据列。下面的代码演示如何在验证中指定列:
data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--count column1, column2, column3, column4, column5 \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
行验证
行验证在源和目标上运行计数 (*)。这将计算源表中的行数,并验证它与目标表上的计数匹配。以下是 MySQL 源表和 BigQuery 目标表之间的行验证的外观:
data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50
--use-random-row 和 --random-row-batch-size 标志指定您只想随机验证行的子集。当您有大型表时,这会派上用场,因为行验证需要更多的内存和计算。
架构验证
架构验证将获取源表中每一列的列数据类型,并验证它是否与目标表的列数据类型匹配。源表和目标表中的类型不匹配会导致验证状态失败。
以下是 MySQL 源表和 BigQuery 目标表之间的架构验证的外观:
data-validation validate schema\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
行比较验证
这种类型的验证对源表和目标表中指定列的值执行逐行比较。这些值不匹配会导致验证状态失败。
以下是 MySQL 源表和 BigQuery 目标表之间的架构验证的外观:
data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50
下面是 BigQuery 表中验证结果的示例输出:
名为“difference”的列表示源表中的列/行计数与目标表中的列/行计数之间的差异。validation_status列显示验证的状态。
然后可以查询此表,并将错误通知/警报发送到电子邮件或 Slack 频道。
我将在上一节中介绍向 Slack 频道发送通知/警报。
从 YAML 文件运行验证
运行验证的另一种方法是将验证配置保存到 YAML 文件。这样,您可以存储以前的验证并轻松修改验证配置。此方法还有助于自动执行验证过程,因为验证可以按计划运行。
若要生成用于验证的 YAML 配置文件,请指定 --config-file 标志。请参阅下面的代码:
data-validation validatecolumn\--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result \--config-file validation_config.yaml
下面是从上述代码生成的 YAML 配置的外观。
result_handler:project_id:your-project-idtable_id:data_validation.validation_resultstype:BigQuerysource:MYSQL_CONNtarget:BigQuery_CONNvalidations:-aggregates:-field_alias:countsource_column:nulltarget_column:nulltype:countcalculated_fields:[]filter_status:nullfilters:[]format:tablelabels:[]random_row_batch_size:nullschema_name:transportation_datatable_name:citibike_stationstarget_schema_name:onesphere-analytics.master_datatarget_table_name:citibike_stationsthreshold:0.0type:Columnuse_random_rows:false
生成的 YAML 配置文件可以在执行生成验证命令的目录中找到。
现在,可以使用以下代码从 YAML 配置文件运行验证:
data-validation run-config -c validation_config.yaml
使用数据构建工具 (dbt) 进行数据质量监控
dbt 是一种数据转换工具,使数据和分析工程师能够通过简单地编写 SQL 语句来转换其仓库中的数据。DBT 处理将这些 SELECT 语句转换为仓库中的表和视图。要设置 dbt 项目,请按照此设置 dbt 项目指南进行操作。
DBT 提供了用于执行数据质量检查的测试功能,包括数据类型检查、空值检查、重复检查、参照完整性检查等。让我们看看如何使用 dbt 测试执行数据质量检查。dbt 测试定义为单一测试、SQL 文件中的一般测试或具有返回失败记录逻辑的 YAML 配置文件。
下面是使用 YAML 配置文件为源表(订单表)定义测试的示例。
version: 2source: - name: orderscolumns: -name: order_idtests: - unique - not_null -name: statustests: -accepted_values:values: ['placed','shipped','completed','returned'] -name: customer_idtests: -relationships:to: ref('customers')field: id
上面示例中的测试配置首先检查orders_table order_id列中的重复值和非空值,然后在状态列中检查接受的值(“已放置”、“已发货”、“已完成”、“已退回”)。最后,它检查customer_id列中的引用完整性,以确保订单表上的每个customer_id在客户表上都有一个关联的 ID。
若要运行测试,请运行命令:dbt test --store-failures。
--store-failure 标志将测试结果存储在中间表中。然后,可以查询此表以发送错误/失败通知/警报。未通过测试的记录保存在数据仓库中后缀为“dbt_test__audit”的架构中的结果表中。
需要注意的是,如上所述,使用 dbt 监控数据质量的方法也适用于业务指标监控。
使用 dbt 进行管道可靠性监控
DBT 提供用于跟踪管道错误或作业故障的监视和警报系统。通知在 dbt 云上配置,并在作业运行后立即触发。通知可以发送到电子邮件或 Slack 频道。这是这方面的指南。对于管道中的数据提取转换 (EL) 层,Airbyte 提供了一个可靠的监控和警报系统,用于端到端监控并将同步失败/成功通知发送到 Slack 通道。按照此分步指南为空字节同步设置 Slack 通知。
向 Slack 频道发送通知
要发送通知/警报以跟踪管道的问题或故障,您将构建一个简单的 Slack 机器人,该机器人可以在无服务器函数(例如 AWS Lambda 或 GCP CloudFunction)上运行。机器人是一个简单的 Python 脚本,可以计划为按时间间隔运行或基于数据加载事件运行。
机器人将从上述部分查询包含数据质量和数据验证测试结果的任何表,并根据某些定义的逻辑发送通知/警报。
下面的代码片段实现了从 Google Cloud Function 运行的通知/提醒系统,并在源表中的行/列数与上述数据验证示例中的目标表不匹配时将通知/警报推送到 Slack 渠道。以下是创建 Slack 网络钩子网址的指南。
importpandasaspdimportrequestsfromgoogle.oauth2importservice_account# Credentials from GCP service account saved in a json file.credentials = service_account.Credential.from_service_account_file('./google_credentials.json')defslack_notification_bot(credentials, slack_webhook_url): query ='''select *
from validation_result_table
where validation_status = 'fail';'''validation_data = pd.read_gbq(query, project_id='gcp_project_id', credentials= credentials) message ='''Validation Report:\n
Error! Incomplete rows of data loaded'''iflen(validation_data) >0: requests.post(slack_webhook_url, json={'text': message})else:return'done'defmain():# Run the slack notification botslack_notification_bot(credentials, slack_webhook_url)
结论
在本文中,你看到了为 ELT 数据管道设置监视/警报系统的必要性。
数据质量、管道可靠性和业务指标监视是要监视管道的关键指标。管道复杂性是数据团队在计划为 ELT 管道设置监视/警报系统时将面临的主要挑战之一。我建议使用像Airbyte这样的数据复制工具来降低这种复杂性。然后,我查看了不同的专有数据管道监视/警报工具及其开源替代方案。我进一步深入研究了如何使用数据验证工具 (DVT) 和数据构建工具 (dbt) 设置典型的监控/警报系统。最后,我们了解了如何构建一个机器人来触发管道的通知/警报。