Spark+实例解读

server/2024/10/21 12:00:05/

第一部分 Spark入门

学习教程:Spark 教程 | Spark 教程

Spark 集成了许多大数据工具,例如 Spark 可以处理任何 Hadoop 数据源,也能在 Hadoop 集群上执行。大数据业内有个共识认为,Spark 只是Hadoop MapReduce 的扩展(事实并非如此),如Hadoop MapReduce 中没有的迭代查询和流处理。然而Spark并不需要依赖于 Hadoop,它有自己的集群管理系统。更重要的是,同样数据量,同样集群配置,Spark 的数据处理速度要比 Hadoop MapReduce 快10倍左右。

Spark 的一个关键的特性是数据可以在内存中迭代计算,提高数据处理的速度。虽然Spark是用 Scala开发的,但是它对 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。

第二部分 SparkCore

2 RDD

2.1 转换算子-map

map是将RDD的数据一条条处理,返回新的RDD

# 定义方法
def add(data):return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap

flatMap对RDD执行map操作,然后执行解除嵌套操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum

2.3转换算子-reduceByKey

针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作

rdd.reduceByKey(func)

      val clickStat = joinDf.where(F.col("active_type")==="click").rdd.map(row => {val mapInfo = Option(row.getMap[String,Double](row.fieldIndex(feat)))mapInfo match {case Some(x) => xcase _ => null}}).filter(_!=null).flatMap(x=>x).reduceByKey(_+_)
2.4 转换算子-mapValues

针对二元元组RDD,对其内部的二元元组的value进行map操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum
2.5 转换算子-groupBy

将RDD的数据进行分组

rdd.groupBy(func)

rdd = sc.parallelize([('a',1),('a',11),('b',1)])
# 通过这个函数确认按照谁来分组(返回谁即可)
print(rdd.groupBy(lambda x:x[0]).collect())
print(rdd.groupBy(lambda x:x[0]).collect())
# 结果为:
​
    val userContentListHis = spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat]).map(l=>{(l.getUid,l.getContent_properties.get(0).getId)}).toDF("uid", "docid").groupBy($"uid")
2.6 转换算子-filter

过滤想要的数据进行保存

rdd = sc.parallelize([1,2,3,4,5,6])
rdd.filter(lamdba x:x%2 == 1) # 只保留奇数
    val treatmentUser = spark.read.option("header", false).option("sep", "\t").csv(inpath).select("_c0").withColumnRenamed("_c0", "userid").withColumn("flow", getexpId($"userid")).filter($"flow" >= start and $"flow" <= end).select("userid").dropDuplicates()
2.7 转换算子-其他算子
distinct算子
rdd.distinct() 一般不写去重分区val userContentHis = hisPathList.map(path =>{val hisData = spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat])println(s"hisData ==>${hisData.count()}")hisData}).reduce(_ union _).distinct().repartition(partition)
union算子 
2个rdd合并成一个rdd:rdd.union(other_rdd)
只合并不去重  rdd的类型不同也是可以合并的
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,2,3,4])
rdd3 = rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别:
groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组+聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大.
​
2.rdd的分区数怎么查看? 
通过getNumPartitions API查看,返回int
​
3.Transformation和Action的区别:
转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行
​
4.哪两个算子不经过Driver直接输出?
foreach 和 saveAsTextFile

3 RDD的持久化

3.1 RDD的持久化

rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失

3.2 RDD的缓存

val rawLog = profilePushLogReader(spark, date, span).persist()
3.3 RDD的checkPoint

也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系

3.4 缓存面试题

4 案例

4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源?
计算cpu核心和内存量,通过--executor-memory指定executor内存,通过--executor-cores指定executor的核心数
​

5 广播变量 累加器


http://www.ppmy.cn/server/90199.html

相关文章

elasticsearch 解决全模糊匹配最佳实践

事件背景&#xff1a; 某 CRM 系统&#xff0c;定义了如下两个表&#xff1a; 客户表 t_custom 字段名 类型 描述 idlong自增主键phonestring客户手机......... 客户产品关系表 t_custom_product 字段名 类型 描述 idlong自增主键custom_idlong客户idproduct_idlong产品…

【BUG】已解决:SyntaxError:positional argument follows keyword argument

SyntaxError:positional argument follows keyword argument 目录 SyntaxError:positional argument follows keyword argument 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c…

CeoMax总裁主题最新3.8.1破解免授权版/WordPress付费资源素材下载主题

CeoMax总裁主题最新3.8.1破解免授权版&#xff0c;一套WordPress付费资源素材下载的主题&#xff0c;感觉这是做资源站唯一一个可以和ripro媲美甚至超越的模板&#xff0c;UI很美&#xff0c;功能也很强大&#xff0c;有想学习的可下载搭建学习一下&#xff0c;仅供学习研究借鉴…

探索Prompt的世界

在人工智能&#xff08;AI&#xff09;和自然语言处理&#xff08;NLP&#xff09;的飞速发展中&#xff0c;prompt技术作为一种与语言模型交互的重要方式&#xff0c;正逐步占据中心舞台。为了对prompt这一概念进行全面介绍&#xff0c;我们将从其发展历史、运行原理、调试方式…

springSecurity学习之springSecurity用户单设备登录

用户只能单设备登录 有时候在同一个系统中&#xff0c;只允许一个用户在一个设备登录。 之前的登陆者被顶掉 将最大会话数设置为1就可以保证用户只能同时在一个设备上登录 Override protected void configure(HttpSecurity http) throws Exception {http..anyRequest().aut…

基于stm32的多旋翼无人机(Multi-rotor UAV based on stm32)

由于一直在调试本项目&#xff0c;好久没有发文章&#xff0c;最近本项目的PID调试初见成效&#xff01;开始正文前首先感谢各位粉丝的支持&#xff0c;以及对本项目技术上支持的老师以及师兄&#xff0c;谢谢你们&#xff01; 基于stm32的多旋翼无人机 一、多旋翼无人机飞行原…

FastGPT部署和接入使用重排模型bce-reranker-base

bce-reranker简介 bce-reranker 是一种专门用于信息检索和自然语言处理领域中的重排序(reranking)模型。这种模型由北京智源人工智能研究院(BAAI)开发,是 BGE(BAAI General Embedding)系列的一部分。BGE 系列模型专注于提供通用的嵌入表示,而 bce-reranker 则更进一步…

文件包涵条件竞争(ctfshow82)

Web82 利用 session.upload_progress 包含文件漏洞 <!DOCTYPE html> <html> <body> <form action"https://09558c1b-9569-4abd-bf78-86c4a6cb6608.challenge.ctf.show//" method"POST" enctype"multipart/form-data"> …