Spark算子使用-Map,FlatMap,Filter,diatinct,groupBy,sortBy

news/2024/10/15 15:53:09/

目录

Map算子使用

FlatMap算子使用

Filter算子使用-数据过滤

Distinct算子使用-数据去重

groupBy算子使用-数据分组

sortBy算子使用-数据排序


Map算子使用

# map算子主要使用长场景,一个转化rdd中每个元素的数据类型,拼接rdd中的元素数据,对rdd中的元素进行需求处理
# 需求,处理hdfs中的学生数据,单独获取每个学生的信息
from pyspark import SparkContextsc = SparkContext()# 1-读取数据
rdd = sc.textFile("hdfs://node1:8020/data/student.txt")
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x : x.split(','))
# 3-从rdd2中获取姓名数据
rdd3 =rdd2.map(lambda x : x[1])# lambda 函数能进行简单的数据计算,如果遇到复杂数据计算时,就需要使用自定义函数
# 获取年龄数据,并且转化年龄数据为int类型,将年龄和性别合并一起保存成元组
## 获取年龄
def func(x):# 1-切割数据data_split = x.split(',')# 2-转换数据类型age = int(data_split[3])# 3-拼接性别与年龄data_tuple = (data_split[2],age)return data_tuple# 将函数的名字传递到map中,不要加括号
rdd4 = rdd.map(func)# 触发执行算子,查看读取的数据
res = rdd.collect()
print(res)res2 = rdd2.collect()
print(res2)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)

FlatMap算子使用

# FlatMap算子使用
# 主要场景是对二维嵌套的数据降维操作  [[1,张三],[2,李四],[3,王五]]  --->> [1,张三,2,李四,3,王五]
from pyspark import SparkContextsc = SparkContext()# 生成的rdd
rdd = sc.parallelize([['1', 'alice', 'F', '32'], ['2', 'Tom', 'M', '22'], ['3', 'lili', 'F', '18'], ['4', 'jerry', 'M', '24']])# 使用flatmap
rdd1 = rdd.flatMap(lambda x: x)  # 直接返回x,会自动将x中的元素数据取出,放入新的rdd中# 查看数据
res = rdd1.collect()
print(res)

Filter算子使用-数据过滤

# RDD数据过滤
# 需求:过滤年龄大于20岁的信息
from pyspark import  SparkContext
sc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/student.txt')# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))
# 使用fliter方法进行数据过滤
# lambda x:过滤条件  可以当成 if 操作  if 条件
# 符合条件的数据会返回保存在新的rdd中
rdd3 = rdd2.filter(lambda x :int(x[3]) > 20)# 查看数据
res = rdd2.collect()
print(res)res3 = rdd3.collect()
print(res3)

Distinct算子使用-数据去重

# distinct  去重算子
# rdd中有重复数据时,可以进行去重
from pyspark import  SparkContext
sc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/student.txt')# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))# 3-从rdd2中获取性别数据
rdd3 = rdd2.map(lambda x : x[2])# 对rdd3中重复数据去重
rdd4 = rdd3.distinct()# 查看数据
res = rdd3.collect()
print(res)res1 = rdd4.collect()
print(res1)

groupBy算子使用-数据分组

from pyspark import  SparkContext
sc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/student.txt')# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))# 3-对性别进行分组
# lambda x: hash取余的计算  hash(数据)%分组数      余数相同的数据会放在一起
rdd3 = rdd.groupBy(lambda x:hash(x[2]) % 2)
# 查看分组的数据内容  mapValues 取出分组后的数据值,对数据值转为列表即可
rdd4 = rdd3.mapValues(lambda x:list(x))# 查看数据
res2 = rdd2.collect()
print(res2)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)

分组算子用到了哈希算法,lambda x: hash取余的计算  hash(数据)%分组数      余数相同的数据会放在一起
rdd3 = rdd.groupBy(lambda x:hash(x[2]) % 2)

sortBy算子使用-数据排序

# RDD的数据排序
from pyspark import SparkContextsc = SparkContext()# 创建数据
# 非kv数据
rdd = sc.parallelize([10,45,27,18,5,29])# 在spark中可以使用元组表示kv数据(k,v)
rdd2 = sc.parallelize([('张三',27),('李四',18),('王五',31),('赵六',21)])rdd1 = sc.parallelize([(666,'火眼金睛'),(2000,'筋斗云'),(888,'顺风耳'),(1314,'降龙十八掌')])# 数据排序
# 非kv数据
rdd3 = rdd.sortBy(lambda x: x)  # 默认升序,从小到大排
rdd4 = rdd.sortBy(lambda x: x,ascending=False)  # 降序# kv数据排序 x接收(k,v)数据  需要指定采用哪个值进行排序
# 根据v值进行排序
rdd5 = rdd2.sortBy(lambda x: x[1])
rdd6 = rdd2.sortBy(lambda x: x[1],ascending=False)# 根据k值进行排序
rdd7 = rdd1.sortBy(lambda x: x[0])
rdd8 = rdd1.sortBy(lambda x: x[0],ascending=False)# 查看结果
# 非kv数据
res1 = rdd3.collect()
res2 = rdd4.collect()
print(res1)
print(res2)# kv数据排序
res5 = rdd5.collect()
res6 = rdd6.collect()
print(res5)
print(res6)res7 = rdd7.collect()
res8 = rdd8.collect()
print(res7)
print(res8)

join算子使用-数据关联

准备数据,模拟表关联

students.txt

students2.txt

from pyspark import SparkContext
# rdd也是使用join算子进行kv数据关联 ,如果需要将多个rdd数据关联在一起
# 需要现将rdd的数据转为kv结构,关联的字段数据作为key
sc = SparkContext()
# 分别读取两个文件数据
rdd1 = sc.textFile('hdfs://node1:8020/data/students.txt')
rdd2 = sc.textFile('hdfs://node1:8020/data/students2.txt')# 切割行数
rdd_line1 = rdd1.map(lambda x:x.split(','))
rdd_line2 = rdd2.map(lambda x:x.split(','))# 将rdd数据进行关联
# 将关联的数据转为kv结构
rdd_kv1 = rdd_line1.map(lambda x:(x[0],x))
rdd_kv2 = rdd_line2.map(lambda x:(x[0],x))# 使用join关联
rdd_join = rdd_kv1.join(rdd_kv2) # 内关联
rdd_leftjoin = rdd_kv1.leftOuterJoin(rdd_kv2) # 左关联
rdd_rightjoin = rdd_kv1.rightOuterJoin(rdd_kv2) # 右关联# 查看数据res3 = rdd_join.sortBy(lambda x:x[0]).collect() # 找相同数据
print(res3)res4 = rdd_leftjoin.collect() # 左表数据全部展示,右边右相同数据展示,没有相同数据为空None
print(res4)res5 = rdd_rightjoin.collect() # 右表数据全部展示,左边右相同数据展示,没有相同数据为空None
print(res5)

 join内关联:只有共同的才展示

leftOuterJoin左关联:左表数据全部展示,右边右相同数据展示,没有相同数据为空None

rightOuterJoin右关联:右表数据全部展示,左边右相同数据展示,没有相同数据为空None


http://www.ppmy.cn/news/1539491.html

相关文章

分治算法(7)_归并排序_计算右侧小于当前元素的个数

个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 分治算法(7)_归并排序_计算右侧小于当前元素的个数 收录于专栏【经典算法练习】 本专栏旨在分享学习算法的一点学习笔记,欢迎大家在评论区交流讨论&…

全面掌握 Linux 服务管理:从入门到精通

全面掌握 Linux 服务管理:从入门到精通 引言 在 Linux 系统中,服务管理是系统管理员和开发者的基本技能之一。无论是启动、停止、重启还是查看服务状态,systemctl 命令都能让你轻松完成这些操作。今天,我们将深入探讨如何使用 sy…

Linux操作系统——外存的管理(实验报告)

实验 Linux系统外存管理 一、实验目的 熟练Linux系统外存管理的方法与命令。 二、实验环境 硬件:PC电脑一台,网络正常。 配置:win10系统,内存大于8G 硬盘500G及以上。 软件:VMware、Ubuntu16.04。 三、实验内容 …

Python和CUDA(C++)量子退火和伊辛二次算法模型

🎯要点 简化量子退火或离散优化算法处理,使用张量网络模拟和动态系统方法及神经网络逼近。实现并行退火算法和CUDA支持下穷举搜索法。使用大都会算法模拟二维自旋玻璃伊辛模型并测量磁化率、比热容和能量。对比其他组合优化解方法,使用英伟达…

上百种【基于YOLOv8/v10/v11的目标检测系统】目录(python+pyside6界面+系统源码+可训练的数据集+也完成的训练模型)

待更新(持续更新),早关注,不迷路............................................................................... 目标检测系统操作说明【用户使用指南】(pythonpyside6界面系统源码可训练的数据集也完成的训练模型&#xff…

玩机搞机基本常识-----如何在 Android 中实现默认开启某个功能 修改方法列举

我们有时候需要对安卓系统进行修改。实现其中的某些功能。让用户使用得心应手。节约时间。那么如果要实现系统中的有些功能选项开启或者关闭。就需要对系统有一定的了解。那么在 Android 中实现默认开启某个功能可以通过以下几种方式: 一、在应用的设置中添加选项 …

【mysql】统计两个相邻任务/事件的间隔时间以及每个任务的平均用时

准备步骤1. 设置查询参数部分1.1 设置需要分析的起始时间1.2. 设置需要分析的时间的长度(分析的结束时间)1.3. 设置分析内容1.4. 设置需要分析的表和字段 2. 自动计算分析2.1 设置起始序号2.2. 筛选user_log表数据并生成带序号的临时表temp_ria2.3. 通过…

Elasticsearch Suggester

概述 Elasticsearch里设计了4 种类别的 Suggester Term Suggester:词条建议器。对给输入的文本进进行分词,为每个分词提供词项建议。Phrase Suggester:短语建议器,在term的基础上,会考量多个term之间的关系Completio…