上篇我介绍了环境搭建、duckdb数据准备、sqlmesh数据模型、plan命令运行。本文继续介绍审计、测试、生成血缘关系以及python模型等。
有两种方法可以在SQLMesh中创建宏。一种方法是使用Python,另一种方法是使用Jinja。这里我们创建Python宏。让我们构建简单的Python宏。在宏文件夹下创建一个名为“ custom_calc.py ”的Python文件,并添加以下代码:
from sqlmesh import macro@macro()
def multiply_by_10(evaluator, col):return col * 10
请注意,你必须为SQLMesh添加@macro装饰器,以便将其识别为宏并在模型中使用,还需要添加‘ evaluator ’作为它的函数参数之一。
让我们将这个宏添加到“example.intermediate.sql”模型中:
MODEL (name example.intermediate_model,owner tommy,kind FULL,cron '@daily',grain id,column_descriptions (id = 'primary key',letter = 'alphabet letter',value = 'random value',updated_date = 'updated date',new_col = 'a new column'));SELECTid,letter,value,@multiply_by_10(value) AS big_value,updated_date,'new_col' AS new_colFROMexample.base_model
我们在模型中添加了@multiply_by_10(value) AS big_value。“@”符号用于在SQL模型中引用宏。创建Python宏并将其添加到模型中就像刚才看到的一样简单。
使用Python宏,可以不受限于SQL所能做的事情,你可以为数据转换构建任何逻辑。
提示:在宏Python文件中,也可以创建其他函数,而不需要‘ @macro ’装饰器和‘ evaluator ’参数。这样可以更有效地使用这些函数来帮助组织宏函数中的代码和逻辑。
审计(Audits)
SQLMesh审计基本上是dbt测试。在SQLMesh中有内置审计,如‘ unique ’和‘ not_null ’。你还可以创建自己的自定义审计。
创建自定义审计
你应该创建一个SQL文件,其中包含审计文件夹下的自定义审计。
AUDIT (name assert_positive_ids,);SELECT *FROM @this_modelWHEREid < 0
这是为了检查“id”列只包含正数。
向模型添加自定义审计
接下来,我们将这个自定义审计以及其他一些内置审计添加到“base_model.sql”:
将这些行添加到MODEL块中:
audits (assert_positive_ids,unique_values(columns = id),not_null(columns = id))
完整代码如下:
MODEL (name example.base_model,owner Yuki,kind VIEW,cron '@daily',grain id,column_descriptions (id = 'primary key',letter = 'alphabet letter',value = 'random value',updated_date = 'updated date'),audits (assert_positive_ids,unique_values(columns = id),not_null(columns = id)));SELECTid::INT,letter::TEXT,value::INT,updated_date::DATE,FROMexample.letters
运行的审计
SQLMesh使用‘ SQLMesh plan ’命令(在模型执行之后)自动运行审计。你也可以运行这个命令来只运行审计(你可能需要在运行这个命令之前应用一个计划):
sqlmesh audit
输出结果:
Found 3 audit(s).
assert_positive_ids on model example.base_model ✅ PASS.
unique_values on model example.base_model ✅ PASS.
not_null on model example.base_model ✅ PASS.Finished with 0 audit errors and 0 audits skipped.
Done.
在SQLMesh中审计的一个好处是,默认情况下,如果审计失败,SQLMesh会停止管道的执行,以防止错误的数据继续执行。
测试
SQLMesh测试用于测试代码,而不是测试数据。我们只需要在yaml文件中为测试提供输入和预期输出。
创建测试
SQLMesh通过‘ SQLMesh create_test ’命令简化了这个过程。继续运行下面的代码,指定您想要为其创建测试的模型、它的上游模型和一个示例查询。
sqlmesh create_test example.intermediate_model --query example.base_model "SELECT * FROM example.base_model WHERE updated_date BETWEEN '2025-01-01' and '2025-01-15'"
下面是上面的命令为我生成的测试文件,在tests目录下生成test_intermediate_model.yaml文件,内容如下:
test_intermediate_model:model: '"db"."example"."intermediate_model"'inputs:'"db"."example"."base_model"':- id: 1letter: Avalue: 10updated_date: 2025-01-07- id: 2letter: Bvalue: 20updated_date: 2025-01-07- id: 3letter: Cvalue: 30updated_date: 2025-01-07outputs:query:- id: 1letter: Avalue: 10big_value: 100updated_date: 2025-01-07new_col: new_col- id: 2letter: Bvalue: 20big_value: 200updated_date: 2025-01-07new_col: new_col- id: 3letter: Cvalue: 30big_value: 300updated_date: 2025-01-07new_col: new_col
运行测试
“ sqlmesh plan ”命令运行测试(在执行模型之前)以及“ sqlmesh test ”命令。让我们运行这两个命令:
sqlmesh plan dev
输出结果:
$ sqlmesh plan dev
======================================================================
Successfully Ran 1 tests against duckdb
----------------------------------------------------------------------No changes to plan: project files match the `dev` environment
sqlmesh test
输出结果:
$ sqlmesh test
.
----------------------------------------------------------------------
Ran 1 test in 0.035sOK
如果你还记得,我们为测试连接配置了一个不同的duckdb。这意味着该测试将使用测试连接“test.db”执行,而不是使用“db.db”。如果你想了解更多关于SQLMesh测试的知识,你会发现官方文档很有帮助!
血缘关系(DAG)
许多数据工具的典型特性是能够可视化数据血缘关系。SQLMesh也可以通过CLI或SQLMesh UI提供这种功能。使用CLI,您可以运行‘ sqlmesh dag file_name ’来生成简单的数据血缘关系。
sqlmesh dag dag.html
嗯,这是相当有限的。您只能看到整体的数据流,这对你来说可能不够。当你希望看到更详细的日期时,需要使用SQLMesh UI。为此,你必须安装一个依赖项:
pip install 'sqlmesh[web]'
然后运行以下命令在浏览器中打开UI:
sqlmesh ui
输出内容:
$ sqlmesh ui
INFO: Started server process [465829]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
这里提示你打开UI,你将看到一个漂亮的UI,如下所示(如果你没有看到血缘关系,你可能需要展开该区域或单击左侧的模型文件),如:点击了“base_model.sql”):
要查看列级血缘关系吗?你可以点击列。例如,让我们点击example.intermediate_model中的new_col、big_value和value列:
SQLMesh显示了‘ value ’和‘ big_value ’是如何来自上游表中的‘ value ’的。然而,new_col在我点击后变成了灰色。
当涉及到SQLMesh UI所提供的功能时,数据沿袭只是冰山一角。如果您选择这样做,您可以从这个UI执行所有的开发任务。既然我们在这里,让我们看看如何找到模型元数据/定义。
点击左侧的“数据目录”图标。它把你带到一个视图中,你可以看到模型元数据,包括模型和列描述:
Python模型
我知道你们很多人都是狂热的 Python 爱好者,有些事情用Python可以做得更好。虽然你需要在Python模型中返回pandas或Spark数据框架,但我将使用polar进行转换部分,因为它的整体实用性,如速度和干净的API:
“example.intermediate_py_model.py”:
import typing as t
from datetime import datetimefrom sqlmesh import ExecutionContext, model
import pandas as pd
import polars as pl@model(name="example.intermediate_py_model",owner="Yuki",kind="FULL",cron="@daily",grain="id",columns={"id": "int","letter": "text","value": "int","big_value": "int","updated_date": "date","new_col": "text",},column_descriptions={"id": "primary key","letter": "alphabet letter","value": "random value","big_value": "value * 10","updated_date": "updated date","new_col": "a new column",},
)
def execute(context: ExecutionContext,start: datetime,end: datetime,execution_time: datetime,**kwargs: t.Any,
) -> pd.DataFrame:table = context.resolve_table("example.base_model")df = (pl.from_pandas(context.fetchdf(f"SELECT * FROM {table}")).select("id","letter","value",pl.col("value").mul(10).alias("big_value"),"updated_date",pl.lit("new_col").alias("new_col"),))return df.to_pandas()
注意事项:
- 整体结构是相同的,包括模型属性。
- Python模型要求您返回pandas或Spark数据框架。
- Python模型需要指定列模式。
- 使用‘ ExecutionContext ’是Python模型中的一种典型方法。它提供了对上游表、全局变量等的访问。
- 我没有在这个Python模型中使用宏函数来计算big_value列。原因是我们在Python模型中引用自定义Python宏的方式有点麻烦(在撰写本文时)。如果你愿意,你可以这样做:
-
导入宏函数就像在Python模型中导入Python函数一样。
-
在‘ MacroEvaluator ’类中导入,并将其作为参数传递给宏函数。
- 或者你将你的函数定义为一个通用的Python函数,没有‘ @macro ’装饰器和‘ evaluator ’参数,这样你就不需要导入和传入‘ MacroEvaluator ’类(Tobiko Slack线程)。
SQLMesh中的Python模型非常灵活,因为只要它们返回pandas或Spark数据框架,你就可以做几乎任何事情。如果愿意,你甚至可以在SQLMesh中将数据摄取作业构建为Python模型。
最后总结
显然,本文无法涵盖SQLMesh所提供的所有内容。未来我们继续学习下面列出的相关内容:
- 深入了解模型类型/种类
- Pre/post语句
- 有用的CLI命令(table_dff, sqlmesh evaluate等)
- 开源Github Actions CI/CD Bot
- 等
SQLMesh是一个令人兴奋且不断发展的工具。我将继续与大家分享我的见解。如果有任何关于SQLMesh或其他工具的具体内容,请随时告诉我。