文章目录
- Spark集群搭建
- 一、基础集群脚本
- 1、远程调用脚本(remote_call.sh)
- 2、远程复制目录脚本(remote_copy.sh)
- 3、Spark集群服务管理脚本
- 二、配置相关文件
- 端口配置
- 日志文件路径
- 三、任务提交案例
- 1. 案例详解
- 2. Spark Submit 选项详解
Spark集群搭建
一、基础集群脚本
1、远程调用脚本(remote_call.sh)
如果有传命令参数,则执行该命令;如果没有传命令参数,则不执行。
#!/bin/bashcmd=$1
if [ ! $cmd ];thencmd="jps"
fi# 提取集群免密通信的虚拟机主机名
hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}'`# 遍历所有主机
for host in $hosts;doecho "-------- $host --------"# 此处使用"$cmd"的原因是避免将命令中的空格识别为多条命令ssh root@$host "$cmd"
done
2、远程复制目录脚本(remote_copy.sh)
首先,要验证待复制的目录在本机是否存在;然后需要从/etc/hosts文件中获取除去当前主机名的其他主机名,并且对每个主机进行循环操作,先判断父目录是否存在,再进行递归复制。
#!/bin/bashpath=$1 # 本机目录# 验证路径是否存在
if [ ! -e $path ];thenecho "目录 $path 不存在,无法拷贝"exit 0
fi# 提取集群中排除当前主机名之外的所有主机名
# 只需验证父目录是否存在即可
me=`hostname`
parent=`dirname $path`
hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}' | grep -v "$me"`for host in $hosts;doscp -r $path root@$host:$parentecho "-------- 拷贝 $path 到 $host 成功 --------"
done
3、Spark集群服务管理脚本
按照"先开的后关"的原则,按照"ZooKeeper-HDFS-YARN-Spark-启用其他Master节点"的次序进行集群的服务管理
#!/bin/bashif [ "$1" == "start" ];then# 自定义启动服务函数,传入两个字符串参数:参数1为启动命令,参数2为启动方式function startService(){# 定义两个局部遍历依次接收用户传入的两个参数# service 为启动命令local service=$1# exeType 为启动方式(daemon:后台启动(用于那些会占用窗口的服务),非daemon表示正常启动)local exeType=$2if [ "$exeType" == "daemon" ];then# 后台启动nohup $service 1>/dev/null 2>&1 &else# 正常启动$service 1>/dev/null 2>&1fi# 判断执行启动脚本后的状态if [ $? -ne 0 ];then# 因为服务按从上到下有依赖关系,所以任何一个脚本执行出错,将退出整个脚本,所以选择exit而非return# 函数按通用规则 return 0:正常,非0:表示异常;若函数中return缺省,则默认返回最后一条命名的状态即改命令执行后的$?echo "ERROR : FAIL TO EXECUTE $service IN MODE OF $exeType ... EXIT ..."# 退出脚本exit 1else# 成功则正常输出提示结果echo "INFO : SUCCESS TO EXECUTE $service IN MODE OF $exeType"fi}# 依次按服务依赖顺序调用启动服务函数,并为每次调用传入启动命令和启动方式# 启动 Zookeeper/root/install/remote_call.sh "zkServer.sh start"# 启动 hadoop hdfsstartService "start-dfs.sh" "non-daemon"# 启动 hadoop yarnstartService "start-yarn.sh" "non-daemon"# 启动 SparkstartService "start-all.sh" "non-daemon"# 实现集群高可用ssh root@master02 "start-master.sh"elif [ "$1" == "stop" ];then# 自定义启动服务函数,传入两个字符串参数:参数1为关闭命令,参数2为关闭方式function stopService(){# 服务名称或关闭命令local service=$1# 关闭方式(command:命令关闭,pid:根据服务查找进程编号(pid)在借助kill命令关闭)local exeType=$2if [ "$exeType" == "command" ];then# 直接执行参数一命令关闭服务$service 1>/dev/null 2>&1# 根据关闭命令执行的状态展示结果if [ $? -eq 0 ];thenecho "INFO : SUCCESS TO EXECUTE $service"elseecho "ERROR : FAIL TO EXECUTE $service"fielse# 根据参数一传入的服务名称查看进程编号(pid)local pid=$(jps -ml|grep $service|awk '{print $1}')if [ "$pid" ];then# 如果进程编号存在,则直接强制 killkill -9 $pid# 根据kill的状态展示结果if [ $? -eq 0 ];thenecho "INFO : SUCCESS TO KILL $service WITH PID $pid"elseecho "ERROR : FAIL TO KILL $service WITH PID $pid"fielseecho "INFO : NO SERVICE EXIST WITH NAME OF $service"fifi}# 根据服务的依赖关系,逆向逐个关闭服务# 关闭高可用另外起的masterssh root@master02 "stop-master.sh"# 关闭SparkstopService "stop-all.sh" "command"# 最后关闭 yarn 和 hdfsstopService "stop-yarn.sh" "command"stopService "stop-dfs.sh" "command"# 关闭ZooKeeper/root/install/remote_call.sh "zkServer.sh stop"
else# 附带查一下java进程jps -ml
fi
如果需要切换主节点的状态,可以先通过stop-master.sh
使A-Master节点状态由Active->Standby
,待B-Master根据机制切换为Active
态之后再重启A-Master节点。
二、配置相关文件
cd /opt/software/spark-3.1.2/conf
// mv xxx.template xxx 将所有template文件去除后缀为正式配置文件
vim spark-env.sh
-------------------------------------------------------------------
export JAVA_HOME=/opt/software/jdk1.8.0_171export HADOOP_CONF_DIR=/opt/software/hadoop-3.1.3/etc/hadoopSPARK_MASTER_WEBUI_PORT=9090SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=9091
-Dspark.history.fs.logDirectory=hdfs://master01:8020/spark_event_log_dir
-Dspark.history.retainedApplications=30
"SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=master01,master02,worker01
-Dspark.deploy.zookeeper.dir=/spark
"
-------------------------------------------------------------------vim workers // 配置集群所有机器名称
-------------------------------------------------------------------
master01
master02
worker01
-------------------------------------------------------------------vim spark-default.conf
-------------------------------------------------------------------
spark.eventLog.enabled true
spark.eventLog.dir hdfs://master01:8020/spark_event_log_dir
spark.yarn.historyServer.address=master01:9091
spark.history.ui.port=9091
-------------------------------------------------------------------vim yarn-site.xml // [选配]:在启动Yarn模式时才需要配置
-------------------------------------------------------------------
<property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value>
</property>
<property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value>
</property>
-------------------------------------------------------------------./remote_copy.sh /opt/software/spark-3.1.2/ // 拷贝Spark内容
./remote_copy.sh /etc/profile.d/myenv.sh // 拷贝环境变量
./remote_call.sh "source /etc/profile"
./remote_call.sh "/opt/software/service-bigdata.sh start" // 完整启动Spark集群中的所有服务
在此配置下,有以下关键信息:
-
端口配置
- Spark Master Web UI:9090
- Spark HistoryServer UI:9091
-
日志文件路径
- 日志文件路径:
hdfs://master01:8020/spark_event_log_dir
- 日志文件路径:
-
关闭内存检查的原因:Spark任务吃资源,不能为了保证资源不被过度使用就强行终止Spark任务。
三、任务提交案例
1. 案例详解
ProductsAnalyzer类
val path = args(0) val dirPath = args(1)
表示输入和输出路径在提交任务时具体指定。
package envtestimport core.{SparkSessionBuilder, SparkSessionBuilderDeployer}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Windowobject ProductsAnalyzer {case class Product(name: String, price: Double, date: String, market: String, province: String, city: String)implicit class StrExt(line: String){// 香菜 2.80 2018/1/1 山西汾阳市晋阳农副产品批发市场 山西 汾阳val regexProduct = "(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)".rdef toProduct: Product = {line match {// 使用模式匹配将字符串和正则表达式进行匹配,如果成功,则将各个匹配到的部分转换为Product对象并进行返回// java.lang.NumberFormatException: empty String:输入数据中的某些行可能不符合预期的格式,导致 toDouble 转换失败。解决方案是:使其转化为Product对象时,如果为空,则转化为0.0fcase regexProduct(name, price, date, market, province, city) => Product(name, if (price.trim.isEmpty) 0.0f else price.toDouble, date, market, province, city)case _ => throw new RuntimeException(s"产品数据格式有误:$line")}}}def main(args: Array[String]): Unit = {/*** 在Edit Configurations中指定参数,通过args[0]和args[1]获取输入路径和输出路径*/val path = args(0)val dirPath = args(1)val ssb: SparkSessionBuilderDeployer = SparkSessionBuilderDeployer()val spark = ssb.sparkval sc: SparkContext = ssb.scval rddProduct: RDD[Product] = sc.textFile(path).mapPartitions(_.map(_.toProduct))import org.apache.spark.sql.functions._import spark.implicits._// 统计每个省份下每个商品的平均价格和商品数量,并且在不同省份下按照商品数量创建排名指标spark.createDataFrame(rddProduct).groupBy($"province",$"name").agg(avg($"price").as("avg_price"),count("*").as("product_cnt")).select($"province",$"name",$"avg_price",$"product_cnt",dense_rank().over(Window.partitionBy($"province").orderBy($"avg_price".desc)).as("rank")).repartition(1).write.mode(saveMode = "overwrite").option("separator",",").option("header", "true").csv(dirPath)spark.stop()}
}
SparkSessionBuilderDeployer类
SparkSessionBuilderDeployer()
使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。
package coreimport org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}// 封装SparkSession的创建方法
// SparkSessionBuilderDeployer() 使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。
class SparkSessionBuilderDeployer(){lazy val config:SparkConf = {new SparkConf()}lazy val spark:SparkSession = {SparkSession.builder().config(config).getOrCreate()}lazy val sc:SparkContext = {spark.sparkContext}def stop(): Unit = {if (null != spark) {spark.stop()}}
}
object SparkSessionBuilderDeployer {def apply(): SparkSessionBuilderDeployer = new SparkSessionBuilderDeployer
}
在具体测试案例代码时,可以先通过对path
和dirpath
指定具体路径的方式来测试整体逻辑,再通过Edit Configurations
设置具体参数来测试传参后的程序逻辑,测试完毕之后,即可直接将输入和输出路径分别修改为args(0)
和args(1)
,并且直接打包发布到虚拟机,供给Spark集群进行执行。
2. Spark Submit 选项详解
-
主要选项
-
–class :
- 指定应用程序的主类,即包含
main
方法的类。 - 示例:
--class envtest.ProductsAnalyzer
- 指定应用程序的主类,即包含
-
–master :
-
指定
Spark Master
节点的 URL。 -
示例:
-
Standalone 模式:
--master spark://master01:7077
- 表示
Spark Master
节点运行在master01
主机的 7077 端口上。
- 表示
-
YARN 模式:
--master yarn
- 使用 YARN 作为资源管理器。
-
-
-
–deploy-mode :
- 指定应用程序的运行模式。
- 模式:
client
:Driver 程序运行在提交任务的本地机器上。cluster
:Driver 程序运行在集群的某个节点上。
- 示例:
--deploy-mode cluster
-
–name :
- 指定应用程序的名称。
- 示例:
--name spark-sql-product
-
:
-
<application-argument(s)>:
-
-
具体代码
standalone
spark-submit \
--class envtest.ProductsAnalyzer \
--master spark://master01:7077 \
--deploy-mode client \
--name spark-sql-product \
/root/spark/spark_sql-1.0.jar \
hdfs://master02:8020/spark/data/products.txt \
hdfs://master02:8020/spark/result/product_result
yarn-cluster
spark-submit \
--class envtest.ProductsAnalyzer \
--master yarn \
--deploy-mode cluster \
--name spark-sql-product \
/root/spark/spark_sql-1.0.jar \
hdfs://master02:8020/spark/data/products.txt \
hdfs://master02:8020/spark/result/product_result