Flink高手之路6-Flink四大基石

news/2024/11/25 17:28:53/

文章目录

  • Flink四大基石
    • 一、Flink的四大基石
      • 1. Checkpoint
      • 2. State
      • 3. Time
      • 4. Window
    • 二、案例
      • 1.需求
      • 2.代码实现
      • 3.运行,查看结果
      • 4.增加需求2的实现
      • 5.重启程序,查看结果

Flink四大基石

一、Flink的四大基石

Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

image-20230420181709188

1. Checkpoint

这是Flink最重要的一个特性。

Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。

Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。

Spark最近在实现Continue streaming,Continue streaming的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。

https://zhuanlan.zhihu.com/p/53482103

2. State

提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState。

3. Time

除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。

4. Window

另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

二、案例

官网API:https://nightlies.apache.org/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example

在这里插入图片描述

1.需求

基于时间的滚动和滑动窗口

nc -lk 9999

有如下的数据表示:

信号灯的编号和通过该信号灯的车的数量

9,3

9,2

9,7

4,9

2,6

1,5

2,3

5,7

5,5

需求1:每10秒钟统计一次,最近10秒内,各个路口通过红绿灯的汽车的数量,也就是基于时间的滚动窗口

需求2:每10秒钟统计一次,最近20秒内,各个路口通过红绿灯的汽车的数量,也就是基于时间的滑动窗口

2.代码实现

package cn.edu.hgu.bigdata20.flink.window;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** description:flink的Window演示* author 王* date 2023/04/20*/
public class FlinkWindowDemo {public static void main(String[] args) throws Exception {// 1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.sourceDataStream<String> socketDS = env.socketTextStream("hadoop001", 9999);// 3. Transformation//将9,3转为CartInfo(9,3)SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {@Overridepublic CartInfo map(String value) throws Exception {String[] arr = value.split(",");return new CartInfo(arr[0], Integer.parseInt(arr[1]));}});// 需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口有数据)SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("count");//4.Sinkresult.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class CartInfo {private String sensorId;//信号灯idprivate Integer count;//通过该信号灯的车的数量}
}

3.运行,查看结果

输入数据:

9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,5

在这里插入图片描述

4.增加需求2的实现

package cn.edu.hgu.bigdata20.flink.window;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** description:flink的Window演示* author 王* date 2023/04/20*/
public class FlinkWindowDemo {public static void main(String[] args) throws Exception {// 1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.sourceDataStream<String> socketDS = env.socketTextStream("hadoop001", 9999);// 3. Transformation//将9,3转为CartInfo(9,3)SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {@Overridepublic CartInfo map(String value) throws Exception {String[] arr = value.split(",");return new CartInfo(arr[0], Integer.parseInt(arr[1]));}});// 需求1:每10秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("count");// 需求2:每10秒钟统计一次,最近20秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS.keyBy(CartInfo::getSensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10))).sum("count");//4.Sink//result.print();result1.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class CartInfo {private String sensorId;//信号灯idprivate Integer count;//通过该信号灯的车的数量}
}

5.重启程序,查看结果

输入数据:

9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,5
9,3
9,2
9,7
4,9
2,6
1,5

结果:
在这里插入图片描述


http://www.ppmy.cn/news/49815.html

相关文章

奇异值分解SVD

概念 奇异值分解&#xff08;singular value decomposition&#xff09;是线性代数中一种重要的矩阵分解。奇异值分解在某些方面与对称矩阵或厄密矩阵基于特征向量的对角化类似。然而这两种矩阵分解尽管有其相关性&#xff0c;但还是有明显的不同。对称矩阵特征向量分解的基础…

CorelDRAW2023最新版本图像设计软件

CorelDRAW 2023作为最新版的图像设计软件,在功能上做了较大提升,主要新的功能特性如下: 1. 全新界面设计:采用简约现代的 UI 设计,菜单和工具重新组织,更加直观易用。提供自动提示与设计指导,易于上手。 2. 智能工具与提示:运用 AI技术对用户操作行为和设计习惯进行分析,给出…

【Seata】Seata配置上传Nacos

前言 在seata1.4.2版本之后&#xff0c;Seata可以通过dataId配置seata的所有配置项&#xff0c;也就是说&#xff0c;我们可以将之前上传的所有配置项整合到一个配置文件中&#xff0c;接下来直接演示该怎样操作。 如果你不清楚seata的config.txt文件在哪里下载或者其它的一些…

基于html+css的图片展示20

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

Flutter 布局探索 | 如何分析尺寸和约束

theme: cyanosis 前言 本文来分享一下&#xff0c;通过查看源码和布局信息解决的一个实际中的布局小问题&#xff0c;也希望通过本文的分享&#xff0c;当你遇到布局问题时&#xff0c;可以靠自己的脑子和双手解决问题。 如下所示&#xff0c;将 TextField 作为 AppBar 组件的 …

Linux DNS服务

DNS 作用 DNS是 域名系统 的英文缩写&#xff0c;作为将域名与IP地址相互映射的一个分布式数据库&#xff0c;让人可以通过域名访问互联网 正向解析 将域名解析为IP反向解析 根据IP查找对应域名 域名结构 http://www.sina.com.cn./ http://主机名.子域.二级域.顶级域.根域 / 每…

美颜sdk开发实践:如何构建美颜sdk功能?

美颜功能是现今很多应用中必不可少的一项功能。而要实现美颜功能&#xff0c;除了自己编写美颜算法外&#xff0c;还可以使用美颜sdk来实现。本文将介绍如何从零开始构建美颜功能&#xff0c;利用美颜sdk实现美颜效果。 一、简介 美颜sdk可以向用户提供多种美颜效果&#xff…

瑞吉外卖项目——读写分离

读写分离 读和写所有压力都由一台数据库承担&#xff0c;压力大数据库服务器磁盘损坏则数据丢失&#xff0c;单点故障 Mysql主从复制 介绍 MySQL主从复制是一个异步的复制过程&#xff0c;底层是基于Nysql数据库自带的二进制日志功能。 就是一台或多台MysQL数据库&#xf…