使用 Temporal 管理和调度 Couchbase SQL 脚本的实际例子

news/2024/12/12 3:47:49/

场景描述

目标:使用 Temporal 管理和调度一组 Couchbase SQL 脚本来完成以下任务:

同步数据:从其他数据源同步数据到 Couchbase。

执行数据聚合:运行统计 SQL 查询。

清理过期数据:定期清理 Couchbase 中过期或无效的数据。

提供任务失败的自动重试、定时调度、任务状态跟踪。

实现步骤

  1. Couchbase SQL 脚本准备

sync_data.sql

INSERT INTO bucket-name (KEY, VALUE)
SELECT META().id, new_data.*
FROM source-bucket new_data
WHERE META().id NOT IN (SELECT RAW META().id FROM bucket-name);

aggregate_data.sql

SELECT category, COUNT(*) AS count
FROM bucket-name
WHERE type = “product”
GROUP BY category;

cleanup_expired_data.sql

DELETE FROM bucket-name
WHERE expiration_date < NOW_STR();

  1. 安装依赖

确保安装了以下库:

pip install temporalio couchbase

  1. Temporal Workflow 和 Activity 实现

Temporal 的核心是 Workflow(描述流程)和 Activity(执行具体任务)。

Activity 实现

创建一个 Activity,用来执行 SQL 脚本。

from couchbase.cluster import Cluster, ClusterOptions

from couchbase.auth import PasswordAuthenticator

Couchbase Activity

class CouchbaseActivities:
def init(self, couchbase_host, username, password):
self.cluster = Cluster(
f’couchbase://{couchbase_host}',
ClusterOptions(PasswordAuthenticator(username, password))
)
self.query_service = self.cluster.query_indexes()

def execute_sql(self, sql_file_path):with open(sql_file_path, 'r') as file:query = file.read()result = self.query_service.query(query)print(f"Executed SQL from {sql_file_path}: {result}")return result

Workflow 定义

定义一个 Workflow,描述任务的执行顺序。

from temporalio import workflow

@workflow.defn

class CouchbaseWorkflow:
@workflow.run
async def run(self):
activities = workflow.ActivityStub(CouchbaseActivities)

    # 1. 同步数据await activities.execute_sql('/path/to/sync_data.sql')# 2. 数据聚合await activities.execute_sql('/path/to/aggregate_data.sql')# 3. 清理过期数据await activities.execute_sql('/path/to/cleanup_expired_data.sql')

Worker 实现

将 Workflow 和 Activity 注册到 Temporal Worker。

from temporalio.worker import Worker

from couchbase_activities import CouchbaseActivities

from couchbase_workflow import CouchbaseWorkflow

async def main():
worker = Worker(
host=“localhost:7233”, # Temporal 服务地址
task_queue=“couchbase_task_queue”,
workflows=[CouchbaseWorkflow],
activities=[CouchbaseActivities(“localhost”, “username”, “password”)]
)
await worker.run()

if name == “main”:
import asyncio
asyncio.run(main())

  1. Workflow 启动代码

使用 Temporal 客户端启动 Workflow。

from temporalio.client import Client

async def main():
client = await Client.connect(“localhost:7233”)

# 启动 Workflow
handle = await client.start_workflow(CouchbaseWorkflow.run,id="couchbase_sql_workflow",task_queue="couchbase_task_queue",
)
print(f"Started workflow with ID: {handle.id}")

if name == “main”:
import asyncio
asyncio.run(main())

Temporal 的特性应用

任务调度:

Temporal 支持定时任务。可以通过 Temporal.schedule 定义定时运行的 Workflow。

自动重试:

每个 Activity 都可以配置重试策略。

from temporalio import activity
@activity.defn(retry_policy=activity.RetryPolicy(max_attempts=3))
async def execute_sql(sql_file_path):

任务依赖:

Workflow 中通过顺序执行 Activity 描述任务依赖关系。

可观察性:

使用 Temporal Web 界面查看 Workflow 和 Activity 的运行状态、历史和日志。

使用 Temporal 的优势

高可靠性:即使 Worker 崩溃,Workflow 的状态也能持久化并恢复。

灵活调度:支持定时任务和动态控制流程。

自动重试:内置的失败重试和错误处理机制。

开发简便:通过 Python SDK,快速实现分布式任务调度和管理。

通过 Temporal,Couchbase SQL 脚本的执行不仅具备高可用性和自动化,还可以轻松应对复杂的业务逻辑需求。


http://www.ppmy.cn/news/1554391.html

相关文章

如何在 cPanel 中创建子域名:分步指南

cPanel 是一个用于管理网站的工具&#xff0c;操作界面简单直观&#xff0c;常用于管理网站的各种功能&#xff0c;包括创建子域名。很多知名的网络服务提供商&#xff0c;如 Hostease&#xff0c;都提供了 cPanel 管理工具。 本文将详细介绍如何在 cPanel 中创建子域名&#x…

如何将CSDN的文章保存为PDF?

目录 1、打开CSDN文章2、按F12或者鼠标右键选择检查并进入控制台3、在控制台输入以下代码4、然后回车&#xff08;Enter&#xff09;如果纵向显示不全就横向 1、打开CSDN文章 2、按F12或者鼠标右键选择检查并进入控制台 3、在控制台输入以下代码 (function(){ $("#side&q…

鸿蒙面试---1208

HarmonyOS 三大技术理念 分布式架构&#xff1a;HarmonyOS 的分布式架构使得设备之间能够无缝协同工作。例如&#xff0c;它允许用户在不同的智能设备&#xff08;如手机、平板、智能手表等&#xff09;之间共享数据和功能。比如&#xff0c;用户可以在手机上开始编辑文档&…

如何解决压测过程中JMeter堆内存溢出问题

如何解决压测过程中JMeter堆内存溢出问题 背景一、为什么会堆内存溢出&#xff1f;二、解决堆内存溢出措施三、堆内存参数应该怎么调整&#xff1f;四、堆内存大小配置建议 背景 Windows环境下使用JMeter压测运行一段时间后&#xff0c;JMeter日志窗口报错“java.lang.OutOfMe…

优化移动端H5:常见问题与解决方案

移动端H5开发中的“坑”与解决方案 本文介绍了开发中遇到的几个关于移动端H5开发中的小问题&#xff0c;以及解决的方法。 一、iOS滑动不流畅问题 在iOS设备上&#xff0c;H5页面的滑动效果有时会出现不流畅的情况&#xff0c;特别是在页面高度超过一屏时。这通常是由于iOS的…

GO泛型

泛型是goSDK1.18版本之后才引入的新特性&#xff0c;即C中的模板。 为什么要有泛型&#xff1f; 我们现在要写一个两数相加的函数&#xff0c;相加的逻辑很简单&#xff0c;但是如果传入不同的类型&#xff0c;那么我们就需要再写一个函数&#xff0c;定义不同的参数类型&#…

2021高等代数【南昌大学】

证明多项式 f ( x ) = 1 + x + x 2 2 ! + ⋯ + x n n ! f(x) = 1 + x + \frac{x^2}{2!} + \cdots + \frac{x^n}{n!} f(x)=1+x+2!x2​+⋯+n!xn​ 无重根。f ( x ) − f ′ ( x ) = x n n ! f(x) - f(x) = \frac{x^n}{n!} f(x)−f′(x)=n!xn​ ( f ( x ) , f ′ ( x ) ) = ( f (…

【Unity高级】如何动态调整物体透明度

本文介绍了如何设置及动态调整物体的透明度。 一、手动设置的方法 我们先来看下如何手动设置物体的透明度。 物体的透明与否是通过材质来设置的。只有我们把具有透明度的材质指给物体的渲染器&#xff08;Render&#xff09;&#xff0c;物体就被设置成相应的透明度了。 看一…