FlinkSQL State的生命周期

news/2024/10/21 10:00:27/

FlinkSQL未显示配置state生命周期

FlinkSQL默认没有配置state 的过期时间。也就是说默认情况是FlinkSQL从不清除状态。如果状态后端保存在rocksdb中,直到本地磁盘被打满,服务挂掉,报错如下:

java.io.IOException: [bf3ba881614e80c741fb962c87b7d6fd] Failed to fetch BLOB 12264817074958457302144211122648/p-0b2dedc3a7e6bb642714d445695acf07d6374a9a-3c2bf7d6210f764a50df18d0d6a68d02 from x.x.x.x:25511 and store it under /export/tmp/io_tmp_dirs/blobStore-c0670c38-26c0-43fe-b3b4-9c9de34c4520/incoming/temp-00681209at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:168)at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:166)at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:212)at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.createUserCodeClassLoader(BlobLibraryCacheManager.java:297)at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:268)at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:233)at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:393)at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1158)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:715)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:644)at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on deviceat java.io.FileOutputStream.writeBytes(Native Method)at java.io.FileOutputStream.write(FileOutputStream.java:326)at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:146)... 10 more

FlinkSQL state 生命周期配置

对于有状态计算的流连接和分组聚合操作,用户可以通过 STATE_TTL 来指定算子粒度的生命周期,该方式的状态配置优先级大于【table.exec.state.ttl】作业级别的状态优先级配置。

方式1

作业级别设置

-- 单位:ms,1小时
SET table.exec.state.ttl = 3600000

方式2

流连接时配置,默认没有状态

CREATE TABLE my_table (...  
) WITH (  'connector' = '...',  'scan.startup.mode' = 'latest-offset',  'state.ttl' = '3h'  -- 设置状态生存时间为3小时  
);

state.ttl 被设置为 3h,意味着任何状态数据的生存时间超过 3 小时后都会被自动清理

方式3

分组聚合时,hints方式配置,SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。

-- 表名作为 hint 键
SELECT /*+ STATE_TTL('orders' = '30d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders
GROUP BY o_orderkey;-- 别名作为 hint 键
SELECT /*+ STATE_TTL('o' = '30d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders AS o
GROUP BY o_orderkey;-- 查询块作为 hint 键
SELECT /*+ STATE_TTL('tmp' = '30d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM (SELECT o_orderkey, o_totalpriceFROM ordersWHERE o_shippriority = 0) tmp
GROUP BY o_orderkey;

注意:

用户既可以选择表(或视图)名也可以选择别名作为提示键,但在指定别名时需要使用别名。
对于多流连接场景,直接指定每张表的生命周期只会在第一个连接算子的左右流和第二个连接算子的右流上生效(因为流上关联操作是二元的)。如果想为每个连接算子的左右流都指定不同生命周期,需要将查询拆成多个查询块,如下所示。CREATE TEMPORARY VIEW V AS SELECT /*+ STATE_TTL('A' = '1d', 'B' = '12h')*/ * FROM A JOIN B ON...;SELECT /*+ STATE_TTL('V' = '1d', 'C' = '3d')*/ * FROM V JOIN C ON ...;STATE_TTL 提示仅作用在当前查询块上。
当 STATE_TTL 提示键重复时取最后一个值。举例来说,在出现 SELECT /*+ STATE_TTL('A' = '1d', 'A' = '2d')*/ * FROM ... 时,输入 A 的 TTL 值将会取 2d。
当出现多个 STATE_TTL 且提示键重复时取第一个值。举例来说,在出现 SELECT /*+ STATE_TTL('A' = '1d', 'B' = '2d'), STATE_TTL('C' = '12h', 'A' = '6h')*/ * FROM ... 时,输入 A 的 TTL 值将会取 1d。

优先级:

方式2(流连接) 、方式3(分组聚合) > 方式1(作业级别)


http://www.ppmy.cn/news/1427681.html

相关文章

【linux】centos7 开机 进单用户模式修改root密码

本站以分享各种运维经验和运维所需要的技能为主 《python零基础入门》:python零基础入门学习 《python运维脚本》: python运维脚本实践 《shell》:shell学习 《terraform》持续更新中:terraform_Aws学习零基础入门到最佳实战 《k8…

5G网络架构;6G网络架构

目录 5G和6G架构 6G网络架构 5G和6G架构 在设计和功能上有显著的区别,这主要体现在它们各自的核心特点、优势和应用场景上。 5G技术架构的核心特点包括高速率与低延迟、大容量与高密度以及网络切片。高速率与低延迟极大地提升了用户体验,支持更多实时应用和大规模数据传输…

[Kubernetes] etcd的集群基石作用

文章目录 1. 配置存储2. 数据一致性3. 服务发现与协调4. 集群状态中枢5. 集群稳定性 1. 配置存储 etcd作为一个高度可靠的分布式键值存储系统,存储了Kubernetes集群的完整配置和状态数据。集群的元数据,包括节点信息、命名空间、部署、副本集、服务、持…

物联网嵌入式ESP32开发实战,从基础到项目程序开发125例【doc.yotill.com】

一、ESP32基础入门例程开发 物联网嵌入式ESP32开发例程-ESP32实现ADC模拟量信号采集(ESP-IDF VSCode编程) 物联网嵌入式ESP32开发例程-ESP32实现RTC实时时钟(ESP-IDF VSCode编程) 物联网嵌入式ESP32开发例程-ESP32实现USB虚拟串口…

深入Git配置

git配置 git config -h usage: git config [<options>]Config file location--global use global config file--system use system config file--local use repository config file--worktree use per-worktree con…

LeetCode 面试经典150题 219.存在重复元素II

题目&#xff1a;给你一个整数数组 nums 和一个整数 k &#xff0c;判断数组中是否存在两个 不同的索引 i 和 j &#xff0c;满足 nums[i] nums[j] 且 abs(i - j) < k 。如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 思路&#xff1a; 代码…

社交媒体数据恢复:Coco

社交媒体数据恢复&#xff1a;Coco - Live Video Chat HD 见面、聊天、疯狂&#xff01;关于 Coco - 上线并见面&#xff01; Coconut/Coconut是一个优质的直播和视频平台&#xff0c;适合那些希望结识新朋友并通过直播获得更多关注者的人。 Coconut 允许用户单独直播或与朋友…

Day11.一刷数据结构算法(C语言版) 239滑动窗口最大值;347前K个高频元素

今天就两道题&#xff0c;但是有点难&#xff0c;争取理解吧。 一.239滑动窗口最大值 之前讲的都是栈的应用&#xff0c;这次该是队列的应用了。 本题算比较有难度的&#xff0c;需要自己去构造单调队列&#xff0c;建议先看视频来理解。 题目链接/文章讲解/视频讲解&#xff…