flink---window

ops/2024/10/9 0:44:29/

Window介绍

DataStream:

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/datastream/operators/windows/

SQL:

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql/queries/window-tvf/

1、为什么需要Window? 

在实时计算领域, 经常会有如下的需求:

每隔xx时间, 计算最近xx时间的数据,

如:

每隔10min,计算最近24h的热搜词

每隔5s,计算最近1min的股票行情数据

每隔10min,计算最近1h的广告点击量

....

这些实时需求的实现就需要借助窗口!

2、Window有哪些控制属性? 

为了完成上面提到的需求, 需要使用窗口来完成, 但是窗口需要有如下的属性才可以

窗口的长度(大小): 决定了要计算最近多长时间的数据

窗口的间隔: 决定了每隔多久计算一次

 3、基于时间的滑动和滚动窗口

窗口的长度(大小) > 窗口的间隔 : 如每隔5s, 计算最近10s的数据 【滑动窗口】

 

窗口的长度(大小) = 窗口的间隔: 如每隔10s,计算最近10s的数据 【滚动窗口】

窗口的长度(大小) < 窗口的间隔: 每隔15s,计算最近10s的数据 【没有名字,不用】

滚动窗口 Tumble (DataStream Tumbling Window)

滑动窗口 HOP (DataStream Sliding Window)

累积窗口 Cumulate (DataStream没有)

  在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。这种特殊的窗口就叫作“累积窗口”(Cumulate Window),它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用 CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

        这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1 小时的累积窗口。注意第三个参数为步长 step ,第四个参数则是最大窗口长度。

4、processTime Window 

/**** {"username":"zs","price":20}* {"username":"lisi","price":15}* {"username":"lisi","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}**///窗口触发的条件  1. 系统时间大于等于窗口的结束时间   2. 窗口内有数据
//滚动窗口 TUMBLE(TABLE KafkaTable, DESCRIPTOR(event_time), INTERVAL '10' SECOND)
//滑动窗口 :每隔10秒,计算最近10秒数据。统计每个用户在最近10秒消费的次数和总金额
CREATE TABLE KafkaTable ( `username`    STRING, `price`       INT, `event_time`  as proctime()     -- 计算列
) WITH ( 'connector' = 'kafka', 'topic' = 'topic1', 'properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' 
);
select username,window_start,window_end,count(*) cnt,sum(price) total_price
from table(TUMBLE(TABLE KafkaTable, DESCRIPTOR(event_time), INTERVAL '10' SECOND))
group by username,window_start,window_end;//滑动窗口:每隔30秒,计算最近1分钟每隔用户消费次数和消费总金额。
CREATE TABLE KafkaTable ( `username`    STRING, `price`       INT, `event_time`  as proctime()     -- 计算列
) WITH ( 'connector' = 'kafka', 'topic' = 'topic1', 'properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' 
);
select username,window_start,window_end,count(*) cnt,sum(price) total_price
from table(HOP(TABLE KafkaTable, DESCRIPTOR(event_time), INTERVAL '30' SECOND, INTERVAL '60' SECOND))
group by username,window_start,window_end;


http://www.ppmy.cn/ops/105314.html

相关文章

我该如何使用DevEco Studio进行开发呢

使用DevEco Studio进行鸿蒙&#xff08;HarmonyOS&#xff09;开发是一个涉及多个步骤和组件的过程。以下是一个简要的指南&#xff0c;帮助你开始使用DevEco Studio进行鸿蒙应用开发&#xff1a; 1. 安装DevEco Studio 下载与安装&#xff1a;首先&#xff0c;你需要从华为开…

网站如何针对不同的DDOS进行防御?

建设网站租用服务器是多数企业及个人的选择&#xff0c;一个安全稳定的服务器对网站的重要性无需再赘述。要保证服务器租用的安全和稳定&#xff0c;除了需要服务器自身有强大的硬、软件基础之外&#xff0c;还需要防范外部的一些因素&#xff0c;常见的就是各种网络攻击&#…

【Netty】实战:基于Http的Web服务器

目录 一、实现ChannelHandler 二、实现ChannelInitializer 三、实现服务器启动程序 四、测试 本文来实现一个简单的Web服务器&#xff0c;当用户在浏览器访问Web服务器时&#xff0c;可以返回响应的内容给用户。很简单&#xff0c;就三步。 一、实现ChannelHandler pack…

万字长文:大模型从入门到实战

第1节 引言与基础知识 1. 什么是AI文案创作&#xff1f; 嘿&#xff0c;朋友们&#xff01;有没有觉得写文案就像是在给朋友写一封充满情感的长信&#xff1f;好吧&#xff0c;AI文案创作就是在这封长信中找个聪明的帮手&#xff0c;来个“AI写手”&#xff01;想象一下&…

前后端分离项目实战-通用管理系统搭建(前端Vue3+ElementPlus,后端Springboot+Mysql+Redis)第七篇:菜单和路由动态绑定

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

浅析JVM invokedynamic指令和Java Lambda语法|得物技术

一、导语 尽管近年来JDK的版本发布愈发敏捷&#xff0c;当前最新版本号已经20&#xff0c;但是日常使用中&#xff0c;JDK8还是占据了统治地位。 你发任你发&#xff0c;我用Java8&#xff1a;【Jetbrains】2023 开发者生态系统现状 - https://www.jetbrains.com/zh-cn/lp/dev…

【数论 状态机dp】2572. 无平方子集计数

本文涉及知识点 C动态规划 数论 质数、最大公约数、菲蜀定理 LeetCode 2572. 无平方子集计数 给你一个正整数数组 nums 。 如果数组 nums 的子集中的元素乘积是一个 无平方因子数 &#xff0c;则认为该子集是一个 无平方 子集。 无平方因子数 是无法被除 1 之外任何平方数整…

Docker通信全视角:原理、实践与技术洞察

一、引言 在云计算和微服务架构日益成熟的今天&#xff0c;Docker作为一种轻量级的容器化技术&#xff0c;已成为现代软件开发和部署的关键组件。Docker容器通过为应用程序提供隔离的运行环境&#xff0c;不仅显著提升了部署效率&#xff0c;而且增强了系统的可移植性和安全性。…