Flink集群批作业实践:七析BI批作业执行

server/2024/12/28 15:39:00/

目录

背景

Flink架构介绍

JobManager

TaskManager

Flink集群模式的选择

Flink集群资源提供者的选择

Flink作业的提交

Flink作业项目开发

user jar准备

作业提交


背景

市场上比较常见的大数据批处理分布式计算引擎有Spark、MapReduce和Hive等,而把Flink当作批作业的执行引擎相对来说没那么主流。

不过,因为七析BI本身已经使用了FlinkCDC作为CDC的工具,从统一技术栈(Flink是一个流批一体化引擎)、减少学习成本和维护成本的角度考虑,我们团队选择了Flink来执行我们的批作业。

七析BI简介

七析BI是一款嵌入式自助 BI 平台,无需代码,通过简单的拖拉拽即可实现数据可视化大屏、丰富多样的报表。其中,“数据探索”功能是一个让用户在数据分析前对源数据进行过滤、分组、关联等操作的准备过程。

本文所提及的对Flink的使用,就是通过Flink的批处理能力将源表根据数据探索的逻辑来生成一个新的表。

七析BI数据探索功能示例图

Flink架构介绍

分布式计算引擎架构常见的核心组件通常有资源管理层、调度层和执行层等,Flink也不例外。

Flink集群的进程分为两大类: JobManager 和 TaskManager 。

Flink架构图(来自Flink官方)

JobManager

JobManager负责Flink集群的作业管理、资源管理和作业调度。

  • 作业管理

每一个Flink作业都会根据userJar和运行时的数据生成一个JobGraph,Flink会为每一个作业生成一个JobMaster来管理这个job的整个生命周期。

  • 资源管理

一个job可以分为多个task,负责执行这些task的是TaskManager中的slot。JobManager负责管理所有TaskManger上的slot的状态,使其可以方便地为每个task分配合适的资源。

  • 调度

Flik提供了一系列REST API来接收客户端的请求,用于管理作业、监控集群基本状态等。

在JobManager高可用模式下向Flink集群提交作业时,接收到REST请求的JobManger节点会将任务发送给Master节点执行。

TaskManager

TaskManger是Flink作业的实际执行单位。一个作业对应的JobGraph可分为多个task,而一个task又可包含一个或多个算子。故此也可认为算子链组成了task。

每一个TaskManger从逻辑上又分为多个slot,每一个slot可以同时执行一个task,但一个slot可以同步执行多个算子(算子链)。

每一个slot都会分配一个线程执行,所以slot是Flink作业的最小执行单位,而task slot的总数量代表整个Flink集群的并行度。

算子链-flink的一个优化,将多个算子放在同一个slot中执行可以减少数据在多个slot中传输导致的花销

TaskManager中slot、task、算子链的关系(来自Flink官方)

Flink群模式的选择

Flink的作业分为流作业和批作业两大类。其次,Flink的使用者们所处理的数据量和对数据的可用性要求都会存在差异。以上情况就会使得开发者们对Flink集群的需求有较大的差异。

面对这种差异,Flink提供了多种集群模式:

  • Flink Application集群

集群的生命周期与作业的生命周期一致。作业独享整个集群的资源。集群启动时需要指定一个 userJar,因为作业的入口从 userJar 的 main 方法开始,所以每个集群只能执行一个作业。

  • Flink Job集群

集群的生命周期与作业的生命周期一致。作业独享整个集群的资源。集群管理器或客户端为每一个作业启动一个集群,集群只能运行该作业。

  • Flink Session集群

集群的生命周期与作业无关。用户可向集群提交多个作业,所以资源的隔离性较差,存在资源竞争的情况。适合执行时间段、对稳定性要求不高的常见。

首先,现阶段七析BI的批作业都是以小数据量为主,执行时间在3分钟以内。其次,每一个作业对失败的容忍度较高(失败时不会影响用户使用,只是数据相对旧且可重试)。

Application模式和Job模式都需要为每个作业启动一个独立的集群执行。虽然资源的隔离性更高了,但时间和机器资源的花销更大了,显然是不合适的。

而使用Session模式时,集群启动时不用跟用户代码绑定,客户端可以不断地向集群提交不同的作业。由于不同的作业可以共享集群的资源,所以只需要有一个高可用的集群即可满足生产使用。运维成本和机器资源占用相对其他方式而言都是最低的。

所以,综合以上几点,Session模式是最适合我们的模式。

Flink集群资源提供者的选择

Flink的资源提供者有以下几种:

  • Standalone

Standalone集群部署、扩容、恢复和升级完全依赖人工,对运维的压力较大,不太适合大规模的生产化部署。

  • YARN

YARN模式通过YARN作为Flink的资源管理和调度程序,可以实现自动化资源管理、弹性伸缩和自动恢复等功能。适合团队本身有使用YARN的背景和大规模的生产化部署。

  • Native Kubernetes

Flink集成了Kubernetes的API,可以通过Flink脚本将集群部署到已有的Kubernetes集群上。有了Kubernetes,高可用、自动化资源管理等能力实现起来也非常方便。适合有Kubernetes和Flink经验的团队,也是官方推荐的部署方式一。

  • Flink Kubernetes Operator

Flink社区提供的独立项目,可通过operator以云原生的方式管理Flink集群。可自动处理Flink集群的部署、资源伸缩、升级,以及蓝绿发布、滚动升级等高级功能,是未来的趋势。适合有Kubernetes经验,已经进行大规模生产化部署的团队。

结合使用规模和运维成本,七析BI选择的是Native Kubernetes的方式。它部署简单、自动化程度高,在我们的场景下用起来还是挺方便的。

详细的部署步骤这里就不多说了,可以直接参考Flink官方文档:

Native Kubernetes | Apache Flink

Flink作业的提交

Flink提交作业的方式大部分都是在集群启动时指定的userJar(流作业)、使用Flink SQL或者通过Flink的本地程序提交。而七析BI都是通过后端服务触发作业提交,如果可以通过http的方式向Flink提交作业,那对客户端来说是最方便的。

Flink提供了一系列用于监控和管理的REST API,包括提交作业和查询作业运行结果。

Flink作业项目开发

  • 通过maven archetype创建项目
mvn archetype:generate                \
  -DarchetypeGroupId=org.apache.flink   \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.20.0

这允许你命名新建的项目,而且会交互式地询问 groupId、artifactId、package 的名字。

  • 项目依赖

创建好的项目的pom文件中已经包含了最基本的Flink Streaming API程序的依赖和打包插件。通常来说,你还需要添加这些依赖:

  1. 其他Flink API | 如Flink Table API
  2. 连接器 | 用于与其他数据源进行数据读写。参考:连接器和格式 | Apache Flink
  3. 单元测试

Flink API依赖

可以参考一下表格选择你要使用的API,添加到你的项目依赖中:

你要使用的 API

你需要添加的依赖项

DataStream

flink-streaming-java

DataStream Scala 版

flink-streaming-scala_2.12

Table API

flink-table-api-java

Table API Scala 版

flink-table-api-scala_2.12

Table API + DataStream

flink-table-api-java-bridge

Table API + DataStream Scala版

flink-table-api-scala-bridge_2.12

user jar准备

通过maven archetype创建的项目中包含了maven-shade-plugin,它的作用是将项目代码和其他依赖打包成一个fat jar。有了这个fat jar后,你就可以将它上传到集群上并执行你的作业了。

存在的问题

在高可用配置下,这个fat jar在每次作业提交时都会被Flink重复上传到文件系统上。这就造成了很多不必要的时间和带宽的花销,所以需要对user jar做一个优化。

user jar拆分

Flink自己有一个lib目录用来放置自身的java代码,我们可以利用这个目录来放置一些我们fat jar中的依赖。

第一步是将原来的fat jar拆分成一个thin jar和lib jar,thin jar是我们的flink程序代码,lib jar是我们程序的其他依赖。

第二,像原来一样,thin jar依然上传到Flink;而lib jar就放到Flink的lib路径。

user jar拆分前后对比

最后,我们需要添加一个maven-jar-plugin用于构建我们的thin jar;原来的maven-shade-plugin用于构造我们lib jar(需要把user code排除掉)。

如下所示:

<!-- 构建 thin jar-->
<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.2.0</version><configuration><archive><manifest>                                                                    <mainClass>com.App</mainClass></manifest></archive></configuration>
</plugin><!-- 构建 lib jar-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<!-- 构造所有依赖包放到flink的lib目录下-->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<id>flink-lib-jar</id>                                
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<shadedArtifactAttached>true</shadedArtifactAttached>
<finalName>flink-app-lib</finalName>
<artifactSet>
<excludes><!--flink 用到的依赖,按需调整-->
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>org.apache.flink:*</exclude>
<!-- 排除当前工程 -->
<exclude>com.groupId:artifactId</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters></configuration>
</execution>
</executions>
</plugin>

user jar上传

上面构建出的thin jar有两种方式上传到Flink集群:

  • 通过Flink监控页面的上传jar功能
  • 之前把jar包放置在flink保存上传jar的目录下

而lib jar则直接放置到flink的lib目录下。

* 注意:每次更新lib都需要更新镜像并重启。

作业提交

通过http的方式调用Flink的作业运行接口:

/jars/:jarid/run
{"programArgsList":["参数1", "参数2"]
}

jarid是Flink上传目录中对应jar的文件名或者在Flink监控页面查看,如下图:

programArgsList参数放置在request body中,可作为我们程序main方法的入参。

到这里,我们就成功地将我们批作业提交到Flink集群了。

道一云七巧-与你在技术领域共同成长

更多技术知识分享:https://bbs.qiqiao668.com/


http://www.ppmy.cn/server/153948.html

相关文章

盈达智汇是由顶尖团队打造的多元化自治平台

在当今这个日新月异的数字化时代&#xff0c;各类创新平台如雨后春笋般涌现&#xff0c;它们不仅深刻改变了人们的生活方式&#xff0c;更在商业领域掀起了一场前所未有的变革。其中&#xff0c;“盈达智汇”作为一个由顶尖投资与管理团队精心打造的综合性生活平台&#xff0c;…

Flutter将应用打包发布到App Store

使用Flutter将应用打包发布到App Store的详细步骤及流程图&#xff1a; 流程图 #mermaid-svg-X09iOP2FtRxwKsWw {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-X09iOP2FtRxwKsWw .error-icon{fill:#552222;}#mermai…

飞牛 fnos 使用docker部署 OneNav 书签管理器

OneNav简介 OneNav是一款开源的书签管理器&#xff0c;能助力用户轻松管理并访问在线资源。其具备诸多特点&#xff0c;如下&#xff1a; 支持后台管理&#xff1b;支持私有链接&#xff1b;支持Chrome/Firefox/Edge书签批量导入&#xff1b;支持多种主题风格&#xff1b;支持…

网络安全等级保护测评工作流程

一、测评准备活动阶段 首先&#xff0c;被测评单位在选定测评机构后&#xff0c;双方需要先签订《测评服务合同》&#xff0c;合同中对项目范围&#xff08;系统数量&#xff09;、项目内容&#xff08;差距测评&#xff1f;验收测评&#xff1f;协助整改&#xff1f;&#xf…

【ETCD】【实操篇(十七)】 etcd 集群定期维护指南

目录 概述Raft 日志保留键空间历史压缩&#xff1a;v3 API 键值数据库碎片整理空间配额快照备份 概述 为了保持 etcd 集群的可靠性&#xff0c;需要定期进行维护。根据 etcd 应用程序的需求&#xff0c;这些维护通常可以自动化进行&#xff0c;并且不会导致停机或性能显著下降…

路由器刷机TP-Link tp-link-WDR566 路由器升级宽带速度

何在路由器上设置代理服务器&#xff1f; 如何在路由器上设置代理服务器&#xff1f; 让所有连接到该路由器的设备都能够享受代理服务器的好处是一个不错的选择&#xff0c;特别是当需要访问特定的网站或加速网络连接的时候。下面是一些您可以跟随的步骤&#xff0c;使用路由器…

mongodb给不同的库设置不同的密码进行连接

默认的数据库安装之后是没有密码的&#xff0c;是可以直接访问的&#xff0c;但是如果端口不小心暴露出去了&#xff0c;就会存在很大的安全隐患。本节课教大家如何给mongodb设置账号密码进行访问。 设置管理员并密码登录 查看服务 默认安装好的mongodb会自动创建好服务&…

汇编学习笔记

汇编 1. debug指令 -R命令(register) 查看、改变CPU寄存器的内容 r ax 修改AX中的内容 -D命令(display) 查看内存中的内容 -E命令(enter) 改写内存中的内容 -U命令(unassenble反汇编) 将内存中的机器指令翻译成汇编指令 -T命令(trace跟踪) 执行一条机器指令 -A命令…