Spark on Yarn(client和cluster模式,spark-shell 和 spark-submit 的区别,WorldCount实现与理解)

news/2024/11/25 21:20:19/

文章目录

  • Spark on Yarn
    • 两种模式
      • client
      • cluster
    • spark-shell 和 spark-submit 的区别的理解
      • spark-shell
      • spark-submit
    • WorldCount实现
      • IDEA本地实现
      • On Yarn 实现
      • WorldCount图解

Spark on Yarn

spark on yarn 的两种模式是指 spark 应用程序的 driver 进程(负责控制和协调整个应用程序的运行)在哪里运行的问题。

两种模式

client

yarn-client 模式是指 driver 运行在客户端上,通过 application master(负责向 yarn 的 resource manager 申请资源和监控应用程序状态)来管理集群中的 executor 进程(负责执行具体的任务)。这样做的好处是可以方便地获取日志和返回信息,因为 driver 在客户端。缺点是会占用客户端的资源,可能影响性能和并发。这种模式适合开发测试环境,因为可以方便地调试和交互。

在这里插入图片描述

cluster

yarn-cluster 模式是指driver 运行在 yarn 集群中的一个 worker 节点上,这个节点会在 spark web UI 上显示为 driver 节点。这样做的好处是可以节省客户端(提交 spark 应用程序的机器)的资源,不受客户端的限制。缺点是不方便查看日志和返回信息,因为 driver 不在客户端。这种模式适合生产环境,因为可以保证应用程序的稳定性和效率。

在这里插入图片描述

spark-shell 和 spark-submit 的区别的理解

spark-shell

spark-shell 是一个交互式的 shell,可以用来运行 spark 代码和 SQL 语句,支持 Scala 和 Python 两种语言。spark-shell 可以在本地或者集群上运行,可以方便地进行数据分析和探索,也可以用来测试和调试 spark 应用程序。spark-shell 启动时会创建两个对象,一个是 sc,一个是 spark。sc 是 SparkContext 的实例,是 spark 的底层核心对象,负责和集群进行通信和协调。spark 是 SparkSession 的实例,是 spark 的高层封装对象,提供了更多的功能和便利,如 SQL, DataFrame, Dataset 等。

spark-submit

spark-submit 是一个用来在集群上提交和运行 spark 应用程序的脚本,支持 Scala, Java 和 Python 三种语言。spark-submit 可以指定各种选项和配置,如集群管理器,部署模式,资源分配,依赖包等。spark-submit 需要提供一个包含 spark 应用程序的 jar 包或者 py 文件,以及传递给应用程序的参数。spark-submit 适合运行生产环境的 spark 应用程序,可以保证应用程序的稳定性和效率。

spark-submit 的一般语法是:./bin/spark-submit \–class <main-class> \–master <master-url> \–deploy-mode <deploy-mode> \–conf <key>=<value> \# 其他选项<application-jar> \[application-arguments]其中,一些常用的选项是:–class: 应用程序的入口类(例如 org.apache.spark.examples.SparkPi)–master: 集群的 master URL(例如 yarn)–deploy-mode: 是否在 worker 节点上部署 driver(cluster)或者在本地作为一个外部客户端(client)(默认是 client)–conf: 任意的 spark 配置属性,以 key=value 的格式。如果值包含空格,需要用引号括起来(如下所示)。多个配置需要用不同的参数传递。(例如 --conf <key>=<value> --conf <key2>=<value2>)application-jar: 包含应用程序和所有依赖的 jar 包的路径。这个 URL 必须在集群中全局可见,例如,一个 hdfs:// 路径或者一个 file:// 路径,且在所有节点上存在。

WorldCount实现

IDEA本地实现

在maven中导入相应依赖

  <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.10</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version> </dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.4.17</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.12</artifactId><version>2.4.17</version></dependency></dependencies><build><plugins><!-- 打包插件, 否则 scala 类不会编译并打包进去 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build>

代码实现

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {//准备sc/SparkContext/Spark上下文执行环境/*这段代码的意思是创建一个 SparkConf 对象,用来配置 spark 应用程序的属性。SparkConf 对象可以传递给 SparkContext,用来创建 spark 环境。这段代码设置了两个属性:setAppName(“wc”): 设置 spark 应用程序的名称为 “wc”,这个名称会显示在 spark web UI 上,也可以用来标识应用程序。setMaster("local[*]"): 设置 spark 应用程序的运行模式为 local 模式,即在本地运行。local[*] 表示使用所有可用的 CPU 核数,也可以指定具体的数字,例如 local[2] 表示使用两个 CPU 核数。*/val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")/*这段代码的意思是创建一个 SparkContext 对象,用来初始化 spark 环境。SparkContext 是 spark 的核心对象,负责和集群进行通信和协调,创建 RDD, DataFrame, Dataset 等分布式数据集。这段代码传递了一个 SparkConf 对象,用来指定 spark 应用程序的配置属性,例如应用程序的名称,运行模式,资源分配等。*/val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//source/读取数据//RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!//从本地文件系统中读取 data/input/words.txt 文件,创建一个 RDD[String] 类型的数据集,每个元素是文件中的一行文本。val lines: RDD[String] = sc.textFile("data/input/words.txt")//transformation/数据操作/转换//对 lines 数据集进行 flatMap 转换操作,将每行文本按空格切分成单词,然后扁平化成一个 RDD[String] 类型的数据集,每个元素是一个单词。val words: RDD[String] = lines.flatMap(_.split(" "))//对 words 数据集进行 map 转换操作,将每个单词映射成一个 (单词, 1) 的元组,创建一个 RDD[(String, Int)] 类型的数据集,每个元素是一个 (单词, 1) 的键值对。// RDD[(单词, 1)]val wordAndOnes: RDD[(String, Int)] = words.map((_,1))//分组聚合:groupBy + mapValues(_.map(_._2).reduce(_+_)) ===>在Spark里面分组+聚合一步搞定:reduceByKey// 对 wordAndOnes 数据集进行 reduceByKey 转换操作,// 按照单词进行分组,然后对每组的值进行累加,创建一个 RDD[(String, Int)] 类型的数据集,每个元素是一个 (单词, 频数) 的键值对。val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)//直接输出result.foreach(println)//收集为本地集合再输出println(result.collect().toBuffer)//输出到指定path(可以是文件/夹)/*result.repartition(1): 对 result 数据集进行 repartition 操作,将数据集的分区数调整为 1,即将所有的数据合并到一个分区中。这样做的目的是为了方便保存到一个文件中,但是可能会影响性能和并行度。saveAsTextFile(“data/output/result”): 对 repartition 后的数据集进行 saveAsTextFile 操作,将数据集的内容以文本格式保存到 data/output/result 目录中。每个分区对应一个文件,文件名为 part-00000, part-00001 等。因为 repartition 之后只有一个分区,所以只会生成一个文件,即 part-00000。*/result.repartition(1).saveAsTextFile("data/output/result")result.repartition(2).saveAsTextFile("data/output/result2")//为了便于查看Web-UI可以让程序睡一会Thread.sleep(1000 * 60)//关闭资源sc.stop()}
}

On Yarn 实现

代码实现

import org.apache.hadoop.fs.{FileSystem, Path} //导入Hadoop文件系统相关的类
import org.apache.spark.{SparkConf, SparkContext} //导入Spark相关的类object WordCount1 {def main(args: Array[String]): Unit = {//这里不需要设置setMaster,因为在集群运行时,可以通过 spark-submit 命令的 --master 选项来指定 master URL,而不需要在代码中设置val conf: SparkConf = new SparkConf().setAppName("WordCount1") //创建SparkConf对象,设置应用程序名称val sc = new SparkContext(conf) //创建SparkContext对象val fs = FileSystem.get(sc.hadoopConfiguration) //获取Hadoop文件系统val outPutPath = new Path("/result") //设置输出路径if (fs.exists(outPutPath)) //如果输出路径已经存在,则删除fs.delete(outPutPath,true)sc.textFile("/FirstDemo.txt") //读取文本文件.flatMap(_.split(" ")) //将每一行按空格分割成单词.map((_, 1)) //将每个单词映射为(key, value)对,value为1.reduceByKey(_ + _) //按key进行聚合,统计每个单词出现的次数.saveAsTextFile("/result") //将结果保存到输出路径sc.stop() //停止SparkContext}
}

打包

在这里插入图片描述

虚拟机和hdfs

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

提交任务

 spark-submit  --class com.qst.spark.WordCount1  --master yarn  --deploy-mode client /home/project/sparkdemo-1.0-SNAPSHOT.jar

在这里插入图片描述

Yarn端查看

在这里插入图片描述

hdfs端查看

在这里插入图片描述

在这里插入图片描述

命令查看结果

在这里插入图片描述

WorldCount图解

在这里插入图片描述


http://www.ppmy.cn/news/47004.html

相关文章

java线程小结

线程 1.概念与原理 1.1线程和进程的概念 现在的操作系统是多任务操作系统&#xff0c;而多线程是实现多任务的一种方式 进程&#xff1a;是指内存中运行的应用程序&#xff0c;有自己独立的内存空间&#xff0c;可以独立的运行&#xff0c;进程彼此之间互不干扰&#xff0c;一…

【C++】从C语言入门C++的基础知识

C基础知识 前言1. C关键字2. 命名空间namespace命名空间的创建命名空间的使用命名空间的注意事项 3. C输入&输出4. 缺省参数概念分类全缺省参数半缺省参数 5. 函数重载概念实现C为什么能进行函数重载C和C的相互调用&#xff08;可以不用看&#xff09; 6. 引用概念注意事项…

【云原生进阶之容器】第六章容器网络6.4.3--Flannel网络模式

《云原生进阶之容器》专题索引: 第一章Docker核心技术1.1节——Docker综述第一章Docker核心技术1.2节——Linux容器LXC第一章Docker核心技术1.3节——命名空间Namespace第一章Docker核心技术1.4节——chroot技术第一章Docker核心技术1.5.1节——cgroup综述

系统接口幂等性设计探究

前言&#xff1a; 刚开始工作的时候写了一个带UI页面的工具&#xff0c;需要设计登录功能&#xff0c;登录功能也很简单&#xff0c;输入用户名密码点击登录&#xff0c;触发后台查询并比对密码&#xff0c;如果登录成功则返回消息给前端&#xff0c;前端把消息弹出提示一下。…

面向计算机视觉的深度学习:1~5

原文&#xff1a;Deep Learning for Computer Vision 协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 本文来自【ApacheCN 深度学习 译文集】&#xff0c;采用译后编辑&#xff08;MTPE&#xff09;流程来尽可能提升效率。 不要担心自己的形象&#xff0c;只关心如何实…

MVC和MVVM模式的区别

MVVM 和 MVC 都是软件架构模式&#xff0c;其中 MVVM 表示“Model-View-ViewModel”&#xff0c;而 MVC 表示“Model-View-Controller”。 MVC 模式中&#xff0c;控制器&#xff08;Controller&#xff09;充当视图&#xff08;View&#xff09;和模型&#xff08;Model&…

C语言中如何判断大小端字节序?

大小端&#xff08;Endian&#xff09;是指多字节整数在内存中存储的方式。在计算机中&#xff0c;一个多字节整数由多个字节组成&#xff0c;而不同的机器和处理器在存储多字节整数时会有两种不同存储方式&#xff0c;分别为大端字节序和小端字节序。 以一个4字节整数0x12345…

RHCE第二次作业

一、配置nto时间服务器&#xff0c;确保客户端主机和服务器主机同步时间 1、NTP是网络时间同步协议&#xff0c;就是用来同步网络中各个计算机的时间的协议。 2、NTP服务端配置 &#xff08;1&#xff09;检查系统是否安装了NTP包&#xff08;Linux一般自带NTP4.2&#xff09;&…