目录
一、YARN运行模式(重点)
二、相关准备和配置
1、Hadoop集群搭建(HDFS、YARN)
三、会话模式部署
(1)创建会话
(2)打开Flink页面
(3)提交作业
(4)执行作业
(5)通过命令方式上传
(6)停止会话模式
四、单作业模式
(1)执行命令提交作业
(2)在YARN的ResourceManager界面查看执行情况。
(3)进行测试
(4)取消作业
五、应用模式
(1)命令行提交
(2)测试
(3)上传HDFS提交
六、历史服务器
1、创建存储目录
flink-config.yaml%E4%B8%AD%E6%B7%BB%E5%8A%A0%E5%A6%82%E4%B8%8B%E9%85%8D%E7%BD%AE-toc" name="tableOfContents" style="margin-left:80px">2、在 flink-config.yaml中添加如下配置
3、启动历史服务器
4、webUI
国内大部分大数据底层架构是通过hadoop进行构建,hadoop有三大模块:HDFS、YARN、MapReduce。
HDFS:负责存储,大规模数据分布存储在多个节点上,支持数据的可靠读写,适合处理大文件和流式数据访问。
YARN:负责集群资源的管理和调度,将资源分配给不同的应用程序,使多个应用程序能在同一集群中共享资源并高效运行。
MapReduce:一种编程模型和计算框架,用于大规模数据集的并行处理 。
一、YARN运行模式(重点)
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
二、相关准备和配置
1、Hadoop集群搭建(HDFS、YARN)
可以自行前往官网进行下载:hadoop下载链接
Hadoop集群(HDFS、YARN)搭建可参考我的另外一篇文章:
Hadoop集群搭建(hdfs、yarn)-CSDN博客
这里就不在做更多介绍,直接启动Hadoop集群即可。
三、会话模式部署
(1)创建会话
执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。
root@node-1:/opt/module/flink-1.17.0/bin# ./yarn-session.sh -d -nm test
可用参数解读:
-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
-jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(--name):配置在YARN UI界面上显示的任务名。
-qu(--queue):指定YARN队列名。
-tm(--taskManager):配置每个TaskManager所使用内存。
启动完成后,我们打开YARN的可视化,可以看到下面开启了一个YARN会话:
(2)打开Flink页面
此时如果要打开Flink可视化,端口发生了改变,我们有俩中方法打开Flink,第一种找到刚才开始YARN会话中的日志:
通过http://node-2:41815打开Flink。
第二种,将刚才的Flink会话拉到最右边,找到Tracking UI,直接点击
(3)提交作业
(4)执行作业
执行作业之前也讲解过,服务器输入内容:
(5)通过命令方式上传
① 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群。
② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。
[root@node-1 flink-1.17.0]$ bin/flink run -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar -d
(6)停止会话模式
通过命令行输入停止命令,用于停止该YARN会话。
root@node-1:/opt/module/flink-1.17.0# echo "stop" | ./bin/yarn-session.sh -id application_1742969249592_0002
四、单作业模式
单作业模式就是提交作业的时候才启动一个Flink集群。代码解析由Flink客户端进行解析。
资源管理:每个作业都会在 YARN 上单独启动一个 Flink 集群,作业完成后,该集群会被销毁。每个作业有自己独立的资源。
部署流程:提交作业时,会同时启动一个专门为该作业服务的 Flink 集群。
资源隔离:不同作业之间的资源隔离性好,一个作业的资源使用不会影响其他作业。
(1)执行命令提交作业
bin/flink run -d -t yarn-per-job -c cn.konne.un.WordCountUn Flink-First-Demo-1.0-SNAPSHOT.jar
命令中的yarn-per-job就是YARN模式中单作业模式(job)提交任务。
注意:如果启动过程中报如下异常。
解决办法:在flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中设置
[root@node-1 conf]$ vim flink-conf.yamlclassloader.check-leaked-classloader: false
(2)在YARN的ResourceManager界面查看执行情况。
(3)进行测试
(4)取消作业
可以通过Flink页面直接 Cancel Job 关闭作业:
这是你会发现以下场景:
这是单作业模式的特点,当作业停止时,当前Flink集群会自动关闭停止,资源进行回收。
同时我们也可以通过命令行停止作业:
第一步查询作业id,可以通过下面命令
root@node-1:/opt/module/flink-1.17.0# bin/flink list -t yarn-per-job -Dyarn.application.id=application_1742977086338_0005
可以看到我们正在运行的作业id。
执行停止作业的命令
root@node-1:/opt/module/flink-1.17.0# bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1742977086338_0005 f70405085af955670d85a54fc975ebc7
这里的application_1742977086338_0005是当前应用的ID,f70405085af...975ebc7是作业的ID。注意如果取消作业,整个Flink集群也会停掉。
五、应用模式
应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。该模式用户的代码是通过JobManager进行解析的,而不是客户端。
(1)命令行提交
执行命令提交作业。
root@node-1:/opt/module/flink-1.17.0# bin/flink run-application -t yarn-application -c cn.konne.un.WordCountUn lib/Flink-First-Demo-1.0-SNAPSHOT.jar
(2)测试
测试和单作业模式测试一样,自行进行测试。
(3)上传HDFS提交
YARN的机制是我们通过提交jar包以及Flink的一些依赖的时候,YARN会自动将这些jar包以及Flink的依赖先上传到HDFS,然后jobmananger去HDFS中拉拉取。
而手动使用HDFS上传提交,其实就是我们提前在HDFS上面创建好一个文件夹,然后提前把jar包以及Flink的一些依赖包上传到HDFS上面,然后通过HDFS再上传到JobMananger 。
首先在HDFS创建两个文件夹,用来储存jar包以及Flink自身的一些依赖环境。
root@node-1:/opt/module/flink-1.17.0# hadoop fs -mkdir /flink-dist
root@node-1:/opt/module/flink-1.17.0# hadoop fs -mkdir /flink-jars
接下来我们将jar包以及Flink的依赖上传到对应的HDFS的文件夹中。
root@node-1:/opt/module/flink-1.17.0# hadoop fs -put Flink-First-Demo-1.0-SNAPSHOT.jar /flink-jars
root@node-1:/opt/module/flink-1.17.0# hadoop fs -put lib/ /flink-dist
root@node-1:/opt/module/flink-1.17.0# hadoop fs -put plugins/ /flink-dist
点击文件夹进去可以看到我们上传的东西
接下来我们就可以去提交作业了
root@node-1:/opt/module/flink-1.17.0# bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://192.168.50.55:8020/flink-dist" -c cn.konne.un.WordCountUn hdfs://192.168.50.55:8020/flink-jars/Flink-First-Demo-1.0-SNAPSHOT.jar
这里选择的Flink依赖以及 我们输入的jar包都是从HDFS中获取的。
至此YARN模式的各种场景的作业提交已经完成。
六、历史服务器
运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。
Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。
此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。
1、创建存储目录
hadoop fs -mkdir -p /logs/flink-job
flink-config.yaml%E4%B8%AD%E6%B7%BB%E5%8A%A0%E5%A6%82%E4%B8%8B%E9%85%8D%E7%BD%AE" name="2%E3%80%81%E5%9C%A8%C2%A0flink-config.yaml%E4%B8%AD%E6%B7%BB%E5%8A%A0%E5%A6%82%E4%B8%8B%E9%85%8D%E7%BD%AE">2、在 flink-config.yaml中添加如下配置
jobmanager.archive.fs.dir: hdfs://node-1:8020/logs/flink-job
historyserver.web.address: node-1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node-1:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
3、启动历史服务器
bin/historyserver.sh start
4、webUI
在浏览器地址栏输入:http://node-1:8082 查看已经停止的 job 的统计信息,可以看到已经停止的作业。点进去可以都看到改作业运行时的一些信息,方便出现问题后进行排查问题。