【图解大数据技术】流式计算:Spark Streaming、Flink

news/2024/9/15 4:54:17/ 标签: 大数据, spark, flink, big data, 分布式

【图解大数据技术】流式计算:Spark Streaming、Flink

  • 批处理 VS 流式计算
  • Spark Streaming
  • Flink
    • Flink简介
      • Flink入门案例
      • Streaming Dataflow
    • Flink架构
      • Flink任务调度与执行
      • task slot 和 task
    • EventTime、Windows、Watermarks
      • EventTime
      • Windows
      • Watermarks

批处理 VS 流式计算

计算存储介质上的大规模数据,这类计算叫大数据批处理计算。数据是以批为单位进行计算,比如一天的访问日志、历史上所有的订单数据等。这些数据通常通过 HDFS 存储在磁盘上,使用 MapReduce 或者 Spark 这样的批处理大数据计算框架进行计算,一般完成一次计算需要花费几分钟到几小时的时间。

在这里插入图片描述

还有一种是针对实时产生的大规模数据进行即时计算处理,比如摄像头采集的实时视频数据、淘宝实时产生的订单数据等。实时处理最大的不同就是这类数据,是实时传输过来的针对这类大数据的实时处理系统也叫大数据流计算系统。

在这里插入图片描述

Spark Streaming

在这里插入图片描述

Spark是一个批处理大数据计算引擎,而 Spark Steaming 则利用了 Spark 的分片和快速计算的特性,把实时传输过来的数据按时间范围进行分段,转成一个个的小批,再交给 Spark 去处理。因此 Spark Streaming 的原理是流转批,Spark Streaming 不是真正意义上的实时计算框架,它是一个准实时的计算框架。

Flink

Flink简介

Flink 和 Spark Streaming 不一样,Flink 一开始设计就是为了做实时流式计算的。它可以监听消息队列获取数据流,也可以用于计算存储在 HDFS 等存储系统上的数据(Flink 把 这些静态数据当做数据流来进行处理)。

在这里插入图片描述

然后 Flink 计算后生成的结果流,也可以发送到其他存储系统。

在这里插入图片描述

Flink入门案例

    public static void main(String[] args) throws Exception {// 初始化一个流执行环境final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 利用这个执行环境构建数据流 DataStream(source操作)DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));// 执行各种数据转换操作(transformation)DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});// 打印结果(sink类型操作)adults.print();// 执行env.execute();}

在这里插入图片描述

首先构建一个执行环境env,然后通过执行环境env构建数据流DataStream(这就是source操作),然对这个数据流进行各种转换操作(transformation),最后跟上一个sink类型操作(类似是Spark的action操作),然后调用env的execute()启动计算。

上面是流计算的例子,如果要进行批计算,则要构建ExecutionEnvironment类型的执行环境,然后使用ExecutionEnvironment执行环境构建一个DataSet。

在这里插入图片描述

Streaming Dataflow

Flink程序代码会被映射为Streaming Dataflow(类似于DAG)。一个Streaming Dataflow是由一组Stream(流)和Operator(算子)组成,并且始于一个或多个Source Operator,结束于一个或多个Sink Operator,中间有一个或多个Transformation Operator。

Source Operator:

        DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));

Transformation Operator:

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});

Sink Operator:

	adults.print();

在这里插入图片描述

由于Flink是分布式并行的,因此在程序执行期间,一个Stream流会有多个Stream Partition(流分区),一个Operator也会有多个Operator Subtask(算子子任务)。

在这里插入图片描述

两个 operator 之间传递的时候有两种模式:

  • One to One 模式:像Source到map这种传递模式,不会改变数据的分区特性。
  • Redistributing (重新分配)模式:像map到keyBy这种传递模式,会根据key的hashcode进行重写分区,改变分区特性的。

Flink还会进行优化,将紧密度高的算子结合成一个Operator Chain(算子链)。

在这里插入图片描述

比如Source操作和map操作可以结合成一个Operator Chain,结合成Operator Chain后就在一个task中由一个thread完成。

Flink架构

Flink任务调度与执行

在这里插入图片描述

  1. 我们的代码会被Flink解析成一个DAG图,当我们调用env.execute()方法后,该DAG图就会被打包通过Akka客户端发送到JobManager。
  2. JobManager会通过调度器,把task调度到TaskManager上执行。
  3. TaskManager接收到task后,task将会在一个task slot中执行。

task slot 和 task

我们看到在TaskManager上有一个个的task slot被划分出来,task slot的数量是在TaskManager创建之初就设置好的。每个task(正确来说应该是subtask)都会调度到一个task slot上执行。task slot的作用主要是进行内存隔离,比如TaskManager设置了3个task slot的数量,那么每个task slot占用TaskManager三分之一的内存,task在task slot执行时,task与task之间将不会有内存资源竞争的情况发生。

在这里插入图片描述

EventTime、Windows、Watermarks

由于Flink处理的是流式计算,数据是以流的形式源源不断的流过来的,也就是说数据是没有边界的,但是对数据的计算必须在一个范围内进行,比如实时统计高速公路过去一个小时里的车流量。

在这里插入图片描述

那么就需要给源源不断流过来的数据划分边界,我们可以根据时间段或数据量来划分边界。

如果要按照时间段来划分边界,那么是通过时间字段进行划分。

EventTime

在这里插入图片描述

Flink有三种类型的时间:

  • Event Time
  • Ingestion Time
  • Processing Time

一般用的较多的时Event Time,因为Event Time是固定不变的,不管什么时候计算,都会得到相同的输出结果。

Windows

有了时间字段后,就可以根据时间划分时间窗,比如下面就是划分1分钟为一个时间窗,然后就可以对时间窗内的数据做计算。

.window(TumblingEventTimeWindows.of(Time.minutes(1)))

TumblingEventTimeWindows是滚动时间窗:

在这里插入图片描述

还有SlidingEventTimeWindows滑动时间窗:

// 没10秒计算前1分钟窗口内的数据
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))

在这里插入图片描述

以及EventTimeSessionWindows会话时间窗:

// 间隔超过5s的话,下一达到的事件在新的窗口内计算,否则在同一窗口内计算
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))

上面设置的会话时间窗表示如果两个事件间的间隔超过5秒,那么后一个事件就会在新的窗口中计算;如果两个事件间隔没有超过5秒,那么就在同一窗口内计算。

在这里插入图片描述

Watermarks

但是事件流并不一定是有序的,它有可能是无序,有可能早发生的事件反而比晚发生的事件更晚到达。这时Flink需要等待较早发生的事件都到达了,才能进行一个时间窗的计算。

但是Flink无法得知什么时候边界内的所有事件都达到,因此必须有一种机制控制Flink什么时候停止等待。

这时候就要使用watermarks ,Flink接收到每一条数据时,会使用watermark生成器根据EventTime计算出一个watermark然后插入到数据中。当我们设置watermark的延迟时长是t时,那么watermark就等于当前所有达到数据中的EventTime中的最大值(maxEventTime)减去时间t,代表EventTime在 maxEventTime - t 之前的数据都已达到,结束时间为 maxEventTime - t 的时间窗可以进行计算。

在这里插入图片描述

比如上面的例子,我们设置wartemark的延时时间t为2,那么当EventTime为7的事件到达时,该事件的watermark就是5(maxEventTime = 7, t = 2, watermark = maxEventTime - t = 7 - 2 = 5),那么表示Flink认定EventTime在5或5之前的时间都已经达到了,那么如果有一个窗口的结束时间为5的话,该窗口就会触发计算。

watermarks的使用:

DataStream<Event> stream = ...;WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.timestamp);DataStream<Event> withTimestampsAndWatermarks =stream.assignTimestampsAndWatermarks(strategy);

当然,使用了watermarks之后,也不一定就能保证百分之一百准确。当我们把延时时间t设置的较短时,就能获取更低的延迟,但是准确性也相对下降;而如果我们把t设的较大,那么延迟就更大,但是准确性就想对较高。


http://www.ppmy.cn/news/1475982.html

相关文章

UnityECS学习中问题及总结entityQuery.ToComponentDataArray和entityQuery.ToEntityArray区别

在Unity的ECS&#xff08;Entity Component System&#xff09;开发中&#xff0c;entityQuery.ToComponentDataArray<T>(Allocator.Temp) 和 entityQuery.ToEntityArray(Allocator.Temp) 是两种不同的方法&#xff0c;用于从实体查询中获取数据。除了泛型参数之外&#…

AI人工智能开源大模型生态体系分析

人工智能开源大模型生态体系研究 "人工智能开源大模型生态体系研究报告v1.0"揭示&#xff0c;AI(A)的飞速发展依赖于三大核心&#xff1a;数据、算法和算力。这一理念已得到业界广泛认同&#xff0c;三者兼备才能推动AI的壮大发展。随着AI大模型的扩大与普及&#xf…

iPhone恢复篇:如何从iPhone恢复误删除的照片

iPhone用户喜欢它的相机。使用其引人注目的功能捕捉昂贵的记忆是 iOS 设备的最大吸引力之一。但是&#xff0c;与任何其他手机一样&#xff0c;数码照片也可能会从iPhone中丢失。 如果丢失的照片包括您珍贵的回忆——假期、婚礼、您孩子的一岁生日或朋友的聚会&#xff0c;那么…

python-Web

FLASK整体框架: from flask import Flask,render_templateapp Flask(__name__)app.route("/show/info")#网址 def index():#网址对应的函数return render_template("index.html")#falsk 支持将字符串写入文件if __name____main__:app.run()#访问网站的时…

springboot1——快速构建项目

需求 第一步&#xff1a;创建maven工程(非web项目) 第二步&#xff1a;导入起步依赖 点击&#xff1a; 下拉复制&#xff1a; 粘贴&#xff1a;&#xff01;&#xff01;这是springboot工程需要继承的父工程 下拉复制&#xff1a; 粘贴&#xff1a;&#xff01;&#xf…

大白话之SpringMVC中的ModelAndView

在Spring MVC框架中&#xff0c;ModelAndView是一个非常关键的类&#xff0c;它用于封装控制器(Controller)方法执行后的两个重要输出&#xff1a;模型数据(Model)和视图(View)。 让我们用大白话来解释一下&#xff1a; 想象一下&#xff0c;你去一家餐厅点餐。服务员&#x…

Qt 多语言

记录Qt多语言的实现过程 目录 1.项目配置文件.pro配置 2.程序中的字符串用tr()封装 3.生成翻译文件 4.使用Qt语言家修改翻译文件 4.1使用Qt语言家打开 4.2 .更改文件配置 5. 生成qm文件 6.代码执行切换语言 6.1入口处 6.2 事件执行 0.效果 1.项目配置文件.pro配置 T…

电脑只有一个C盘怎么办?

在日常使用电脑的过程中&#xff0c;不少用户会遇到电脑只有一个C盘的情况。C盘作为系统盘&#xff0c;既要运行操作系统&#xff0c;又要安装各种软件和存放用户文件&#xff0c;时间一长&#xff0c;C盘就容易爆满&#xff0c;导致系统运行缓慢&#xff0c;甚至出现崩溃。那么…

Calibration相机内参数标定

1.环境依赖 本算法采用张正友相机标定法进行实现&#xff0c;内部对其进行了封装。 环境依赖为 ubuntu20.04 opencv4.2.0 yaml-cpp yaml-cpp安装方式&#xff1a; &#xff08;1&#xff09;git clone https://github.com/jbeder/yaml-cpp.git #将yaml-cpp下载至本地 &a…

开源科学工程技术软件

目录 0 参考链接 1 Silx 2 Klampt 3 参数化三维3D软件Dune 3D 4 GPS日志文件查看器GPXSee 5 三维3D软件Chili3D 6 集成电路设计软件XicTools 7 天文学软件Cosmonium 8 计算流体力学软件FluidX3D 9 点云处理软件CloudCompare 10 野外火灾建模软件WindNinja 11 电子设…

DVC+Minio

由于参数文件比较大&#xff0c;因此onnx、engine等大文件弃用LFS管理&#xff0c;改用dvc管理&#xff1a; minio就是存储用的 启动miniosudo netstat -ntpl#查看端口号 sudo kill -9 $(sudo lsof -i:5061 -t) 关闭端口对应进程 ./minio server --console-address ":6570…

昇思25天学习打卡营第二十四天|基于MindSpore通过GPT实现情感分类

基于MindSpore通过GPT实现情感分类 导入数据集 import osimport mindspore from mindnlp._legacy.engine import Evaluator, Trainer from mindnlp._legacy.engine.callbacks import BestModelCallback, CheckpointCallback from mindnlp._legacy.metrics import Accuracy fr…

【Vue3】4个比较重要的设计模式!!

大家好,我是CodeQi! 一位热衷于技术分享的码仔。 在我投身于前端开发的职业生涯期间,曾有一次承接了一个大型项目的维护工作。此项目运用的是 Vue 框架,然而其代码结构紊乱不堪,可维护性极度糟糕😫。 这使我深刻领会到,理解并运用 Vue 中的重要设计模式是何等关键! …

#if defined(WEBRTC_USE) webrtc.a的宏机制

#ifndef是 if not define的缩写,#ifdef 是 if define 的缩写。 define有两种,一种是单纯宏定义,一种是定义宏为特定值。 #define WEBRTC_USE_H264#defined WEBRTC_USE_H264 11.1定义值用作预处理: #define A 0

NAT地址转换+多出口智能选路,附加实验内容

本章主要讲&#xff1a;基于目标IP、双向地址的转换 注意&#xff1a;基于目标NAT进行转换 ---基于目标IP进行地址转换一般是应用在服务器端口映射&#xff1b; NAT的基础知识 1、服务器映射 服务器映射是基于目标端口进行转换&#xff0c;同时端口号也可以进行修改&…

堆、栈和队列(数据结构)

堆、栈和队列&#xff08;数据结构&#xff09; 这里写目录标题 堆、栈和队列&#xff08;数据结构&#xff09;**栈****队列**堆&#xff08;Heap&#xff09;&#xff08;&#xff09;队列&#xff08;Queue&#xff09;&#xff08;FIFO&#xff09;栈&#xff08;Stack&…

【C++】15.二叉搜索树

一、二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不为空&#xff0c;则左子树上所有节点的值都小于根节点的值若它的右子树不为空&#xff0c;则右子树上所有节点的值都大于根节点的值它的左…

vue中缩放比的使用

大屏适用性比较大&#xff0c;后台系统不推荐 抽组件&#xff0c;scaleScreen <template><divid"screen":style"{width: ${style.width}px,height: ${style.height}px,transform: ${style.transform},}"><slot ></slot></div&g…

leetcode:2833. 距离原点最远的点(python3解法)

难度&#xff1a;简单 给你一个长度为 n 的字符串 moves &#xff0c;该字符串仅由字符 L、R 和 _ 组成。字符串表示你在一条原点为 0 的数轴上的若干次移动。 你的初始位置就在原点&#xff08;0&#xff09;&#xff0c;第 i 次移动过程中&#xff0c;你可以根据对应字符选择…

debian固定ip

debian固定ip 前言 安装好的Debian系统后&#xff0c;为了确保每次登陆的ip不变&#xff0c;需要固定 方法 命令如下 ip addr | grep inet因为有有线网和无线网 2 种连接方式&#xff0c;因此需要区别。 其中 enp 的是有线&#xff0c;wlp 的是无线 查看网关 IP 命令如下 …