大数据学习之Spark分布式计算框架RDD、内核进阶

news/2025/2/7 11:21:18/

一.RDD

28.RDD_为什么需要RDD

29.RDD_定义

30.RDD_五大特性总述

31.RDD_五大特性1

32.RDD_五大特性2

33.RDD_五大特性3

34.RDD_五大特性4

35.RDD_五大特性5

36.RDD_五大特性总结

37.RDD_创建概述

38.RDD_并行化创建

演示代码:
// 获取当前 RDD 的分区数
@Since ( "1.6.0" )
final def getNumPartitions : Int =
partitions . length
// 显示出 RDD 被分配到不同分区的信息
/**Return an RDD created by coalescing all
elements within each partition into an
array.*/
def glom (): RDD [ Array [ T ]]
1
2
3
4
5
6
package com . itbaizhan . rdd
//1. 导入 SparkConf 类、 SparkContext
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByParallelize {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象。并设置本地运行和程序的
名称
val conf = new
SparkConf (). setMaster ( "local[2]" ). setAppName
( "CreateRdd1" )
//3. 构建 SparkContext 对象
val sc = new SparkContext ( conf )
//4. 通过并行化创建 RDD 对象:将本地集合 -> 分布式的
RDD 对象
1
2
3
4
5
6
7
8
9
10
11
12
79    
//val rdd: RDD[Int] =
sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,
7, 8))
val rdd : RDD [ Int ] =
sc . parallelize ( List ( 1 , 2 , 3 , 4 , 5 , 6 , 7 ,
8 ), 3 )
//5. 输出默认的分区数
//5.1
setMaster("local[*]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//8, 默认当前系统的
CPU
//5.2
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//2
//5.3
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8),3)
println ( " 默认分区
数: " + rdd . getNumPartitions ) //3
//6.collect 方法:将 rdd 对象中每个分区的数据,都
发送到 Driver ,形成一个 Array 对象
val array1 : Array [ Int ] = rdd . collect ()
println ( "rdd.collect()=" + array1 . mkString ( ",
" ))
//7. 显示出 rdd 对象中元素被分布到不同分区的数据信
13
14
15
16
17
18
19
20
21
22
23
24
25
80 运行结果:
实时效果反馈
1. 以下关于并行化创建 RDD 的描述错误的是:
A
通过并行化集合创建,将本地集合对象转分布式 RDD
B
parallelize() 方法必须传递两个参数。
C
parallelize 没有给定分区数 , 默认分区数等于执行程序的当前
服务器 CPU 核数。
答案:
val array2 : Array [ Array [ Int ]] =
rdd . glom (). collect ()
println ( "rdd.glom().collect() 的内容是 :" )
/*for(eleArr<- array2){
println(eleArr.mkString(","))
}*/
array2 . foreach ( eleArr => println ( eleArr . mkStr
ing ( "," )))
}
}
26
27
28
29
30
31
32
33
默认分区数: 3
rdd.collect()=1,2,3,4,5,6,7,8
rdd.glom().collect() 的内容是 :
1,2
3,4,5
6,7,8

39.RDD_读取文件创建RDD

40.RDD_读取小文件创建RDD

扩展 wholeTextFiles 适合读取一堆小文件:
//path 指定小文件的路径目录
//minPartitions 最小分区数 可选参数
def wholeTextFiles ( path :
String , minPartitions : Int =
defaultMinPartitions ): RDD [( String , String )]
1
2
3
85 代码演示:
package com . itbaizhan . rdd
//1. 导入类
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByWholeTextFiles {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "WholeTextFiles" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 读取指定目录下的小文件
val rdd : RDD [( String , String )] =
sc . wholeTextFiles ( "data/tiny_files" )
//(filePath1, " 内容 1"),(filePath2, " 内容
2"),...,(filePathN, " 内容 N")
val tuples : Array [( String , String )] =
rdd . collect ()
tuples . foreach ( ele => println ( ele . _1 , ele . _2 ))
//6. 获取小文件中的内容
val array : Array [ String ] =
rdd . map ( _ . _2 ). collect ()
println ( "---------------------------" )
println ( array . mkString ( "|" ))
//4. 关闭 sc 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
86 运行输出结果 :
RDD_ 算子概述
定义: 分布式集合 RDD 对象的方法被称为算子
算子分类:
Transformation 转换算子
1
Action 行动算子
2
sc . stop ()
}
}
22
23
24
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file1.txt,hello Linux
hello Zookeper
hello Maven
hello hive
hello spark)
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file2.txt,Spark Core
Spark RDD
Spark Sql)
----------------
hello Linux
hello Zookeper
hello Maven
hello hive
hello spark|Spark Core
Spark RDD
Spark Sql

41.RDD_算子概述

42.RDD_转换算子map

43.RDD_转换算子flatmap

44.RDD_转换算子reducebykey

45.RDD_转换算子filter

46.RDD_转换算子distinct

47.RDD_转换算子glom

48.RDD_转换算子groupby

object RddGroupBy {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "groupBy" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 创建 Rdd
val rdd : RDD [( Char , Int )] =
sc . parallelize ( Array (( 'a' , 1 ), ( 'a' , 2 ),
( 'b' , 1 ), ( 'b' , 2 ), ( 'a' , 3 ), ( 'a' , 4 )))
//6. 通过 groupBy 算子对 rdd 对象中的数据进行分组
//groupBy 插入的函数的用意是指定按照谁进行分组
// 分组后的结果是有二元组组成的 RDD
val gbRdd : RDD [( Char , Iterable [( Char ,
Int )])] = rdd . groupBy ( tupEle => tupEle . _1 )
// 收集到 Driver
val result1 : Array [( Char ,
Iterable [( Char , Int )])] = gbRdd . collect ()
//(a,CompactBuffer((a,1), (a,2), (a,3),
(a,4))),(b,CompactBuffer((b,1), (b,2)))
println ( result1 . mkString ( "," ))
//7. 使用 map 转换算子
//(a,List((a,1), (a,2), (a,3), (a,4))),
(b,List((b,1), (b,2)))
val result2 : Array [( Char , List [( Char ,
Int )])] = gbRdd . map ( tup => ( tup . _1 ,
tup . _2 . toList )). collect ()
println ( result2 . mkString ( "," ))
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
104 实时效果反馈
1. 以下关于
rdd.groupBy(tupEle => tupEle._1)
的描述错误的是:
A
groupBy 传入的函数的意思是 : 通过这个函数 , 确定按照谁来
分组。
B
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
数只能为 2
C
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
>=2
答案:
1=>B

49.RDD_转换算子groupbyKey

50.RDD_转换算子sortby

51.RDD_转换算子sortbyKey

52.RDD_转换算子union并集

53.RDD_转换算子交集和差集

54.RDD_转换算子关联算子

55.RDD_转换算子partitionBy

56.RDD_转换算子mapPatitions

57.RDD_转换算子sample

58.RDD_行动算子foreachPartition

59.RDD_行动算子foreach

60.RDD_行动算子saveAsTestFile

61.RDD_行动算子countByKey

62.RDD_行动算子reduce

63.RDD_行动算子fold

64.RDD_行动算子first_take_count

65.RDD_行动算子top_takeOrderd

66.RDD_行动算子takeSample

二.内核进阶

67.内核进阶_DAG概述

68.内核进阶_血缘关系

69.内核进阶_宽窄依赖关系

70.内核进阶_stage划分

71.内核进阶_任务调度概述

72.内核进阶_管道计算模式上

73.内核进阶_管道计算模式下

74.内核进阶_cache缓存

75.内核进阶_checkpoint检查点

76.内核进阶_cache和checkpoint区别

77.内核进阶_并行度

78.内核进阶_广播变量

79.内核进阶_累加器一

80.内核进阶_累加器二

81.内核进阶_累加器之重复计算

82.内核进阶_项目实战PVUV需求分析

83.内核进阶_项目实战PV分析

84.内核进阶_项目实战UV分析

85.内核进阶_二次排序实战

86.内核进阶_分组取topN实战

87.内核进阶_卡口统计项目需求分析

88.内核进阶_卡口统计项目统计正常的卡口

89.内核进阶_卡口统计项目TOP5

90.内核进阶_卡口统计项目统计不同区域同时出现的车辆

91.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹一

92.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹二

93.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹三

94.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹四


http://www.ppmy.cn/news/1570051.html

相关文章

蓝桥杯刷题 DAY4:小根堆 区间合并+二分

import os import sys import heapq# 请在此输入您的代码if __name__"__main__":x,n map(int,input().split())l[]a[0]*nb[0]*nc[0]*nq[]for i in range(n):l.append(list( map( int ,input().split()) ))l.sort(keylambda pair:-pair[1])total0j0for i in range(x,0…

使用Ollama 在Ubuntu运行deepseek大模型:以DeepSeek-coder为例

DeepSeek大模型这几天冲上热搜啦&#xff01; 咱们来亲身感受下DeepSeek模型的魅力吧&#xff01; 整个操作流程非常简单方便&#xff0c;只需要2步&#xff0c;先安装Ollama&#xff0c;然后执行大模型即可。 安装Ollama 在Ubuntu下安装Ollama非常简单&#xff0c;直接sna…

GB/T 43698-2024 《网络安全技术 软件供应链安全要求》标准解读

一、43698-2024标准图解 https://mmbiz.qpic.cn/sz_mmbiz_png/rwcfRwCticvgeBPR8TWIPywUP8nGp4IMFwwrxAHMZ9Enfp3wibNxnfichT5zs7rh2FxTZWMxz0je9TZSqQ0lNZ7lQ/640?wx_fmtpng&fromappmsg 标准在线预览&#xff1a; 国家标准|GB/T 43698-2024 相关标准&#xff1a; &a…

基于单片机的智能感控杆设计(论文+源码)

2.1功能设计 本次以智能感控杆设计为题&#xff0c;智能感控杆是一种可以应用在多种场合的设备&#xff0c;可以极大的节约人类的精力和时间。在此将其主要功能设计如下&#xff1a; 1.LCD1602液晶显示当前感控杆状态开启/关闭&#xff0c;显示当前模式手动/自动&#xff1b…

K8S学习笔记-------2.极简易懂的入门示例

K8S学习笔记-------2.极简易懂的入门示例 1. 准备应用代码1.1 确保 Node.js 和 npm 已安装1.2. 创建项目目录并初始化项目1.3. 安装 Express1.4 验证安装 2.容器化应用2.1 准备 Dockerfile2.2 构建镜像 3.编写K8s配置文件创建 Deployment创建 Service 4.部署到K8s集群应用Deplo…

技术架构师成长路线(2025版)

目录 通用知识 计算机原理&#xff08;1 - 2 个月&#xff09; 数据结构&#xff08;2 - 3 个月&#xff09; 网络编程&#xff08;1 - 2 个月&#xff09; 软件工程&#xff08;1 个月&#xff09; 基础知识 Java 编程语言基础&#xff08;2 - 3 个月&#xff09; JVM&…

【深度学习框架】MXNet(Apache MXNet)

MXNet&#xff08;Apache MXNet&#xff09;是一个 高性能、可扩展 的 开源深度学习框架&#xff0c;支持 多种编程语言&#xff08;如 Python、R、Scala、C 和 Julia&#xff09;&#xff0c;并能在 CPU、GPU 以及分布式集群 上高效运行。MXNet 是亚马逊 AWS 官方支持的深度学…

MongoDB深度解析与实践案例

MongoDB深度解析与实践案例 在当今大数据盛行的时代,NoSQL数据库以其灵活的数据模型和水平扩展能力,成为了众多应用场景下的首选。MongoDB,作为NoSQL数据库的领军者之一,凭借其面向文档的存储方式、强大的查询功能以及丰富的生态系统,在众多领域大放异彩。本文将从MongoD…