调度系统:使用 Apache Airflow 管理和调度 Couchbase SQL 脚本的实际例子

ops/2024/12/13 21:04:15/

假设场景如下:

每天定时执行一组 Couchbase SQL 脚本,用于数据同步、聚合和清洗。

脚本包括:

同步数据到 Couchbase 集群。

执行数据聚合查询。

清理过期数据。

要求:

支持任务依赖管理。

提供任务失败后的重试机制。

支持日志和运行状态的监控。

使用 Airflow 实现

Airflow 提供了强大的调度和任务依赖管理能力,可以将上述流程定义为一个 DAG(有向无环图)。

  1. 创建 Couchbase SQL 脚本

创建三个 SQL 脚本:

sync_data.sql

INSERT INTO bucket-name (KEY, VALUE)
SELECT META().id, new_data.*
FROM source-bucket new_data
WHERE META().id NOT IN (SELECT RAW META().id FROM bucket-name);

aggregate_data.sql

SELECT category, COUNT(*) AS count
FROM bucket-name
WHERE type = “product”
GROUP BY category;

cleanup_expired_data.sql

DELETE FROM bucket-name
WHERE expiration_date < NOW_STR();

  1. 安装 Couchbase 的 Python 客户端

通过 pip 安装所需的 Couchbase 依赖:

pip install couchbase

  1. 定义 Airflow DAG 和任务

couchbase_workflow.py:

from airflow import DAG

from airflow.operators.python import PythonOperator

from datetime import datetime, timedelta

from couchbase.cluster import Cluster, ClusterOptions

from couchbase_core.cluster import PasswordAuthenticator

\

Couchbase 连接函数

def execute_couchbase_query(sql_file_path):
# 连接 Couchbase 集群
cluster = Cluster(
‘couchbase://localhost’,
ClusterOptions(PasswordAuthenticator(‘username’, ‘password’))
)
bucket = cluster.bucket(‘bucket-name’)
query_service = cluster.query_indexes()

# 读取并执行 SQL 脚本
with open(sql_file_path, 'r') as file:query = file.read()
result = query_service.query(query)
print(f"Executed query from {sql_file_path}: {result}")

定义默认参数

default_args = {
‘owner’: ‘admin’,
‘depends_on_past’: False,
‘email_on_failure’: True,
‘email’: [‘admin@example.com’],
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5),
}

定义 DAG

with DAG(
dag_id=‘couchbase_sql_workflow’,
default_args=default_args,
description=‘A workflow to execute Couchbase SQL scripts’,
schedule_interval=‘0 3 * * *’, # 每天凌晨 3 点运行
start_date=datetime(2024, 1, 1),
catchup=False,
tags=[‘couchbase’, ‘sql’],
) as dag:

# 任务 1: 同步数据
sync_data_task = PythonOperator(task_id='sync_data',python_callable=execute_couchbase_query,op_args=['/path/to/sql_scripts/sync_data.sql']
)# 任务 2: 数据聚合
aggregate_data_task = PythonOperator(task_id='aggregate_data',python_callable=execute_couchbase_query,op_args=['/path/to/sql_scripts/aggregate_data.sql']
)# 任务 3: 清理过期数据
cleanup_data_task = PythonOperator(task_id='cleanup_data',python_callable=execute_couchbase_query,op_args=['/path/to/sql_scripts/cleanup_expired_data.sql']
)# 定义任务依赖
sync_data_task >> aggregate_data_task >> cleanup_data_task
  1. 部署 DAG 到 Airflow

将脚本保存为 couchbase_workflow.py 并放置到 Airflow 的 DAG 文件夹中(通常是 /airflow/dags)。

确保 Airflow 服务正常运行:

airflow webserver
airflow scheduler

登录到 Airflow Web 界面,启用并监控 couchbase_sql_workflow。

  1. 优势分析

任务调度:通过 schedule_interval 定时调度任务,支持灵活的 Cron 表达式。

任务依赖管理:通过 >> 定义任务依赖,确保顺序执行。

重试机制:默认支持失败后的自动重试。

可观察性:Airflow 提供任务状态跟踪和日志记录,方便调试和监控。

  1. 扩展优化

参数化 SQL:可在 SQL 中加入参数,通过 PythonOperator 动态替换。

自定义连接器:使用 Airflow 的 Hook 构建更灵活的 Couchbase 连接器。

错误处理:在 Python 函数中捕获异常并记录到外部系统(如日志系统或监控平台)。


http://www.ppmy.cn/ops/141633.html

相关文章

Python+OpenCV系列:图像的几何变换

Python OpenCV 系列&#xff1a;图像的几何变换 引言 在图像处理领域&#xff0c;几何变换是一个非常重要的操作&#xff0c;它可以改变图像的位置、大小、方向或形状。在计算机视觉中&#xff0c;这些操作对于图像预处理、特征提取和图像增强至关重要。本文将介绍如何利用 …

HTML简单贪吃蛇游戏

1.功能说明&#xff1a; 游戏网格&#xff1a;一个20x20的网格&#xff0c;每个格子的大小为20x20像素。 蛇的移动&#xff1a;玩家可以通过方向键&#xff08;左、上、右、下&#xff09;控制蛇的移动。 食物生成&#xff1a;食物会在随机位置生成&#xff0c;当蛇吃到食物时…

CentOS8或docker镜像centos8更换镜像源

因为 CentOS 8 已经结束生命周期&#xff0c;原来的镜像源不可用了。我们需要将镜像源改为 CentOS 8 的替代源。 在容器中运行以下命令&#xff1a; 首先备份原有的源 cd /etc/yum.repos.d/ mkdir backup mv *.repo backup/ 创建新的源文件 cat > /etc/yum.repos.d/Cent…

npm淘宝镜像证书过期

前言 使用 npm 报错&#xff1a; npm ERR! request to https://registry.npm.taobao.org/xxx failed, reason: certificate has expired 错误原因&#xff1a; 早在 2021 年&#xff0c;淘宝就发文称&#xff0c;npm 淘宝镜像已经从 http://registry.npm.taobao.org 切换到了 h…

Linux 常用命令大全:文件管理、系统信息、网络操作

Linux 系统提供了丰富的命令行工具&#xff0c;用于各种操作和管理任务。以下是一些常用的 Linux 命令及其简要说明&#xff1a; 文件和目录操作 ls - 列出目录内容 ls -l /path/to/directorycd - 更改目录 cd /path/to/directorypwd - 显示当前工作目录 pwdmkdir - 创建目录 …

设置笔记本同时连接内外网

原理&#xff1a;通过笔记本和手机相连&#xff0c;实现双网卡功能能。笔记本连接内网wifi、同时手机端开启usb网络共享&#xff0c;笔记本就有了两个网&#xff0c;然配置那个访问外网&#xff0c;那个访问内网。 1.笔记本wifi连接内网wifi 2.手机端共享网络。 手机打开 -【…

Android系统(android app和系统架构)

文章目录 AndroidAndroid Apps四大组件 Android系统Platform API之下&#xff1a;一个微笑内核adb(Android Debug Bridge) Android包管理机制Android的Intent机制参考 Android LinuxFrameworkJVM 在Linux/Java上做了个二次开发&#xff1f;并不完全是&#xff1a;Android定义…

算法日记48 day 图论(拓扑排序,dijkstra)

今天继续图论章节&#xff0c;主要是拓扑排序和dijkstra算法。 还是举例说明。 题目&#xff1a;软件构建 117. 软件构建 (kamacoder.com) 题目描述 某个大型软件项目的构建系统拥有 N 个文件&#xff0c;文件编号从 0 到 N - 1&#xff0c;在这些文件中&#xff0c;某些文件…