SQLMesh系列教程-2:SQLMesh入门项目实战(下篇)

news/2025/2/13 17:43:58/

上篇我介绍了环境搭建、duckdb数据准备、sqlmesh数据模型、plan命令运行。本文继续介绍审计、测试、生成血缘关系以及python模型等。

在这里插入图片描述

有两种方法可以在SQLMesh中创建宏。一种方法是使用Python,另一种方法是使用Jinja。这里我们创建Python宏。让我们构建简单的Python宏。在宏文件夹下创建一个名为“ custom_calc.py ”的Python文件,并添加以下代码:

from sqlmesh import macro@macro()
def multiply_by_10(evaluator, col):return col * 10

请注意,你必须为SQLMesh添加@macro装饰器,以便将其识别为宏并在模型中使用,还需要添加‘ evaluator ’作为它的函数参数之一。

让我们将这个宏添加到“example.intermediate.sql”模型中:

MODEL (name example.intermediate_model,owner tommy,kind FULL,cron '@daily',grain id,column_descriptions (id = 'primary key',letter = 'alphabet letter',value = 'random value',updated_date = 'updated date',new_col = 'a new column'));SELECTid,letter,value,@multiply_by_10(value) AS big_value,updated_date,'new_col' AS new_colFROMexample.base_model

我们在模型中添加了@multiply_by_10(value) AS big_value。“@”符号用于在SQL模型中引用宏。创建Python宏并将其添加到模型中就像刚才看到的一样简单。

使用Python宏,可以不受限于SQL所能做的事情,你可以为数据转换构建任何逻辑。

提示:在宏Python文件中,也可以创建其他函数,而不需要‘ @macro ’装饰器和‘ evaluator ’参数。这样可以更有效地使用这些函数来帮助组织宏函数中的代码和逻辑。

审计(Audits)

SQLMesh审计基本上是dbt测试。在SQLMesh中有内置审计,如‘ unique ’和‘ not_null ’。你还可以创建自己的自定义审计。

创建自定义审计

你应该创建一个SQL文件,其中包含审计文件夹下的自定义审计。

AUDIT (name assert_positive_ids,);SELECT *FROM @this_modelWHEREid < 0

这是为了检查“id”列只包含正数。

向模型添加自定义审计

接下来,我们将这个自定义审计以及其他一些内置审计添加到“base_model.sql”:

将这些行添加到MODEL块中:

audits (assert_positive_ids,unique_values(columns = id),not_null(columns = id))

完整代码如下:

MODEL (name example.base_model,owner Yuki,kind VIEW,cron '@daily',grain id,column_descriptions (id = 'primary key',letter = 'alphabet letter',value = 'random value',updated_date = 'updated date'),audits (assert_positive_ids,unique_values(columns = id),not_null(columns = id)));SELECTid::INT,letter::TEXT,value::INT,updated_date::DATE,FROMexample.letters

运行的审计

SQLMesh使用‘ SQLMesh plan ’命令(在模型执行之后)自动运行审计。你也可以运行这个命令来只运行审计(你可能需要在运行这个命令之前应用一个计划):

sqlmesh audit

输出结果:

Found 3 audit(s).
assert_positive_ids on model example.base_model ✅ PASS.
unique_values on model example.base_model ✅ PASS.
not_null on model example.base_model ✅ PASS.Finished with 0 audit errors and 0 audits skipped.
Done.

在SQLMesh中审计的一个好处是,默认情况下,如果审计失败,SQLMesh会停止管道的执行,以防止错误的数据继续执行。

测试

SQLMesh测试用于测试代码,而不是测试数据。我们只需要在yaml文件中为测试提供输入和预期输出。

创建测试

SQLMesh通过‘ SQLMesh create_test ’命令简化了这个过程。继续运行下面的代码,指定您想要为其创建测试的模型、它的上游模型和一个示例查询。

sqlmesh create_test example.intermediate_model --query example.base_model "SELECT * FROM example.base_model WHERE updated_date BETWEEN '2025-01-01' and '2025-01-15'" 

下面是上面的命令为我生成的测试文件,在tests目录下生成test_intermediate_model.yaml文件,内容如下:

test_intermediate_model:model: '"db"."example"."intermediate_model"'inputs:'"db"."example"."base_model"':- id: 1letter: Avalue: 10updated_date: 2025-01-07- id: 2letter: Bvalue: 20updated_date: 2025-01-07- id: 3letter: Cvalue: 30updated_date: 2025-01-07outputs:query:- id: 1letter: Avalue: 10big_value: 100updated_date: 2025-01-07new_col: new_col- id: 2letter: Bvalue: 20big_value: 200updated_date: 2025-01-07new_col: new_col- id: 3letter: Cvalue: 30big_value: 300updated_date: 2025-01-07new_col: new_col

运行测试

sqlmesh plan ”命令运行测试(在执行模型之前)以及“ sqlmesh test ”命令。让我们运行这两个命令:

sqlmesh plan dev

输出结果:

$ sqlmesh plan dev
======================================================================
Successfully Ran 1 tests against duckdb
----------------------------------------------------------------------No changes to plan: project files match the `dev` environment
sqlmesh test

输出结果:

$ sqlmesh test
.
----------------------------------------------------------------------
Ran 1 test in 0.035sOK

如果你还记得,我们为测试连接配置了一个不同的duckdb。这意味着该测试将使用测试连接“test.db”执行,而不是使用“db.db”。如果你想了解更多关于SQLMesh测试的知识,你会发现官方文档很有帮助!

血缘关系(DAG)

许多数据工具的典型特性是能够可视化数据血缘关系。SQLMesh也可以通过CLI或SQLMesh UI提供这种功能。使用CLI,您可以运行‘ sqlmesh dag file_name ’来生成简单的数据血缘关系。

sqlmesh dag dag.html

在这里插入图片描述

嗯,这是相当有限的。您只能看到整体的数据流,这对你来说可能不够。当你希望看到更详细的日期时,需要使用SQLMesh UI。为此,你必须安装一个依赖项:

pip install 'sqlmesh[web]'

然后运行以下命令在浏览器中打开UI:

sqlmesh ui

输出内容:

$ sqlmesh ui
INFO:     Started server process [465829]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

这里提示你打开UI,你将看到一个漂亮的UI,如下所示(如果你没有看到血缘关系,你可能需要展开该区域或单击左侧的模型文件),如:点击了“base_model.sql”):
在这里插入图片描述

要查看列级血缘关系吗?你可以点击列。例如,让我们点击example.intermediate_model中的new_col、big_value和value列:

在这里插入图片描述

SQLMesh显示了‘ value ’和‘ big_value ’是如何来自上游表中的‘ value ’的。然而,new_col在我点击后变成了灰色。

当涉及到SQLMesh UI所提供的功能时,数据沿袭只是冰山一角。如果您选择这样做,您可以从这个UI执行所有的开发任务。既然我们在这里,让我们看看如何找到模型元数据/定义。

点击左侧的“数据目录”图标。它把你带到一个视图中,你可以看到模型元数据,包括模型和列描述:

在这里插入图片描述

Python模型

我知道你们很多人都是狂热的 Python 爱好者,有些事情用Python可以做得更好。虽然你需要在Python模型中返回pandas或Spark数据框架,但我将使用polar进行转换部分,因为它的整体实用性,如速度和干净的API:

“example.intermediate_py_model.py”:

import typing as t
from datetime import datetimefrom sqlmesh import ExecutionContext, model
import pandas as pd
import polars as pl@model(name="example.intermediate_py_model",owner="Yuki",kind="FULL",cron="@daily",grain="id",columns={"id": "int","letter": "text","value": "int","big_value": "int","updated_date": "date","new_col": "text",},column_descriptions={"id": "primary key","letter": "alphabet letter","value": "random value","big_value": "value * 10","updated_date": "updated date","new_col": "a new column",},
)
def execute(context: ExecutionContext,start: datetime,end: datetime,execution_time: datetime,**kwargs: t.Any,
) -> pd.DataFrame:table = context.resolve_table("example.base_model")df = (pl.from_pandas(context.fetchdf(f"SELECT * FROM {table}")).select("id","letter","value",pl.col("value").mul(10).alias("big_value"),"updated_date",pl.lit("new_col").alias("new_col"),))return df.to_pandas()

注意事项:

  • 整体结构是相同的,包括模型属性。
  • Python模型要求您返回pandas或Spark数据框架。
  • Python模型需要指定列模式。
  • 使用‘ ExecutionContext ’是Python模型中的一种典型方法。它提供了对上游表、全局变量等的访问。
  • 我没有在这个Python模型中使用宏函数来计算big_value列。原因是我们在Python模型中引用自定义Python宏的方式有点麻烦(在撰写本文时)。如果你愿意,你可以这样做:
  1. 导入宏函数就像在Python模型中导入Python函数一样。

  2. 在‘ MacroEvaluator ’类中导入,并将其作为参数传递给宏函数。

图
在这里插入图片描述

  1. 或者你将你的函数定义为一个通用的Python函数,没有‘ @macro ’装饰器和‘ evaluator ’参数,这样你就不需要导入和传入‘ MacroEvaluator ’类(Tobiko Slack线程)。

SQLMesh中的Python模型非常灵活,因为只要它们返回pandas或Spark数据框架,你就可以做几乎任何事情。如果愿意,你甚至可以在SQLMesh中将数据摄取作业构建为Python模型。

最后总结

显然,本文无法涵盖SQLMesh所提供的所有内容。未来我们继续学习下面列出的相关内容:

  • 深入了解模型类型/种类
  • Pre/post语句
  • 有用的CLI命令(table_dff, sqlmesh evaluate等)
  • 开源Github Actions CI/CD Bot

SQLMesh是一个令人兴奋且不断发展的工具。我将继续与大家分享我的见解。如果有任何关于SQLMesh或其他工具的具体内容,请随时告诉我。


http://www.ppmy.cn/news/1571763.html

相关文章

使用 Express 写接口

在现代 Web 开发中&#xff0c;构建高效的 RESTful API 是非常重要的。Node.js 和其上的 Express 框架为开发者提供了一种简便而强大的方式来创建这些接口。本文将详细介绍如何使用 Express 来编写和部署一个简单的 RESTful API&#xff0c;涵盖从安装到实现增删改查&#xff0…

【ESP32】ESP-IDF开发 | WiFi开发 | HTTP服务器

1. 简介 1.1 HTTP HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;&#xff0c;全称超文本传输协议&#xff0c;用于从网络服务器传输超文本到本地浏览器的传送协议。它可以使浏览器更加高效&#xff0c;使网络传输减少。它不仅保证计算机正确快速地传输超文本文档…

腾讯云HAI部署DeepSeek结合Ollama API搭建智能对话系统

前言 本文将详细介绍如何在腾讯云HAI平台上部署DeepSeek模型&#xff0c;并配置使用Ollama API服务以实现对外部请求的支持。通过对前期准备、部署流程、API服务配置及使用的详细阐述&#xff0c;希望能为读者提供一个全面且实用的指南&#xff0c;助力AI应用的高效开发和部署…

如何从0开始将vscode源码编译、运行、打包桌面APP

** 网上关于此的内容很少&#xff0c;今天第二次的完整运行了&#xff0c;按照下文的顺序走不会出什么问题。最重要的就是环境的安装&#xff0c;否则极其容易报错&#xff0c;请参考我的依赖版本以及文末附上的vscode官方指南 ** 第一步&#xff1a;克隆 VSCode 源码 首先…

使用Python爬虫获取1688 App原数据API接口

一、引言 在电商领域&#xff0c;数据是企业决策、市场分析和产品优化的关键要素。1688作为国内领先的B2B电商平台&#xff0c;汇聚了海量的商品信息和交易数据。通过获取1688 App的原数据API接口&#xff0c;企业可以精准把握市场动态&#xff0c;了解竞争对手的策略&#xf…

smart代理VSwebshare哪家http代理商的IP代理综合质量由于911代理?

在选择HTTP代理商时&#xff0c;综合考虑其IP代理的质量至关重要&#xff0c;本文将比较Smart代理与Webshare两家HTTP代理商在多个方面优于911代理&#xff0c;并解释为何需要进行这种代理商之间的对比。 如何考核一家HTTP代理商的IP代理综合质量&#xff1f; 为了评估一家HTT…

ASP.NET Core SignalR的分布式部署

假设聊天室程序被部署在两台服务器上&#xff0c;客户端1、2连接到了服务器A上的ChatRoomHub&#xff0c;客户端3、4连接到服务器B上的ChatRoomHub&#xff0c;那么客户端1发送群聊消息时&#xff0c;只有客户端1、2能够收到&#xff0c;客户端3、4收不到&#xff1b;在客户端3…

解决MybatisPlus updateById更新数据时将没传的数据也更新成了null

首先&#xff0c;MybatisPlus在调用自带的更新接口updateById时&#xff0c;如果没加任何配置&#xff0c;默认是不会将前端没传的数据也更新成null的。即MyBatisPlus不会更新传入实体中为null的字段&#xff0c;只会更新设置了不为null的值。 如果发现没传的也更新成null了的话…