SQLMesh系列教程-2:SQLMesh入门项目实战(下篇)

embedded/2025/2/13 0:24:42/

上篇我介绍了环境搭建、duckdb数据准备、sqlmesh数据模型、plan命令运行。本文继续介绍审计、测试、生成血缘关系以及python模型等。

在这里插入图片描述

有两种方法可以在SQLMesh中创建宏。一种方法是使用Python,另一种方法是使用Jinja。这里我们创建Python宏。让我们构建简单的Python宏。在宏文件夹下创建一个名为“ custom_calc.py ”的Python文件,并添加以下代码:

from sqlmesh import macro@macro()
def multiply_by_10(evaluator, col):return col * 10

请注意,你必须为SQLMesh添加@macro装饰器,以便将其识别为宏并在模型中使用,还需要添加‘ evaluator ’作为它的函数参数之一。

让我们将这个宏添加到“example.intermediate.sql”模型中:

MODEL (name example.intermediate_model,owner tommy,kind FULL,cron '@daily',grain id,column_descriptions (id = 'primary key',letter = 'alphabet letter',value = 'random value',updated_date = 'updated date',new_col = 'a new column'));SELECTid,letter,value,@multiply_by_10(value) AS big_value,updated_date,'new_col' AS new_colFROMexample.base_model

我们在模型中添加了@multiply_by_10(value) AS big_value。“@”符号用于在SQL模型中引用宏。创建Python宏并将其添加到模型中就像刚才看到的一样简单。

使用Python宏,可以不受限于SQL所能做的事情,你可以为数据转换构建任何逻辑。

提示:在宏Python文件中,也可以创建其他函数,而不需要‘ @macro ’装饰器和‘ evaluator ’参数。这样可以更有效地使用这些函数来帮助组织宏函数中的代码和逻辑。

审计(Audits)

SQLMesh审计基本上是dbt测试。在SQLMesh中有内置审计,如‘ unique ’和‘ not_null ’。你还可以创建自己的自定义审计。

创建自定义审计

你应该创建一个SQL文件,其中包含审计文件夹下的自定义审计。

AUDIT (name assert_positive_ids,);SELECT *FROM @this_modelWHEREid < 0

这是为了检查“id”列只包含正数。

向模型添加自定义审计

接下来,我们将这个自定义审计以及其他一些内置审计添加到“base_model.sql”:

将这些行添加到MODEL块中:

audits (assert_positive_ids,unique_values(columns = id),not_null(columns = id))

完整代码如下:

MODEL (name example.base_model,owner Yuki,kind VIEW,cron '@daily',grain id,column_descriptions (id = 'primary key',letter = 'alphabet letter',value = 'random value',updated_date = 'updated date'),audits (assert_positive_ids,unique_values(columns = id),not_null(columns = id)));SELECTid::INT,letter::TEXT,value::INT,updated_date::DATE,FROMexample.letters

运行的审计

SQLMesh使用‘ SQLMesh plan ’命令(在模型执行之后)自动运行审计。你也可以运行这个命令来只运行审计(你可能需要在运行这个命令之前应用一个计划):

sqlmesh audit

输出结果:

Found 3 audit(s).
assert_positive_ids on model example.base_model ✅ PASS.
unique_values on model example.base_model ✅ PASS.
not_null on model example.base_model ✅ PASS.Finished with 0 audit errors and 0 audits skipped.
Done.

在SQLMesh中审计的一个好处是,默认情况下,如果审计失败,SQLMesh会停止管道的执行,以防止错误的数据继续执行。

测试

SQLMesh测试用于测试代码,而不是测试数据。我们只需要在yaml文件中为测试提供输入和预期输出。

创建测试

SQLMesh通过‘ SQLMesh create_test ’命令简化了这个过程。继续运行下面的代码,指定您想要为其创建测试的模型、它的上游模型和一个示例查询。

sqlmesh create_test example.intermediate_model --query example.base_model "SELECT * FROM example.base_model WHERE updated_date BETWEEN '2025-01-01' and '2025-01-15'" 

下面是上面的命令为我生成的测试文件,在tests目录下生成test_intermediate_model.yaml文件,内容如下:

test_intermediate_model:model: '"db"."example"."intermediate_model"'inputs:'"db"."example"."base_model"':- id: 1letter: Avalue: 10updated_date: 2025-01-07- id: 2letter: Bvalue: 20updated_date: 2025-01-07- id: 3letter: Cvalue: 30updated_date: 2025-01-07outputs:query:- id: 1letter: Avalue: 10big_value: 100updated_date: 2025-01-07new_col: new_col- id: 2letter: Bvalue: 20big_value: 200updated_date: 2025-01-07new_col: new_col- id: 3letter: Cvalue: 30big_value: 300updated_date: 2025-01-07new_col: new_col

运行测试

sqlmesh plan ”命令运行测试(在执行模型之前)以及“ sqlmesh test ”命令。让我们运行这两个命令:

sqlmesh plan dev

输出结果:

$ sqlmesh plan dev
======================================================================
Successfully Ran 1 tests against duckdb
----------------------------------------------------------------------No changes to plan: project files match the `dev` environment
sqlmesh test

输出结果:

$ sqlmesh test
.
----------------------------------------------------------------------
Ran 1 test in 0.035sOK

如果你还记得,我们为测试连接配置了一个不同的duckdb。这意味着该测试将使用测试连接“test.db”执行,而不是使用“db.db”。如果你想了解更多关于SQLMesh测试的知识,你会发现官方文档很有帮助!

血缘关系(DAG)

许多数据工具的典型特性是能够可视化数据血缘关系。SQLMesh也可以通过CLI或SQLMesh UI提供这种功能。使用CLI,您可以运行‘ sqlmesh dag file_name ’来生成简单的数据血缘关系。

sqlmesh dag dag.html

在这里插入图片描述

嗯,这是相当有限的。您只能看到整体的数据流,这对你来说可能不够。当你希望看到更详细的日期时,需要使用SQLMesh UI。为此,你必须安装一个依赖项:

pip install 'sqlmesh[web]'

然后运行以下命令在浏览器中打开UI:

sqlmesh ui

输出内容:

$ sqlmesh ui
INFO:     Started server process [465829]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

这里提示你打开UI,你将看到一个漂亮的UI,如下所示(如果你没有看到血缘关系,你可能需要展开该区域或单击左侧的模型文件),如:点击了“base_model.sql”):
在这里插入图片描述

要查看列级血缘关系吗?你可以点击列。例如,让我们点击example.intermediate_model中的new_col、big_value和value列:

在这里插入图片描述

SQLMesh显示了‘ value ’和‘ big_value ’是如何来自上游表中的‘ value ’的。然而,new_col在我点击后变成了灰色。

当涉及到SQLMesh UI所提供的功能时,数据沿袭只是冰山一角。如果您选择这样做,您可以从这个UI执行所有的开发任务。既然我们在这里,让我们看看如何找到模型元数据/定义。

点击左侧的“数据目录”图标。它把你带到一个视图中,你可以看到模型元数据,包括模型和列描述:

在这里插入图片描述

Python模型

我知道你们很多人都是狂热的 Python 爱好者,有些事情用Python可以做得更好。虽然你需要在Python模型中返回pandas或Spark数据框架,但我将使用polar进行转换部分,因为它的整体实用性,如速度和干净的API:

“example.intermediate_py_model.py”:

import typing as t
from datetime import datetimefrom sqlmesh import ExecutionContext, model
import pandas as pd
import polars as pl@model(name="example.intermediate_py_model",owner="Yuki",kind="FULL",cron="@daily",grain="id",columns={"id": "int","letter": "text","value": "int","big_value": "int","updated_date": "date","new_col": "text",},column_descriptions={"id": "primary key","letter": "alphabet letter","value": "random value","big_value": "value * 10","updated_date": "updated date","new_col": "a new column",},
)
def execute(context: ExecutionContext,start: datetime,end: datetime,execution_time: datetime,**kwargs: t.Any,
) -> pd.DataFrame:table = context.resolve_table("example.base_model")df = (pl.from_pandas(context.fetchdf(f"SELECT * FROM {table}")).select("id","letter","value",pl.col("value").mul(10).alias("big_value"),"updated_date",pl.lit("new_col").alias("new_col"),))return df.to_pandas()

注意事项:

  • 整体结构是相同的,包括模型属性。
  • Python模型要求您返回pandas或Spark数据框架。
  • Python模型需要指定列模式。
  • 使用‘ ExecutionContext ’是Python模型中的一种典型方法。它提供了对上游表、全局变量等的访问。
  • 我没有在这个Python模型中使用宏函数来计算big_value列。原因是我们在Python模型中引用自定义Python宏的方式有点麻烦(在撰写本文时)。如果你愿意,你可以这样做:
  1. 导入宏函数就像在Python模型中导入Python函数一样。

  2. 在‘ MacroEvaluator ’类中导入,并将其作为参数传递给宏函数。

图
在这里插入图片描述

  1. 或者你将你的函数定义为一个通用的Python函数,没有‘ @macro ’装饰器和‘ evaluator ’参数,这样你就不需要导入和传入‘ MacroEvaluator ’类(Tobiko Slack线程)。

SQLMesh中的Python模型非常灵活,因为只要它们返回pandas或Spark数据框架,你就可以做几乎任何事情。如果愿意,你甚至可以在SQLMesh中将数据摄取作业构建为Python模型。

最后总结

显然,本文无法涵盖SQLMesh所提供的所有内容。未来我们继续学习下面列出的相关内容:

  • 深入了解模型类型/种类
  • Pre/post语句
  • 有用的CLI命令(table_dff, sqlmesh evaluate等)
  • 开源Github Actions CI/CD Bot

SQLMesh是一个令人兴奋且不断发展的工具。我将继续与大家分享我的见解。如果有任何关于SQLMesh或其他工具的具体内容,请随时告诉我。


http://www.ppmy.cn/embedded/161727.html

相关文章

火热的大模型: AIGC架构解析

文章目录 一、背景介绍二、架构描述数据层模型层&#xff08;MaaS&#xff09;服务层&#xff08;PaaS&#xff09;基础设施层&#xff08;IaaS&#xff09;应用层 三、架构分析四、应用场景与价值4.1 典型场景4.2 价值体现 五、总结 一、背景介绍 火热的大模型&#xff0c;每…

服务器芯片合封电源解决方案

方案简介 为满足人工智能、机器学习、大数据挖掘等高性能应用日益增长 的需求,处理器工作电流现已提高到数百安培。将大电流供电单元部署在处理器附近的负载点电源架构,可减少主板上的配电损耗,但不能减少处理器与主板之间的互连挑战。随着处理器电流的增大,与处理器的剩余…

前端性能分析常见内容

前端性能分析是前端开发中的重要部分&#xff0c;以下是对前端常考性能分析题目的详解&#xff1a; 一、性能指标 前端性能优化的核心目标是提升用户体验&#xff0c;常见的性能指标包括&#xff1a; 加载时间&#xff08;Load Time&#xff09;&#xff1a;指从用户发出请求…

Spring Boot 配置JPA数据库主从读写分离失败及解决办法

因为是老项目, Spring Boot 是1.4, 使用 AbstractRoutingDataSource 来做主从切换, 配置切面类在进入事务时切换成主库, 但实际运行起来却失败, 写操作路由到了从库 查了很多文章, 试了很多方法都无效, 包括修改注解 Transactional 的 propagation 属性, 清空主从标记等等 打…

从小白开始的动态规划

一、动态规划的核心思想 动态规划&#xff08;DP&#xff09;通过拆分问题记忆化计算解决复杂问题&#xff0c;核心步骤为&#xff1a; 定义状态&#xff1a;用变量&#xff08;如dp[i]&#xff09;表示子问题的解 状态转移方程&#xff1a;建立子问题之间的关系式 初始化&a…

ASP.NET Core SignalR的协议协商

SignalR支持多种服务器推送方式&#xff1a;Websocket、Server-Sent Events、长轮询。默认按顺序尝试。F12查看协商过程。websocket和HTTP是不同的协议&#xff0c;为什么能用同一个端口。在【开发人员工具】的【网络】页签中看WebSocket通信过程。 协议协商问题 集群中协议协…

关于SoC产品介绍:ICNM8001

这个是一款关于QHD显示器&#xff0c;主控芯片scaler IC。 功能&#xff1a;支持2路HDMI2.0接收机 支持HDCP1.4和HDCP2.22&#xff1b;支持HDCP1.4和HDCP2.2。 分辨率&#xff1a;2560*1440 刷新率&#xff1a;75Hz HDR&#xff1a;HDR10 封装&#xff1a;QFP216 接口&#xf…

如何在WPF中实现软件内嵌效果

1.创建Process进程&#xff0c;设置运行程序文件路径 Process proc new Process(); proc.StartInfo.FileName "C:\Users\hdevelop.exe"; proc.Start(); proc.WaitForInputIdle(); 2.根据创建的进程获取窗口句柄 IntPtr hWnd proc.MainWindowHandle; 3.开启线程…