0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

news/2024/11/9 0:53:27/

大纲

  • Tumbling Count Windows
    • map
    • reduce
      • Window Size为2
      • Window Size为3
      • Window Size为4
      • Window Size为5
      • Window Size为6
  • 完整代码
  • 参考资料

之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。
在这里插入图片描述
对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。

Tumbling Count Windows

Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)
在这里插入图片描述
个数为3的窗口
在这里插入图片描述
我们用代码探索下这个概念

map

word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。
在这里插入图片描述

reduce

我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]

Window Size为2

    # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)

  • A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
  • B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
  • C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
  • D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
  • E有6个,正好被3个窗口承载。我们就看到3个(E,2)。
    在这里插入图片描述

Window Size为3

    # reducingreduced=keyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

在这里插入图片描述

Window Size为4

    # reducingreduced=keyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(C,4)
(D,4)
(E,4)

在这里插入图片描述

Window Size为5

    # reducingreduced=keyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(D,5)
(E,5)

在这里插入图片描述

Window Size为6

    # reducingreduced=keyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,6)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

http://www.ppmy.cn/news/1189688.html

相关文章

linux NC命令详解及使用

命令简介 NC是netcat的简写,是一个功能强大的网络工具,有着网络界的瑞士军刀美誉。Nc 命令在linux系统中实际命令是ncat,nc是软连接到ncat。NC命令的主要作用如下: 实现任意TCP/UDP端口的侦听,nc可以作为server以TCP…

【牛客网】安全—加密和安全

每日一练 Day1: 1.信息安全的基本属性是( D ) A.保密性 B.完整性 C.可用性,可靠性,可控性 D.A、B、C都是 信息安全的基本属性通常可以归纳为以下几个方面: 保密性(Confidentiality&#xf…

对比解析php和go对JSON处理的区别

一、go 转化php数组代码 php程序 $str <<<EOF {"操作源":"任意","数据库":"任意","语句类型":"CREATE DATABASE&#xff1b;DROP DATABASE&#xff1b;ALTER DATABASE","影响行数":"不…

剑指Offer || 080.组合

题目 给定两个整数 n 和 k&#xff0c;返回 1 ... n 中所有可能的 k 个数的组合。 示例 1: 输入: n 4, k 2 输出: [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4], ] 示例 2: 输入: n 1, k 1 输出: [[1]]提示: 1 < n < 201 < k < n 注意&#xff1a;本题与主站 …

iOS使用AVCaptureSession实现音视频采集

AVCaptureSession配置采集行为并协调从输入设备到采集输出的数据流。要执行实时音视频采集&#xff0c;需要实例化采集会话并添加适当的输入和输出。 AVCaptureSession&#xff1a;管理输入输出音视频流AVCaptureDevice&#xff1a;相机硬件的接口&#xff0c;用于控制硬件特性…

axios 多个baseURL配置、实现不同前缀代理到不同的服务器的几种方式

前言&#xff1a; 在开发中&#xff0c;有可能遇到每部分的功能的需要调用另一台服务器的地址。这个时候就需要设置不同的请求前缀首先代理到不同的服务器地址。 一、axios封装实例以及代理&#xff1a;(不是完整的封装实例&#xff0c;重点在于baseURL的区别) 文件路径&…

stm32 ETH

1 How do I create a project for STM32H7 with Ethernet and LwIP stack working? STM32 LWIP 接收大数据包导致Hardfault问题解决记录 Trying to get Ethernet, LWIP and FreeRTOS working on the STM32H745. Testing on the NUCLEO-H745ZI-Q using FW_1.7 and the STM32Cub…

力扣 203 移除链表元素

目录 1.解题思路2.代码实现 1.解题思路 利用快慢指针&#xff0c;如果快指针此时的值为val则将此时满指针的next指向快指针的next利用循环&#xff0c;其次&#xff0c;要特意判断链表的头部为val的情况以及链表为空的情况. 2.代码实现 struct ListNode* removeElements(str…