Flink使用SQL Gateway提交SQL Job到远程集群

news/2024/12/25 1:51:28/

从Flink 1.16.0开始集成了SQL Gateway功能,提供了多种客户端远程并发执行SQL的能力。不用再使用提交jar包的方式来创建任务了。

我是使用filnk 1.17.1版本。

官网关于SQL Gateway的讲解:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql-gateway/overview/

SQL Gateway提交作业的执行后端可以是Flink的standalone集群或者是Yarn集群。

我这里的环境是standalone,然后我只用了一台机器,主要是为了测试SQL Gateway提交SQL Job

flink下载解压到服务上之后就可以启动了。在主节点执行$FLINK_HOME/bin/start-cluster.sh,启动集群。

关闭standalone集群可以执行$FLINK_HOME/bin/stop-cluster.sh。

集群成功启动之后可以接着启动sql-gateway。执行:

$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx

其中-Dsql-gateway.endpoint.rest.address用来指定SQL Gateway服务绑定的地址。注意如果指定为localhost则SQL Gateway只能通过本机访问,无法对外提供服务。可以设置成0.0.0.0。SQL Gateway服务日志文件在$FLINK_HOME/log目录中。 

可以执行$FLINK_HOME/bin/sql-gateway.sh -h获取sql-gateway.sh命令更多的使用方式

Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]commands:start               - Run a SQL Gateway as a daemonstart-foreground    - Run a SQL Gateway as a console applicationstop                - Stop the SQL Gateway daemonstop-all            - Stop all the SQL Gateway daemons-h | --help         - Show this help message

sql-gateway-配置">SQL Gateway 配置

可以通过如下方式动态指定SQL Gateway的配置项

$ ./sql-gateway -Dkey=value

 官网给出的配置项列表如下:

KeyDefaultTypeDescription
sql-gateway-session-check-interval">sql-gateway.session.check-interval
1 minDuration定时检查空闲 session 是否超时的间隔时间,设置为 0 时关闭检查。
sql-gateway-session-idle-timeout">sql-gateway.session.idle-timeout
10 minDurationsession 超时时间,在这个时间区间内没有被访问过的 session 会被关闭。如果设置为 0,session 将不会被关闭。
sql-gateway-session-max-num">sql-gateway.session.max-num
1000000IntegerSQL Gateway 服务中存活 session 的最大数量。
sql-gateway-session-plan-cache-enabled">sql-gateway.session.plan-cache.enabled
falseBoolean设置为 true 的时候,SQL Gateway 会在一个 session 内部缓存并复用 plan。
sql-gateway-session-plan-cache-size">sql-gateway.session.plan-cache.size
100IntegerPlan cache 的大小, 当且仅当 `table.optimizer.plan-cache.enabled` 为 true 的时候生效。
sql-gateway-session-plan-cache-ttl">sql-gateway.session.plan-cache.ttl
1 hourDurationPlan cache 的 TTL, 控制 cache 在写入之后多久过期, 当且仅当 `table.optimizer.plan-cache.enabled` 为 true 的时候生效。
sql-gateway-worker-keepalive-time">sql-gateway.worker.keepalive-time
5 minDuration空闲工作线程的存活时间。当工作线程数量超过了配置的最小值,超过存活时间的多余空闲工作线程会被杀掉。
sql-gateway-worker-threads-max">sql-gateway.worker.threads.max
500IntegerSQL Gateway 服务中工作线程的最大数量。
sql-gateway-worker-threads-min">sql-gateway.worker.threads.min
5IntegerSQL Gateway 服务中工作线程的最小数量。

Rest API

前面部署过程中SQL Gateway默认是以Rest API的形式提供服务,这里直接讲解使用方式。假设在我们的测试环境SQL Gateway运行的IP和端口为sql-gateway-ip:8083。

首先执行:

curl --request POST http://sql-gateway-ip:8083/v1/sessions

创建并获取到一个sessionHandle。示例返回如下:

{"sessionHandle":"2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef"}

然后以执行SQL SELECT 1语句为例。格式为:

curl --request POST http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'

我们替换sessionHandle为上面返回的sessionHandle,实际命令如下: 

curl --request POST http://sql-gateway-ip:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/statements/ --data '{"statement": "SELECT 1"}'

 得到的返回值包含一个operationHandle,如下所示:

{"operationHandle":"7dcb0266-ed64-423d-a984-310dc6398e5e"}

最后我们使用sessionHandle和operationHandle来获取运行结果。格式为:

curl --request GET http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0

其中最后一个0为token。可以理解为查询结果是分页(分批)返回,token为页码。

替换sessionHandle和operationHandle为前面获取的真实值,实际命令如下:

curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/0

得到结果如下:

{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":[1]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1"}

我们从result -> data -> fields 可以得到SELECT 1的运行结果为1。

前面提到token的作用类似于分页。上面JSON的nextResultUri告诉我们获取下一批结果的URL。发现token从0变成了1。我们访问这个nextResultUri:

curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1

返回如下内容:

{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[]},"resultType":"EOS","nextResultUri":null}

 可以看到resultType为EOS,表示所有结果都已经获取到了。此时nextResultUri为null,没有下一页结果。


http://www.ppmy.cn/news/1547305.html

相关文章

单片机 外部中断实验 实验三

实验三 外部中断实验 实验目的 1、掌握MCS-51单片机外部中断的原理。 2、掌握MCS-51单片机外部中断程序的设计方法及其过程。 3、掌握MCS-51单片机外部中断的电路应用。 实验任务 利用外部中断方式,实现通过按键切换流水灯的流向。流水灯形式自定&#xff…

Rust实战项目与未来发展——跨平台应用开发项目实践

第十章:实战项目与未来发展 第一节:跨平台应用开发项目实践 随着移动设备、桌面设备和Web平台之间界限的模糊,跨平台应用开发已成为开发者日常工作中不可或缺的一部分。随着技术栈的不断演进,开发者有更多选择来构建高效、易维护…

[AI] 从游戏到现实:强化学习的应用与挑战

随着AI技术的快速发展,强化学习(Reinforcement Learning, RL)逐渐成为人工智能领域的一个重要分支。尤其是在游戏领域,RL展示了极大的潜力:它可以在没有预先标记的数据情况下,通过智能体的互动和反馈自主学习。然而,强化学习的影响力远远超越了游戏本身,它的理念和方法…

风险数据集市整体架构及技术实现

引言 在当今大数据时代,风险数据集市作为金融机构的核心基础设施之一,扮演着至关重要的角色。它不仅为银行、保险等金融机构提供了全面、准确的风险数据支持,还帮助这些机构实现了风险管理的精细化和智能化。本文将深入探讨一种基于大数据La…

Leetcode 791 Custom Sort String

题意:给定两个字符串,第一个字符串order,给定字符出现的先后顺序。 第二个字符串需要按照第一个字符串的顺序重新排列。没有在order字符串中出现的数组随意排列 https://leetcode.com/problems/custom-sort-string/ 解答:先根据…

IP数据云 识别和分析tor、proxy等各类型代理

在网络上使用代理(tor、proxy、relay等)进行访问的目的是为了规避网络的限制、隐藏真实身份或进行其他的不正当行为。 对代理进行识别和分析可以防止恶意攻击、监控和防御僵尸网络和提高防火墙效率等,同时也可以对用户行为进行分析&#xff…

【系统设计】理解带宽延迟积(BDP)、吞吐量、延时(RTT)与TCP发送窗口的关系:优化网络性能的关键

在设计和优化网络性能时,理解 带宽延迟积(BDP)、吞吐量、延时(RTT) 和 TCP发送窗口 之间的关系至关重要。这些概念相互影响,决定了网络连接的性能上限,尤其是在高带宽、高延迟的环境中&#xff…

javascript实现sha512和sha384算法(支持微信小程序),可分多次计算

概述: 本人前端需要实现sha512和sha384计算的功能,最好是能做到分多次计算。 本文所写的代码在现有sha512和sha384的C代码,反复测试对比计算过程参数,成功改造成sha512和sha384的javascript代码,并成功验证好分多次计算…