调度系统:Prefect 结合 Couchbase SQL 执行和调度 SQL 查询脚本

news/2024/12/11 17:04:14/

使用 Prefect 管理和调度 Couchbase SQL 脚本的实际例子可以帮助你自动化数据提取、转换和加载(ETL)任务,尤其是当你需要执行复杂的 SQL 查询,处理来自 Couchbase 数据库的数据时。以下是一个例子,展示如何使用 Prefect 结合 Couchbase SQL 执行和调度 SQL 查询脚本。

场景:

我们假设你需要从 Couchbase 数据库中提取数据,执行 SQL 查询,然后对结果进行转换或清洗,最后将处理后的数据存储到文件或另一个数据库中。这个过程可以通过 Prefect 管理和调度。

步骤:

安装 Prefect 和 Couchbase 客户端:

pip install prefect couchbase

编写任务和工作流: 在下面的示例中,我们将使用 Prefect 定义三个任务:

从 Couchbase 执行 SQL 查询。

对查询结果进行 数据清洗。

将清洗后的数据 保存到文件(例如 CSV 文件)。

from prefect import task, Flow
from couchbase.cluster import Cluster, ClusterOptions
from couchbase.auth import PasswordAuthenticator
import pandas as pd

连接到 Couchbase 集群

def get_couchbase_connection():
cluster = Cluster(‘couchbase://localhost’, ClusterOptions(PasswordAuthenticator(‘username’, ‘password’)))
bucket = cluster.bucket(‘your_bucket’)
return bucket

1. 从 Couchbase 执行 SQL 查询

@task
def run_couchbase_query(query: str):
“”“执行 Couchbase N1QL 查询并返回结果”“”
bucket = get_couchbase_connection()
result = bucket.query(query)
# 将结果转换为 Pandas DataFrame
rows = [row for row in result]
return pd.DataFrame(rows)

2. 数据清洗任务

@task
def clean_data(df: pd.DataFrame):
“”“简单数据清洗:删除缺失值和去除空格”“”
df = df.dropna() # 删除缺失值
# 假设某些列是字符串,进行空格去除
df[‘some_column’] = df[‘some_column’].str.strip()
return df

3. 保存数据到文件(例如 CSV)

@task
def save_data_to_file(df: pd.DataFrame, file_path: str):
“”“将数据保存到 CSV 文件”“”
df.to_csv(file_path, index=False)
print(f"数据已保存到 {file_path}")

创建 Prefect 工作流

with Flow(“Couchbase_ETL_Workflow”) as flow:
query = “SELECT * FROM your_bucket WHERE condition = true”
data = run_couchbase_query(query)
cleaned_data = clean_data(data)
save_data_to_file(cleaned_data, “output_data.csv”)

运行工作流

flow.run()

解释:

get_couchbase_connection:

这是一个辅助函数,用于连接到 Couchbase 集群。你需要提供 Couchbase 集群的地址和认证信息(用户名、密码)。在此例中,我们假设连接到本地的 Couchbase 集群。

run_couchbase_query:

该任务负责执行 Couchbase N1QL 查询,从指定的 bucket 中获取数据。查询结果被转换为 Pandas DataFrame 以便后续处理。

clean_data:

对查询结果进行简单的数据清洗操作。我们删除缺失值,并假设某些列是字符串类型,因此进行了去除空格的操作。你可以根据需求进行更复杂的数据清洗。

save_data_to_file:

该任务将清洗后的数据保存为 CSV 文件。你也可以根据需求将其保存到其他存储介质(如数据库、HDFS 等)。

工作流和任务依赖:

任务的依赖关系在 Flow 中定义:首先执行 run_couchbase_query,然后执行 clean_data,最后执行 save_data_to_file。

flow.run():

flow.run() 会触发整个工作流的执行,Prefect 会自动按顺序执行这些任务。

调度任务:

你可以设置 Prefect 工作流的调度功能,让它定期执行,或者通过外部事件触发任务。例如,每天执行一次这个 ETL 流程。

from prefect.schedules import IntervalSchedule

from datetime import timedelta

设置调度,每天运行一次

schedule = IntervalSchedule(interval=timedelta(days=1))

with Flow(“Couchbase_ETL_Workflow”, schedule=schedule) as flow:
query = “SELECT * FROM your_bucket WHERE condition = true”
data = run_couchbase_query(query)
cleaned_data = clean_data(data)
save_data_to_file(cleaned_data, “output_data.csv”)

运行调度器

flow.run()

监控和可视化:

Prefect 提供了一个非常强大的 Web UI,如果你使用 Prefect Cloud 或 Prefect Server,你可以:

查看工作流执行的状态。

跟踪任务执行的日志。

设置任务重试、失败通知等。

查看任务执行的历史数据。

总结:

使用 Prefect 管理和调度 Couchbase SQL 脚本可以帮助你自动化整个 ETL 流程,确保数据从 Couchbase 数据库提取、清洗后可以无缝地加载到目标存储。Prefect 提供了强大的任务调度、容错、监控功能,使得这种数据处理任务能够高效、可靠地执行。


http://www.ppmy.cn/news/1554276.html

相关文章

es实现上传文件查询

es实现上传文件查询 上传文件,获取文件内容base64,使用es的ingest-attachment文本抽取管道转换为文字存储 安装插件 通过命令行安装(推荐) 1.进入 Elasticsearch 安装目录 2.使用 elasticsearch-plugin 命令安装 bin/elastics…

基于spring boot的高校专业实习管理系统的设计与实现

文末获取源码和万字论文,制作不易,感谢点赞支持。 设计题目:基于spring boot的高校专业实习管理系统的设计与实现 摘 要 随着国内市场经济这几十年来的蓬勃发展,突然遇到了从国外传入国内的互联网技术,互联网产业从开…

如何避免缓存击穿?超融合常驻缓存和多存储池方案对比

作者:SmartX 解决方案专家 钟锦锌 很多运维人员都知道,混合存储介质配置可能会带来“缓存击穿”的问题,尤其是大数据分析、数据仓库等需要频繁访问“冷数据”的应用场景,缓存击穿可能会更频繁地出现,影响业务运行。除…

Distance in Tree 树形dp练习(树中两点距离为k的数量板子)

Distance in Tree 题面翻译 题目大意 输入点数为 N N N一棵树 求树上长度恰好为 K K K的路径个数 输入格式 第一行两个数字 N , K N,K N,K,如题意 接下来的 N − 1 N-1 N−1行中,每行两个整数 u , v u,v u,v表示一条树边 ( u , v ) (u,v) (u,v) 输出格式 一个整数 a n…

k8s折腾笔记

k8s折腾笔记 k8s安装、部署、运行demo1.系统环境2.开始安装2.1 先从master节点开始2.2 worker节点 3.遇到的问题4.集群demo k8s安装、部署、运行demo 1.系统环境 两台服务器,都是ubuntu22版本, 一台2核4g,作为master节点 一台2核2g&#xf…

Hyper-V创建虚拟机配置IP等网络配置原理(Linux、Windows为例)

Hyper-V创建虚拟机配置IP等网络配置原理(Linux、Windows为例) 大家知道Windows系统里面内置了Hyper-V管理器,用来创建和管理本地虚拟机环境。今天我创建了两台虚拟机,一台是CentOS7.9(Linux),另…

使用 Streamlit +gpt-4o实现有界面的图片内容分析

在上一篇利用gpt-4o分析图像的基础上,进一步将基于 Python 的 Streamlit 库,结合 OpenAI 的 API,构建一个简洁易用的有界面图片内容分析应用。通过该应用,用户可以轻松浏览本地图片,并获取图片的详细描述。 调用gpt-4o…

springboot系列--拦截器加载原理

一、拦截器加载原理 拦截器是在容器启动时,就创建并加载好,此时并未放入拦截器链中,只是放在一个拦截器集合当中,当一个请求进来之后,会通过匹配路径,查看是否有命中集合中的拦截器的拦截路径,如…