【Flink系列】部署篇(一):Flink集群部署

news/2024/11/14 23:15:37/

主要回答以下问题:

  • Flink集群是由哪些组件组成的?它们彼此之间如何协调工作的?
  • 在Flink中job, task, slots,parallelism是什么意思?集群中的资源是如何调度和分配的?
  • 如何搭建一个Flink集群?如何配置高可用服务?如何使用外部文件系统?

Flink系统架构

在这里插入图片描述

Flink的核心组件包含客户端,jobmanager(JM)和taskmanager™三部分。此外Flink往往还需要结合很多外部组件一起使用,比如高可用服务、持久化存储、资源管理、指标存储与分析的组件。

Flink客户端主要负责将job提交给JM。JM是中央调度器,包含Jobmaster, Dispatcher, ResourceManager三部分。JobMaster is responsible for managing the execution of a single JobGraph. Multiple jobs can run simultaneously in a Flink cluster, each having its own JobMaster. The Dispatcher provides a REST interface to submit Flink applications for execution and starts a new JobMaster for each submitted job. It also runs the Flink WebUI to provide information about job executions. The ResourceManager is responsible for resource de-/allocation and provisioning in a Flink cluster — it manages task slots, which are the unit of resource scheduling in a Flink cluster. TM负责执行具体的任务。

如果只是提交作业和执行作业,不考虑整个集群的稳定性,拓展性,便于维护的性能等,只部署以上三个组件就够了。

但是,如果TM done掉了,JM还可以控制任务重启在其它TM上;如果JM done掉了,所有的任务都将失败,因此我们需要部署高可用服务使得一个JM done掉后,备用的JM 自动地顶上去作业。Flink目前(1.16)仅支持两种高可用服务:Zookeeper HA service 和 K8s HA service.

Flink有故障恢复的机制在任务失败后重启任务,并读取任务失败前的状态在这个状态下继续工作,可以保证哪怕任务失败重启,数据也不丢失,不重发。而这个“任务失败前的状态”是通过checkpoint保存的,考虑到多个JM需要共享checkpoint,checkpoint往往保存在可共享的持久化外部存储系统中,比如HDFS,S3等。因此我们还需要部署文件存储系统。

再说集群的资源管理和调度,Flink支持k8s和YARN两种工具来自动化管理集群资源,也可以不依赖于任何Resource Provider,采用独立部署(standalone)方式部署集群。

再说集群的监控,Flink本身收集了很多指标,可以通过metrics reporter与外部的指标存储、分析、展示工具一起搭建一个Flink监控系统。比如联合Prometheus, grafana搭建监控系统。

Flink的作业执行机制

在讲解Flink不同的部署方式以及不同部署方式下各组件如何协调工作前,我认为很有必要讲解一下Flink的作业执行机制,便于理解之后会反复提到的JobGraph,task, slots等概念。

DataFlows和Operator

程序运行时会被映射为dataflows,每个数据流都是以一个或多个sources开始,一个或多个sinks结束,类似于任意的有向无环图。大多数情况下,程序中的转换运算和dataflow中的算子(operator)是一一对应的关系。

比如下图中的程序就可以转化为由source,map算子,分组聚合算子,sink组成的数据流。
在这里插入图片描述

并行计算和并行度(Parallelism)

在这里插入图片描述

任务并行:不同的任务(算子)并行处理不同的数据,数据流图中横向的同时执行。

数据并行:一个算子可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或容器中完全独立地执行。

并行度:一个特定算子的子任务个数,指的数据并行。有些像多线程的线程数,但和多线程不一样的是,多线程的子线程共享内存资源,但是一个算子的子任务运行在不同的slot上,内存资源是隔离的。注意并行度针对的是算子,不同的算子可以设置为不同的并行度。

并行度的设置:

//全局,不推荐
env.setParallelism(1);
//每一个算子
source.map(...).setParallelism(1)

并行度的执行规则:底层实现>代码局部>代码全局设置>提交任务时的命令行设置>配置文件的默认设置

算子链

上面介绍了数据流图,算子,并行度的概念,再来说什么是算子链。

Flink中算子与算子之间的数据传输形式大体可以分为以下两类:

  • one-to-one(forwarding) 直通:

    从一个算子到另一个算子的分区不变,比如source和map之间,这代表着map算子的子任务看到的元素的个数和顺序和source算子的子任务产生的相同。map,filter,flatMap都属于这种(前提是并行度不变)

  • redistributing(重分配):

    stream的分区会发生改变,如keyby.

如果前后两个算子并行度相同,且传输方式为one-to-one就可以合并为一个算子链。通常我们说的task就是指的一个算子链,subtask往往指的同一算子链的子任务。

算子合并为算子链是作业执行中很重要的一个优化手段,是否合并是可以通过代码控制的,在作业的性能调优中也是一个可以考虑的调优点。

Flink中之所以合并算子主要考虑的是减少算子之间不必要的数据传输,因为在flink中,不同任务之间的数据传输带来的性能开销其实并不小,一是数据传输必然涉及到序列化和反序列,要是一条数据很大,又选择了不合适的数据类型比如json,那带来的性能损耗是非常明显的;二是如果任务处于不同的taskmanager,那数据传输还涉及到网络传输。另外合并算子也减少了整个job的线程数,能够减少线程转化的开销。

需要注意的是,合并算子并不一定能带来性能提升的,因为算子合并其实相当于减少了并发,可能会影响CPU利用率,可以参考多线程的线程数考虑这一点。

执行图(ExecutionGraph)

相关概念介绍完后,简单介绍一下(很多细节还未搞明白,但尚不影响使用)一个Flink作业是如何一步步转化为Taskmanager上可以执行的task的。下面的描述主要针对Session部署方式,对于Application部署模式之后再介绍。

首先,客户端会将代码转化为dataflow,dataflow进一步优化如合并算子链后生成JobGraph。
在这里插入图片描述

然后,JM对JobGraph根据并行度进行拆分生成执行图,
在这里插入图片描述

最后JM会分发执行图到taskmanager上,实际执行的叫物理执行图。

Flink的资源分配和调度

slots是Flink中资源分配的最小单位。Flink对内存资源是进行了隔离的,隔离出来的每一份资源叫一个slot。每个TM通过参数taskmanager.numberOfTaskSlots配置slots的数量。建议根据核的数量分配任务槽,这样一个任务槽就一个cpu核,cpu就不需要分时复用了。默认slots平分整个TM的内存资源,Flink也支持细粒度地划分slots的资源。
在这里插入图片描述

需要配置cluster.fine-grained-resource-management.enabled为true

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("a").setCpuCores(1.0).setTaskHeapMemoryMB(100).build();SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b").setCpuCores(0.5).setTaskHeapMemoryMB(100).build();someStream.filter(...).slotSharingGroup("a") // Set the slot sharing group with name “a”
.map(...).slotSharingGroup(ssgB); // Directly set the slot sharing group with name and resource.env.registerSlotSharingGroup(ssgA); // Then register the resource of group “a”

在这里插入图片描述

上面讲的是资源的分配,再讲资源的调度:不同的task如何分配到slots上面。 主要遵守下面两个原则: 同一个任务的不同子任务只能分配到不同的slots上;多个任务可以共享slot。以上图为例,一共3个算子链,并行度分别为6,6,1,每个算子链在slots上依次分配,同一个Job的不同算子链共享slot的。

基于这样的资源调度规则,就不难理解“一个job需要的任务槽的数量至少为算子链的最大并行度“。像上面的示例,需要的任务槽数量就是6。

为什么slots可以共享?不同的task资源完全隔离不好吗?这里主要是从提高资源的利用率考虑的,希望各个内存区域的使用相对均衡,而不是忙的忙死闲的闲死。

Flink的部署

Flink提供了3种部署模式:

  • 会话模式(Session Mode)
  • 单作业模式(Per-Job Mode)
  • 应用模式(Application Mode)

它们的区别主要在于:集群的生命周期以及资源的分配方式;应用的Main方法在哪里执行——客户端还是JobManager。 其中Per-Job模式在1.15版本后已经废弃,就不再介绍了。

会话模式

先启动集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。一个任务导致集群崩溃会牵连其他所有任务。

会话模式适合单个规模小、执行时间短的大量作业。(因为执行时间短,所以单个作业占用的资源很快能释放掉给下一个作业使用,不需要反复启动集群,反复部署资源)

应用模式

应用模式是提交任务的同时启动集群,一个应用一个集群,应用在集群在,应用亡集群自动关闭。此外,应用模式的另一个显著特点是应用的main方法执行在JM,而不是客户端。这样做是为了减轻客户端的负载,避免当多个用户同时提交任务时客户端宕机。

那么,main方法的执行为什么会带来较大的负载呢?执行main方法首先需要下载相关的依赖,还需要抽取拓扑结构(比如JobGraph)便于后续的处理。客户端执行完后还需要把这些都传输给JM。这就使得客户端一是需要格外的网络带宽下载依赖,传输数据给JM; 二是消耗更多的CPU。因此application模式把这部分的工作放在了JM上。

官方推荐在产线上使用应用模式,在测试开发中使用会话模式。

参考资料

  1. official document
  2. B站视频
  3. 尚硅谷课程教材

http://www.ppmy.cn/news/11556.html

相关文章

Linux项目自动化构建工具-make/Makefifile

目录 背景 实例代码 依赖关系 依赖方法 原理 项目清理 可重复执行的依据 背景 会不会写makefile,从一个侧面说明了一个人是否具备完成大型工程的能力 一个工程中的源文件不计数,其按类型、功能、模块分别放在若干个目录中,makefile定义了一系…

华为交换机、路由器设备批量配置端口方法步骤

华为交换机、路由器批量配置端口方法步骤 在现实工作中,如果要对多个端口做同样的配置,每个接口逐一进行相同的配置,很容易出错,而且造成大量重复工作。 配置端口组功能就可以解决这个问题啦。 你只需要将这些以太网接口加入同一…

RocketMQ部署详解

上篇文章已经介绍过RocketMQ,这里就不再写了,下面直入主题,介绍RocketMQ安装 因为RocketMQ是基于Java开发的,所以安装RocketMQ之前,我们需要先安装JDK,因为服务器一般采用Linux,所以本文只介绍基…

【C/C++】静态顺序表详解(附完整源码)

本章内容 1.什么是线性表 2.什么是顺序表 3.静态顺序表结构的定义 4.静态顺序表的函数接口实现 5.静态顺序表的问题及思考 1.什么是线性表 线性表(linear list)是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&…

Mybatis-plus(上)

1.什么是mybatis-plus升级版的mybatis,目的是让mybatis更易于使用, 用官方的话说“为简化而生”官网:https://baomidou.com/初体验按照官网中的快速开始即可1.准备数据库脚本数据库 Schema 脚本如下: DROP TABLE IF EXISTS user; …

pytest-pytest插件之测试覆盖率pytest-cov

简介 测试覆盖率是指项目代码被测试用例覆盖的百分比,使用pytest-cov插件可以统计测试覆盖率 添加链接描述 安装插件pytest-cov pip install pytest-cov用法 基本用法 –cov的参数是要统计代码覆盖率的源码,我将源码放在mysrc中,test_s…

Kotlin的lateinit和by lazy的区别

一、lateinit1.lateinit的使用由于kotlin有严格的语法要求变量需要声明是否可以为null,但由于在实际的业务场景中,这个变量必须在某些时候才能做初始化操作,并且这个变量肯定不为null,如果为null,就是逻辑有问题了。这…

xilinx srio ip学习笔记之再识srio

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 xilinx srio ip学习笔记之再识srio前言SRIO的理解IP核的理解前言 这段时间,随着对SRIO的学习,又有了更深的一点认识,不像一开始这么慌张了…