Spark集群搭建

server/2024/12/21 8:51:44/

在这里插入图片描述
在这里插入图片描述

文章目录

    • Spark集群搭建
      • 一、基础集群脚本
        • 1、远程调用脚本(remote_call.sh)
        • 2、远程复制目录脚本(remote_copy.sh)
        • 3、Spark集群服务管理脚本
      • 二、配置相关文件
        • 端口配置
        • 日志文件路径
      • 三、任务提交案例
        • 1. 案例详解
        • 2. Spark Submit 选项详解

Spark集群搭建

一、基础集群脚本

1、远程调用脚本(remote_call.sh)

如果有传命令参数,则执行该命令;如果没有传命令参数,则不执行。

#!/bin/bashcmd=$1
if [ ! $cmd ];thencmd="jps"
fi# 提取集群免密通信的虚拟机主机名
hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}'`# 遍历所有主机
for host in $hosts;doecho "-------- $host --------"# 此处使用"$cmd"的原因是避免将命令中的空格识别为多条命令ssh root@$host "$cmd"
done
2、远程复制目录脚本(remote_copy.sh)

首先,要验证待复制的目录在本机是否存在;然后需要从/etc/hosts文件中获取除去当前主机名的其他主机名,并且对每个主机进行循环操作,先判断父目录是否存在,再进行递归复制。

#!/bin/bashpath=$1 # 本机目录# 验证路径是否存在
if [ ! -e $path ];thenecho "目录 $path 不存在,无法拷贝"exit 0
fi# 提取集群中排除当前主机名之外的所有主机名
# 只需验证父目录是否存在即可
me=`hostname`
parent=`dirname $path`
hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}' | grep -v "$me"`for host in $hosts;doscp -r $path root@$host:$parentecho "-------- 拷贝 $path$host 成功 --------"
done
3、Spark集群服务管理脚本

按照"先开的后关"的原则,按照"ZooKeeper-HDFS-YARN-Spark-启用其他Master节点"的次序进行集群的服务管理

#!/bin/bashif [ "$1" == "start" ];then# 自定义启动服务函数,传入两个字符串参数:参数1为启动命令,参数2为启动方式function startService(){# 定义两个局部遍历依次接收用户传入的两个参数# service 为启动命令local service=$1# exeType 为启动方式(daemon:后台启动(用于那些会占用窗口的服务),非daemon表示正常启动)local exeType=$2if [ "$exeType" == "daemon" ];then# 后台启动nohup $service 1>/dev/null 2>&1 &else# 正常启动$service 1>/dev/null 2>&1fi# 判断执行启动脚本后的状态if [ $? -ne 0 ];then# 因为服务按从上到下有依赖关系,所以任何一个脚本执行出错,将退出整个脚本,所以选择exit而非return# 函数按通用规则 return 0:正常,非0:表示异常;若函数中return缺省,则默认返回最后一条命名的状态即改命令执行后的$?echo "ERROR : FAIL TO EXECUTE $service IN MODE OF $exeType ... EXIT ..."# 退出脚本exit 1else# 成功则正常输出提示结果echo "INFO : SUCCESS TO EXECUTE $service IN MODE OF $exeType"fi}# 依次按服务依赖顺序调用启动服务函数,并为每次调用传入启动命令和启动方式# 启动 Zookeeper/root/install/remote_call.sh "zkServer.sh start"# 启动 hadoop hdfsstartService "start-dfs.sh" "non-daemon"# 启动 hadoop yarnstartService "start-yarn.sh" "non-daemon"# 启动 SparkstartService "start-all.sh" "non-daemon"# 实现集群高可用ssh root@master02 "start-master.sh"elif [ "$1" == "stop" ];then# 自定义启动服务函数,传入两个字符串参数:参数1为关闭命令,参数2为关闭方式function stopService(){# 服务名称或关闭命令local service=$1# 关闭方式(command:命令关闭,pid:根据服务查找进程编号(pid)在借助kill命令关闭)local exeType=$2if [ "$exeType" == "command" ];then# 直接执行参数一命令关闭服务$service 1>/dev/null 2>&1# 根据关闭命令执行的状态展示结果if [ $? -eq  0 ];thenecho "INFO : SUCCESS TO EXECUTE $service"elseecho "ERROR : FAIL TO EXECUTE $service"fielse# 根据参数一传入的服务名称查看进程编号(pid)local pid=$(jps -ml|grep $service|awk '{print $1}')if [ "$pid" ];then# 如果进程编号存在,则直接强制 killkill -9 $pid# 根据kill的状态展示结果if [ $? -eq  0 ];thenecho "INFO : SUCCESS TO KILL $service WITH PID $pid"elseecho "ERROR : FAIL TO KILL $service WITH PID $pid"fielseecho "INFO : NO SERVICE EXIST WITH NAME OF $service"fifi}# 根据服务的依赖关系,逆向逐个关闭服务# 关闭高可用另外起的masterssh root@master02 "stop-master.sh"# 关闭SparkstopService "stop-all.sh" "command"# 最后关闭 yarn 和 hdfsstopService "stop-yarn.sh" "command"stopService "stop-dfs.sh" "command"# 关闭ZooKeeper/root/install/remote_call.sh "zkServer.sh stop"
else# 附带查一下java进程jps -ml
fi

如果需要切换主节点的状态,可以先通过stop-master.sh使A-Master节点状态由Active->Standby,待B-Master根据机制切换为Active态之后再重启A-Master节点。

二、配置相关文件

cd /opt/software/spark-3.1.2/conf
// mv xxx.template xxx 将所有template文件去除后缀为正式配置文件
vim spark-env.sh
-------------------------------------------------------------------
export JAVA_HOME=/opt/software/jdk1.8.0_171export HADOOP_CONF_DIR=/opt/software/hadoop-3.1.3/etc/hadoopSPARK_MASTER_WEBUI_PORT=9090SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=9091
-Dspark.history.fs.logDirectory=hdfs://master01:8020/spark_event_log_dir
-Dspark.history.retainedApplications=30
"SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=master01,master02,worker01
-Dspark.deploy.zookeeper.dir=/spark
"
-------------------------------------------------------------------vim workers // 配置集群所有机器名称
-------------------------------------------------------------------
master01
master02
worker01
-------------------------------------------------------------------vim spark-default.conf
-------------------------------------------------------------------
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://master01:8020/spark_event_log_dir
spark.yarn.historyServer.address=master01:9091
spark.history.ui.port=9091
-------------------------------------------------------------------vim yarn-site.xml // [选配]:在启动Yarn模式时才需要配置
-------------------------------------------------------------------
<property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value>
</property>
<property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value>
</property>
-------------------------------------------------------------------./remote_copy.sh /opt/software/spark-3.1.2/ // 拷贝Spark内容
./remote_copy.sh /etc/profile.d/myenv.sh // 拷贝环境变量
./remote_call.sh "source /etc/profile"
./remote_call.sh "/opt/software/service-bigdata.sh start" // 完整启动Spark集群中的所有服务

在此配置下,有以下关键信息:

  • 端口配置
    • Spark Master Web UI:9090
    • Spark HistoryServer UI:9091
  • 日志文件路径
    • 日志文件路径:hdfs://master01:8020/spark_event_log_dir
  • 关闭内存检查的原因:Spark任务吃资源,不能为了保证资源不被过度使用就强行终止Spark任务。

三、任务提交案例

1. 案例详解

ProductsAnalyzer类

val path = args(0) val dirPath = args(1) 表示输入和输出路径在提交任务时具体指定。

package envtestimport core.{SparkSessionBuilder, SparkSessionBuilderDeployer}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Windowobject ProductsAnalyzer {case class Product(name: String, price: Double, date: String, market: String, province: String, city: String)implicit class StrExt(line: String){// 香菜	2.80	2018/1/1	山西汾阳市晋阳农副产品批发市场	山西	汾阳val regexProduct = "(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)".rdef toProduct: Product = {line match {// 使用模式匹配将字符串和正则表达式进行匹配,如果成功,则将各个匹配到的部分转换为Product对象并进行返回// java.lang.NumberFormatException: empty String:输入数据中的某些行可能不符合预期的格式,导致 toDouble 转换失败。解决方案是:使其转化为Product对象时,如果为空,则转化为0.0fcase regexProduct(name, price, date, market, province, city) => Product(name, if (price.trim.isEmpty) 0.0f else price.toDouble, date, market, province, city)case _ => throw new RuntimeException(s"产品数据格式有误:$line")}}}def main(args: Array[String]): Unit = {/*** 在Edit Configurations中指定参数,通过args[0]和args[1]获取输入路径和输出路径*/val path = args(0)val dirPath = args(1)val ssb: SparkSessionBuilderDeployer = SparkSessionBuilderDeployer()val spark = ssb.sparkval sc: SparkContext = ssb.scval rddProduct: RDD[Product] = sc.textFile(path).mapPartitions(_.map(_.toProduct))import org.apache.spark.sql.functions._import spark.implicits._// 统计每个省份下每个商品的平均价格和商品数量,并且在不同省份下按照商品数量创建排名指标spark.createDataFrame(rddProduct).groupBy($"province",$"name").agg(avg($"price").as("avg_price"),count("*").as("product_cnt")).select($"province",$"name",$"avg_price",$"product_cnt",dense_rank().over(Window.partitionBy($"province").orderBy($"avg_price".desc)).as("rank")).repartition(1).write.mode(saveMode = "overwrite").option("separator",",").option("header", "true").csv(dirPath)spark.stop()}
}

SparkSessionBuilderDeployer类

SparkSessionBuilderDeployer() 使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。

package coreimport org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}// 封装SparkSession的创建方法
// SparkSessionBuilderDeployer() 使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。
class SparkSessionBuilderDeployer(){lazy val config:SparkConf = {new SparkConf()}lazy val spark:SparkSession = {SparkSession.builder().config(config).getOrCreate()}lazy val sc:SparkContext = {spark.sparkContext}def stop(): Unit = {if (null != spark) {spark.stop()}}
}
object SparkSessionBuilderDeployer {def apply(): SparkSessionBuilderDeployer = new SparkSessionBuilderDeployer
}

在具体测试案例代码时,可以先通过对pathdirpath指定具体路径的方式来测试整体逻辑,再通过Edit Configurations设置具体参数来测试传参后的程序逻辑,测试完毕之后,即可直接将输入和输出路径分别修改为args(0)args(1),并且直接打包发布到虚拟机,供给Spark集群进行执行。

2. Spark Submit 选项详解
  • 主要选项

    • –class :

      • 指定应用程序的主类,即包含 main 方法的类。
      • 示例:--class envtest.ProductsAnalyzer
    • –master :

      • 指定 Spark Master 节点的 URL。

      • 示例:

        • Standalone 模式:

          --master spark://master01:7077
          
          • 表示 Spark Master 节点运行在 master01 主机的 7077 端口上。
        • YARN 模式:

          --master yarn
          
          • 使用 YARN 作为资源管理器。
    • –deploy-mode :

      • 指定应用程序的运行模式。
      • 模式:
        • client:Driver 程序运行在提交任务的本地机器上。
        • cluster:Driver 程序运行在集群的某个节点上。
      • 示例:--deploy-mode cluster
    • –name :

      • 指定应用程序的名称。
      • 示例:--name spark-sql-product
    • :

      • Spark 应用程序的 JAR 文件的路径,包含了主类及其依赖。
      • 示例:/root/spark/spark_sql-1.0.jar
    • <application-argument(s)>:

      • 应用程序的输入参数。
      • 示例:
        • 输入路径:hdfs://master02:8020/spark/data/products.txt
        • 输出路径:hdfs://master02:8020/spark/result/product_result
  • 具体代码

standalone

spark-submit \
--class envtest.ProductsAnalyzer \
--master spark://master01:7077 \
--deploy-mode client \
--name spark-sql-product \
/root/spark/spark_sql-1.0.jar \
hdfs://master02:8020/spark/data/products.txt \
hdfs://master02:8020/spark/result/product_result

yarn-cluster

spark-submit \
--class envtest.ProductsAnalyzer \
--master yarn \
--deploy-mode cluster \
--name spark-sql-product \
/root/spark/spark_sql-1.0.jar \
hdfs://master02:8020/spark/data/products.txt \
hdfs://master02:8020/spark/result/product_result

在这里插入图片描述


http://www.ppmy.cn/server/61112.html

相关文章

struts2如何防止XSS脚本攻击(XSS防跨站脚本攻击过滤器)

只需要配置一个拦截器即可解决参数内容替换 一、配置web.xml <filter><filter-name>struts-xssFilter</filter-name><filter-class>*.*.filters.XssFilter</filter-class></filter><filter-mapping><filter-name>struts-xss…

自用的C++20协程学习资料

C20的一个重要更新就是加入了协程。 在网上找了很多学习资料&#xff0c;看了之后还是不明白。 最后找到下面这些资料总算是讲得比较明白&#xff0c;大家可以按照顺序阅读&#xff1a; 渡劫 C 协程&#xff08;1&#xff09;&#xff1a;C 协程概览C20协程原理和应用

FastAPI 学习之路(四十一)定制返回Response

接口中返回xml格式内容 from fastapi import FastAPI, Responseapp FastAPI()# ① xml app.get("/legacy") def get_legacy_data():data """<?xml version"1.0"?><shampoo><Header>Apply shampoo here.</Header&…

《基于深度学习的车辆重识别算法研究与系统实现》论文分析概要

目录 一、选题背景与研究意义 二、国内外研究现状 三、算法创新点 四、实验与结果分析 五、系统实现 六、总结与展望 本文题为《基于深度学习的车辆重识别算法研究与系统实现》,由华东师范大学齐恬恬撰写,旨在研究利用深度学习技术提升车辆重识别的精度,并搭建…

解决IntelliJ IDEA中克隆GitHub项目不显示目录结构的问题

前言 当您从GitHub等代码托管平台克隆项目到IntelliJ IDEA&#xff0c;却遇到项目目录结构未能正确加载的情况时&#xff0c;不必太过困扰&#xff0c;本文将为您提供一系列解决方案&#xff0c;帮助您快速找回丢失的目录视图。 1. 调整Project View设置 操作步骤&#xff1…

AV1 编码标准中帧内预测技术概述

AV1 编码标准帧内预测 AV1&#xff08;AOMedia Video 1&#xff09;是一种开源的视频编码格式&#xff0c;旨在提供比现有标准更高的压缩效率和更好的视频质量。在帧内预测方面&#xff0c;AV1相较于其前身VP9和其他编解码标准&#xff0c;如H.264/AVC和H.265/HEVC&#xff0c;…

机器学习——决策树(笔记)

目录 一、认识决策树 1. 介绍 2. 决策树生成过程 二、sklearn中的决策树 1. tree.DecisionTreeClassifier&#xff08;分类树&#xff09; &#xff08;1&#xff09;模型基本参数 &#xff08;2&#xff09;模型属性 &#xff08;3&#xff09;接口 2. tree.Decision…

初始网络知识

前言&#x1f440;~ 上一章我们介绍了使用java代码操作文件&#xff0c;今天我们来聊聊网络的一些基础知识点&#xff0c;以便后续更深入的了解网络 网络 局域网&#xff08;LAN&#xff09; 广域网&#xff08;WAN&#xff09; 路由器 交换机 网络通信基础 IP地址 端…