3.基于 Temporal 的 Couchbase 动态 SQL 执行场景

devtools/2024/12/26 8:40:38/

在使用 TemporalGo 语言 调用 Couchbase 执行 SQL 脚本时,可以通过动态参数传递到 SQL 脚本中,以下是完整实现的指南:


1. Temporal Workflow 的参数传递

Temporal 的 Workflow 支持接收动态参数,将这些参数传递给执行 SQL 的 Activity。在 Go 中可以通过结构体或键值对传递参数。


2. Couchbase N1QL 示例

Couchbase 支持通过 REST API 或 SDK 执行 N1QL 查询,并且可以使用参数化查询来防止 SQL 注入。


3. 工作流和 Activity 的结构

以下是实现动态参数化执行 Couchbase SQL 的完整示例:

Workflow 定义
package workflowsimport ("context""time""go.temporal.io/sdk/workflow"
)// QueryParameters 定义 SQL 参数结构
type QueryParameters struct {StartTime string `json:"start_time"`EndTime   string `json:"end_time"`BrandID   int    `json:"brand_id"`
}// Workflow 定义
func SQLWorkflow(ctx workflow.Context, params QueryParameters) error {ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute,}ctx = workflow.WithActivityOptions(ctx, ao)// 调用 Activity 执行 SQLerr := workflow.ExecuteActivity(ctx, ExecuteSQLActivity, params).Get(ctx, nil)if err != nil {return err}return nil
}

Activity 定义
package activitiesimport ("context""encoding/json""fmt""log""github.com/couchbase/gocb/v2" // Couchbase Go SDK
)// QueryParameters 定义 SQL 参数结构
type QueryParameters struct {StartTime string `json:"start_time"`EndTime   string `json:"end_time"`BrandID   int    `json:"brand_id"`
}// ExecuteSQLActivity 用于执行 Couchbase 的 SQL 脚本
func ExecuteSQLActivity(ctx context.Context, params QueryParameters) error {// 初始化 Couchbase 集群连接cluster, err := gocb.Connect("couchbase://localhost", gocb.ClusterOptions{Username: "username",Password: "password",})if err != nil {return fmt.Errorf("failed to connect to Couchbase: %w", err)}defer cluster.Close()// 定义 SQL 查询query := `SELECTpStatDate,brandId,storeId,SUM(incomeAmount) AS incomeAmt,SUM(refundAmount) AS refundAmt,SUM(actualAmount) AS actualAmtFROM ` + "`origin`.`dwd_meal_order`.`fact_batch_payment_summary`" + `WHERE modifyTime >= $startTime AND modifyTime < $endTime AND brandId = $brandIdGROUP BY pStatDate, brandId, storeId`// 执行 N1QL 查询,绑定参数rows, err := cluster.Query(query, &gocb.QueryOptions{NamedParameters: map[string]interface{}{"startTime": params.StartTime,"endTime":   params.EndTime,"brandId":   params.BrandID,},})if err != nil {return fmt.Errorf("failed to execute query: %w", err)}defer rows.Close()// 解析查询结果for rows.Next() {var result map[string]interface{}err := rows.Row(&result)if err != nil {log.Printf("failed to read row: %v", err)continue}// 输出结果jsonResult, _ := json.Marshal(result)log.Printf("Result: %s", jsonResult)}if err := rows.Err(); err != nil {return fmt.Errorf("query execution error: %w", err)}return nil
}

启动 Workflow
package mainimport ("context""log""go.temporal.io/sdk/client""go.temporal.io/sdk/worker""your_project/activities""your_project/workflows"
)func main() {// 创建 Temporal 客户端c, err := client.Dial(client.Options{})if err != nil {log.Fatalf("unable to create Temporal client: %v", err)}defer c.Close()// 创建 Workerw := worker.New(c, "sql-task-queue", worker.Options{})w.RegisterWorkflow(workflows.SQLWorkflow)w.RegisterActivity(activities.ExecuteSQLActivity)// 启动 Workererr = w.Run(worker.InterruptCh())if err != nil {log.Fatalf("unable to start Worker: %v", err)}// 启动 Workflowparams := workflows.QueryParameters{StartTime: "2024-10-01T00:00:00Z",EndTime:   "2024-10-31T23:59:59Z",BrandID:   123,}workflowOptions := client.StartWorkflowOptions{ID:        "sql-workflow-id",TaskQueue: "sql-task-queue",}we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.SQLWorkflow, params)if err != nil {log.Fatalf("unable to execute Workflow: %v", err)}log.Printf("Workflow started: %s", we.GetID())
}

关键点解析

  1. 参数化 SQL 查询

    • 使用 Couchbase N1QL 的命名参数($paramName)传递动态参数。
    • 在 Activity 中通过 NamedParameters 绑定参数值。
  2. 动态 Workflow 参数传递

    • 在 Workflow 中定义参数结构(如 QueryParameters),通过 Temporal SDK 将参数传递到 Activity。
  3. Activity 执行 SQL

    • Activity 中初始化 Couchbase 连接,执行 SQL 查询,并处理返回结果。
  4. 动态配置时间范围和条件

    • StartTimeEndTime 是动态参数,Workflow 启动时通过输入指定。

运行结果

  • Temporal Workflow 启动时,动态传递参数(如时间范围 StartTimeEndTime 和条件 BrandID)。
  • Workflow 中调用 Activity 执行 SQL 查询,查询结果记录到日志中,或者返回给调用方。

这种实现方式灵活可靠,适用于基于 Temporal 的 Couchbase 动态 SQL 执行场景。


http://www.ppmy.cn/devtools/145491.html

相关文章

【动态规划】按摩师

题目链接&#xff1a; 面试题 17.16. 按摩师 - 力扣&#xff08;LeetCode&#xff09; 1、状态表示 用两个dp表&#xff0c;分别表示到当前位置接受预约和不接受预约 f[i]&#xff1a;表示到 i 位置&#xff0c;接受预约的最优预约集合 g[i]&#xff1a;表示到 i 位置&…

Highcharts 饼图:数据可视化利器

Highcharts 饼图&#xff1a;数据可视化利器 引言 在数据可视化的领域中&#xff0c;饼图作为一种经典且直观的图表类型&#xff0c;被广泛应用于各种行业和场景中。Highcharts&#xff0c;作为一个功能强大且易于使用的JavaScript图表库&#xff0c;为我们提供了创建交互式和…

分布式事务解决方案seata和MQ

seata之XA模式 特点&#xff1a;强一致性、会锁定资源。 seata之AT模式 seata之TCC模式 特点&#xff1a;对代码有侵入 MQ解决分布式事务 特点&#xff1a;效率高、实时性差 分布式事务的消息幂等 1、tokenredis保证幂等 2、分布式锁 分布式任务调度

(八)循环神经网络_门控循环单元GRU

一、提出背景 2014年提出&#xff0c;主要针对LSTM模型计算比较复杂容易出现梯度消失或爆炸的问题进行改进。 二、与LSTM的区别 三、网络结构 1. 整体结构 2. 重置门 3. 更新门 4. 候选隐状态 5. 隐状态 四、总结

Android基于Path的addRoundRect,Canvas剪切clipPath简洁的圆形图实现,Kotlin(2)

Android基于Path的addRoundRect&#xff0c;Canvas剪切clipPath简洁的圆形图实现&#xff0c;Kotlin&#xff08;2&#xff09; import android.content.Context import android.graphics.BitmapFactory import android.graphics.Canvas import android.graphics.Path import a…

在 CentOS 系统上安装 ClickHouse

在 CentOS 系统上安装 ClickHouse 数据库相对简单&#xff0c;可以通过官方提供的安装包来进行。以下是详细的安装步骤。 1. 更新系统 首先&#xff0c;确保你的系统是最新的&#xff0c;更新软件包和系统库&#xff1a; sudo yum update -y2. 安装依赖库 ClickHouse 需要一…

Leetcode 394-字符串解码

给定一个经过编码的字符串&#xff0c;返回它解码后的字符串。 编码规则为: k[encoded_string]&#xff0c;表示其中方括号内部的 encoded_string 正好重复 k 次。注意 k 保证为正整数。 你可以认为输入字符串总是有效的&#xff1b;输入字符串中没有额外的空格&#xff0c;且…

JVM简介—垃圾回收器和内存分配策略

1.垃圾回收概述 2.如何判断对象存活 (1)引用计数算法 给对象添加一个引用计数器&#xff0c;每当一个地方引用它时就将计数器加1&#xff0c;当引用失效时就将计数器减1&#xff0c;任何时刻计数器为0的对象都不再被使用。 这种算法简单&#xff0c;但是有个致命的缺点&#xf…