场景描述
目标:使用 Temporal 管理和调度一组 Couchbase SQL 脚本来完成以下任务:
同步数据:从其他数据源同步数据到 Couchbase。
执行数据聚合:运行统计 SQL 查询。
清理过期数据:定期清理 Couchbase 中过期或无效的数据。
提供任务失败的自动重试、定时调度、任务状态跟踪。
实现步骤
- Couchbase SQL 脚本准备
sync_data.sql:
INSERT INTO bucket-name
(KEY, VALUE)
SELECT META().id, new_data.*
FROM source-bucket
new_data
WHERE META().id NOT IN (SELECT RAW META().id FROM bucket-name
);
aggregate_data.sql:
SELECT category, COUNT(*) AS count
FROM bucket-name
WHERE type = “product”
GROUP BY category;
cleanup_expired_data.sql:
DELETE FROM bucket-name
WHERE expiration_date < NOW_STR();
- 安装依赖
确保安装了以下库:
pip install temporalio couchbase
- Temporal Workflow 和 Activity 实现
Temporal 的核心是 Workflow(描述流程)和 Activity(执行具体任务)。
Activity 实现
创建一个 Activity,用来执行 SQL 脚本。
from couchbase.cluster import Cluster, ClusterOptions
from couchbase.auth import PasswordAuthenticator
Couchbase Activity
class CouchbaseActivities:
def init(self, couchbase_host, username, password):
self.cluster = Cluster(
f’couchbase://{couchbase_host}',
ClusterOptions(PasswordAuthenticator(username, password))
)
self.query_service = self.cluster.query_indexes()
def execute_sql(self, sql_file_path):with open(sql_file_path, 'r') as file:query = file.read()result = self.query_service.query(query)print(f"Executed SQL from {sql_file_path}: {result}")return result
Workflow 定义
定义一个 Workflow,描述任务的执行顺序。
from temporalio import workflow
@workflow.defn
class CouchbaseWorkflow:
@workflow.run
async def run(self):
activities = workflow.ActivityStub(CouchbaseActivities)
# 1. 同步数据await activities.execute_sql('/path/to/sync_data.sql')# 2. 数据聚合await activities.execute_sql('/path/to/aggregate_data.sql')# 3. 清理过期数据await activities.execute_sql('/path/to/cleanup_expired_data.sql')
Worker 实现
将 Workflow 和 Activity 注册到 Temporal Worker。
from temporalio.worker import Worker
from couchbase_activities import CouchbaseActivities
from couchbase_workflow import CouchbaseWorkflow
async def main():
worker = Worker(
host=“localhost:7233”, # Temporal 服务地址
task_queue=“couchbase_task_queue”,
workflows=[CouchbaseWorkflow],
activities=[CouchbaseActivities(“localhost”, “username”, “password”)]
)
await worker.run()
if name == “main”:
import asyncio
asyncio.run(main())
- Workflow 启动代码
使用 Temporal 客户端启动 Workflow。
from temporalio.client import Client
async def main():
client = await Client.connect(“localhost:7233”)
# 启动 Workflow
handle = await client.start_workflow(CouchbaseWorkflow.run,id="couchbase_sql_workflow",task_queue="couchbase_task_queue",
)
print(f"Started workflow with ID: {handle.id}")
if name == “main”:
import asyncio
asyncio.run(main())
Temporal 的特性应用
任务调度:
Temporal 支持定时任务。可以通过 Temporal.schedule 定义定时运行的 Workflow。
自动重试:
每个 Activity 都可以配置重试策略。
from temporalio import activity
@activity.defn(retry_policy=activity.RetryPolicy(max_attempts=3))
async def execute_sql(sql_file_path):
…
任务依赖:
Workflow 中通过顺序执行 Activity 描述任务依赖关系。
可观察性:
使用 Temporal Web 界面查看 Workflow 和 Activity 的运行状态、历史和日志。
使用 Temporal 的优势
高可靠性:即使 Worker 崩溃,Workflow 的状态也能持久化并恢复。
灵活调度:支持定时任务和动态控制流程。
自动重试:内置的失败重试和错误处理机制。
开发简便:通过 Python SDK,快速实现分布式任务调度和管理。
通过 Temporal,Couchbase SQL 脚本的执行不仅具备高可用性和自动化,还可以轻松应对复杂的业务逻辑需求。