Flink实战教程从入门到精通(基础篇)(五)Flink部署-YARN运行模式

server/2025/4/1 7:07:10/

        

目录

一、YARN运行模式(重点)

二、相关准备和配置

1、Hadoop集群搭建(HDFS、YARN)

三、会话模式部署

(1)创建会话

(2)打开Flink页面

(3)提交作业 

(4)执行作业

(5)通过命令方式上传

(6)停止会话模式

四、单作业模式 

(1)执行命令提交作业

 (2)在YARN的ResourceManager界面查看执行情况。

(3)进行测试

(4)取消作业 

五、应用模式 

(1)命令行提交

(2)测试

(3)上传HDFS提交

六、历史服务器

1、创建存储目录

flink-config.yaml%E4%B8%AD%E6%B7%BB%E5%8A%A0%E5%A6%82%E4%B8%8B%E9%85%8D%E7%BD%AE-toc" name="tableOfContents" style="margin-left:80px">2、在 flink-config.yaml中添加如下配置

3、启动历史服务器

4、webUI


        国内大部分大数据底层架构是通过hadoop进行构建,hadoop有三大模块:HDFS、YARN、MapReduce。

        HDFS:负责存储大规模数据分布存储在多个节点上,支持数据的可靠读写,适合处理大文件和流式数据访问。

        YARN:负责集群资源的管理和调度,将资源分配给不同的应用程序,使多个应用程序能在同一集群中共享资源并高效运行。  

        MapReduce:一种编程模型和计算框架,用于大规模数据集的并行处理 。

一、YARN运行模式(重点)

             YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。        

二、相关准备和配置

1、Hadoop集群搭建(HDFS、YARN)

可以自行前往官网进行下载:hadoop下载链接

Hadoop集群(HDFS、YARN)搭建可参考我的另外一篇文章:

Hadoop集群搭建(hdfs、yarn)-CSDN博客

这里就不在做更多介绍,直接启动Hadoop集群即可。

三、会话模式部署

(1)创建会话

执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。

root@node-1:/opt/module/flink-1.17.0/bin# ./yarn-session.sh -d -nm test
可用参数解读:
-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
-jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(--name):配置在YARN UI界面上显示的任务名。
-qu(--queue):指定YARN队列名。
-tm(--taskManager):配置每个TaskManager所使用内存。

启动完成后,我们打开YARN的可视化,可以看到下面开启了一个YARN会话:

(2)打开Flink页面

        此时如果要打开Flink可视化,端口发生了改变,我们有俩中方法打开Flink,第一种找到刚才开始YARN会话中的日志:

 通过http://node-2:41815打开Flink。

        第二种,将刚才的Flink会话拉到最右边,找到Tracking UI,直接点击

(3)提交作业 

(4)执行作业

执行作业之前也讲解过,服务器输入内容:

(5)通过命令方式上传

① 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群。

② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。

[root@node-1 flink-1.17.0]$ bin/flink run -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar -d

(6)停止会话模式

通过命令行输入停止命令,用于停止该YARN会话。

root@node-1:/opt/module/flink-1.17.0# echo "stop" | ./bin/yarn-session.sh -id application_1742969249592_0002

四、单作业模式 

单作业模式就是提交作业的时候才启动一个Flink集群。代码解析由Flink客户端进行解析。

资源管理:每个作业都会在 YARN 上单独启动一个 Flink 集群,作业完成后,该集群会被销毁。每个作业有自己独立的资源。

部署流程:提交作业时,会同时启动一个专门为该作业服务的 Flink 集群。

资源隔离:不同作业之间的资源隔离性好,一个作业的资源使用不会影响其他作业。

(1)执行命令提交作业

bin/flink run -d -t yarn-per-job -c cn.konne.un.WordCountUn Flink-First-Demo-1.0-SNAPSHOT.jar

命令中的yarn-per-job就是YARN模式中单作业模式(job)提交任务。

注意:如果启动过程中报如下异常。

 解决办法:在flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中设置

[root@node-1 conf]$ vim flink-conf.yamlclassloader.check-leaked-classloader: false

 (2)在YARN的ResourceManager界面查看执行情况。

(3)进行测试

(4)取消作业 

可以通过Flink页面直接 Cancel Job 关闭作业:

这是你会发现以下场景:

这是单作业模式的特点,当作业停止时,当前Flink集群会自动关闭停止,资源进行回收。 

同时我们也可以通过命令行停止作业:

第一步查询作业id,可以通过下面命令

root@node-1:/opt/module/flink-1.17.0# bin/flink list -t yarn-per-job -Dyarn.application.id=application_1742977086338_0005

可以看到我们正在运行的作业id。 

执行停止作业的命令

root@node-1:/opt/module/flink-1.17.0# bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1742977086338_0005 f70405085af955670d85a54fc975ebc7

         这里的application_1742977086338_0005是当前应用的ID,f70405085af...975ebc7是作业的ID。注意如果取消作业,整个Flink集群也会停掉。 

五、应用模式 

        应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。该模式用户的代码是通过JobManager进行解析的,而不是客户端。

(1)命令行提交

执行命令提交作业。

root@node-1:/opt/module/flink-1.17.0# bin/flink run-application -t yarn-application -c cn.konne.un.WordCountUn lib/Flink-First-Demo-1.0-SNAPSHOT.jar

(2)测试

测试和单作业模式测试一样,自行进行测试。

(3)上传HDFS提交

        YARN的机制是我们通过提交jar包以及Flink的一些依赖的时候,YARN会自动将这些jar包以及Flink的依赖先上传到HDFS,然后jobmananger去HDFS中拉拉取。 

        而手动使用HDFS上传提交,其实就是我们提前在HDFS上面创建好一个文件夹,然后提前把jar包以及Flink的一些依赖包上传到HDFS上面,然后通过HDFS再上传到JobMananger 。

        首先在HDFS创建两个文件夹,用来储存jar包以及Flink自身的一些依赖环境。

root@node-1:/opt/module/flink-1.17.0# hadoop fs -mkdir /flink-dist
root@node-1:/opt/module/flink-1.17.0# hadoop fs -mkdir /flink-jars

接下来我们将jar包以及Flink的依赖上传到对应的HDFS的文件夹中。

root@node-1:/opt/module/flink-1.17.0# hadoop fs -put Flink-First-Demo-1.0-SNAPSHOT.jar /flink-jars
root@node-1:/opt/module/flink-1.17.0# hadoop fs -put lib/ /flink-dist
root@node-1:/opt/module/flink-1.17.0# hadoop fs -put plugins/ /flink-dist

点击文件夹进去可以看到我们上传的东西

 接下来我们就可以去提交作业了

root@node-1:/opt/module/flink-1.17.0# bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://192.168.50.55:8020/flink-dist" -c cn.konne.un.WordCountUn hdfs://192.168.50.55:8020/flink-jars/Flink-First-Demo-1.0-SNAPSHOT.jar

这里选择的Flink依赖以及 我们输入的jar包都是从HDFS中获取的。

至此YARN模式的各种场景的作业提交已经完成。

六、历史服务器

        运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。

        Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

        此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

1、创建存储目录

hadoop fs -mkdir -p /logs/flink-job

flink-config.yaml%E4%B8%AD%E6%B7%BB%E5%8A%A0%E5%A6%82%E4%B8%8B%E9%85%8D%E7%BD%AE" name="2%E3%80%81%E5%9C%A8%C2%A0flink-config.yaml%E4%B8%AD%E6%B7%BB%E5%8A%A0%E5%A6%82%E4%B8%8B%E9%85%8D%E7%BD%AE">2、 flink-config.yaml中添加如下配置

jobmanager.archive.fs.dir: hdfs://node-1:8020/logs/flink-job
historyserver.web.address: node-1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node-1:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000

3、启动历史服务器

bin/historyserver.sh start

4、webUI

在浏览器地址栏输入:http://node-1:8082  查看已经停止的 job 的统计信息,可以看到已经停止的作业。点进去可以都看到改作业运行时的一些信息,方便出现问题后进行排查问题。


http://www.ppmy.cn/server/180165.html

相关文章

RWA代币化崛起中的香港机遇:数字金融新枢纽的破局之道

引言:全球资产代币化浪潮中的香港坐标 在2025年全球金融数字化重构的关键节点,RWA(现实世界资产代币化)市场以年均740%的增速重塑价值流动规则。香港凭借独特的政策创新、跨境枢纽优势及庞大的资产储备,正从传统金融中…

【QT5 多线程示例】异步编程

异步编程 QFuture 是 Qt 并发框架提供的一个异步计算结果的类。它允许你在后台执行任务,并在未来某个时刻获取任务的计算结果。QFuture 通常与 QtConcurrent::run 或 QFutureWatcher 结合使用: QtConcurrent::run 适用于异步执行一个函数并通过 QFuture…

Spring Boot框架中常用注解

以下是Spring Boot框架中常用注解的详细说明,包括名称、用途、用法、使用位置及扩展示例,按功能模块分类整理: 一、核心启动与配置注解 1. SpringBootApplication 用途:主启动类注解,整合了 Configuration、EnableAu…

LogitsProcessor代码分析

LogitsProcessor是一个抽象基类,用于在生成序列的过程中对模型输出的logits进行处理。它的派生类实现了各种策略,以控制生成过程。 公共输入和输出 所有的LogitsProcessor派生类都遵循相同的调用约定,即实现了__call__方法,接受以…

使用nohup和--remove-source-files在后台运行rsync并记录日志

一、什么是 --remove-source-files? 作用:在文件成功同步到目标路径后,删除源路径中的文件。适用场景:需要将文件从一处“移动”到另一处,而不是保留副本,例如清理旧数据、迁移文件到新存储。注意&#xf…

什么是具身智能

具身智能(Embodied Intelligence)是人工智能与机器人学交叉的前沿领域,强调智能体通过身体与环境的动态交互实现自主学习和进化,其核心在于将感知、行动与认知深度融合‌。通俗地讲,就是机器人或者智能系统在物理环境中…

matplotlib——南丁格尔玫瑰

南丁格尔玫瑰图(Nightingale Rose Chart),是一种特殊形式的柱状图,它以南丁格尔(Florence Nightingale)命名,她在1858年首次使用这种图表来展示战争期间士兵死亡原因的数据。 它将数据绘制在极坐…

通用人工智能(AGI)的发展路径(人工智能通识)

背景:为什么要写这个话题,因为,最近粉丝身边的朋友总有人在问,这个人工智能到底是个啥?AGI又是个啥?所以就把我的理解写了一下,希望是能够通俗易懂。 通用人工智能(AGI)…