Python学习从0到1 day27 第三阶段 Spark ③ 数据计算 Ⅱ

server/2024/11/14 13:38:29/

目录

一、Filter方法

功能

语法

代码

总结

filter算子

二、distinct方法

功能

语法

代码

总结

distinct算子

三、SortBy方法

功能

语法

代码 

总结

sortBy算子

四、数据计算练习

需求:

解答

总结

去重函数:

过滤函数:

转换函数:

排序函数:


于是我驻足,享受无法复刻的一些瞬间

                                                        —— 24.11.9

一、Filter方法

功能

过滤想要的数据进行保留

语法

基于filter中我们传入的函数,决定rdd对象中哪个保留哪个丢弃

代码

from pyspark import SparkConf,SparkContext# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 对RDD的数据进行过滤,保留奇数,去除偶数# 方法1:
def Retain(data):if data % 2 == 1:return Trueelse:return False# 对RDD数据进行过滤,留下奇数
rdd1 = rdd.filter(Retain)
print(rdd1.collect())# 方法2:
rdd2 = rdd.filter(lambda num:num % 2 == 1)
print(rdd2.collect())


总结

filter算子

接受一个处理函数,可用lambda匿名函数快速编写

函数对RDD数据逐个处理,得到True的保留到返回值的RDD中


二、distinct方法

功能

对RDD数据进行去重,返回新RDD

语法

rdd.distinct()    # 无需传参

代码

from pyspark import SparkConf,SparkContext# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1,3,3,4,4,4,7,8,9,9])
rdd = rdd.distinct()
print(rdd.collect())


总结

distinct算子

完成对Rdd内数据的去重操作


三、SortBy方法

功能

对RDD数据进行排序,基于指定的排序依据

语法

rdd.sortBy()

rdd.sortBy(func, ascending = False, numPartitions = 1)
# func:(T) - > U: 告知按照rdd中的哪个数据进行排序,比如 lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending: True升序 False 降序
# numPartitions: 用多少分区排序

代码 

from pyspark import SparkConf,SparkContext# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 读取数据文件
rdd = sc.textFile("D:/2LFE\Desktop\WordCount.txt")
# 取出全部单词
word_rdd = rdd.flatMap(lambda x:x.split(" "))
print(word_rdd.collect())# 将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word:(word,1))
# 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 对结果进行排序
result_rdd = result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
# 打印并输出结果
print(result_rdd.collect())


总结

sortBy算子

接收一个处理函数,可用lambda快速编写

函数表示用来决定排序的依据

可以控制升序或降序

全局排序需要设置分区数为1


四、数据计算练习

需求:

复制以上内容到文件中,使用Spark读取文件进行计算:

① 各个城市销售额排名,从大到小

② 全部城市,有哪些商品类别在售卖

③ 北京市有哪些商品类别在售卖

解答

from pyspark import SparkConf,SparkContext
import json# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 读取文件得到RDD
file_rdd = sc.textFile("E:\python.learning\pyspark\sortBy.txt")# 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x:x.split("|"))# 将一个JSON字符串转换为字典 json模块
dict_rdd = json_str_rdd.map(lambda x:json.loads(x))# 取出城市和销售额数据:(城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))# 按销售额对结果进行聚合然后根据销售额降序排序
city_result_rdd = city_with_money_rdd.reduceByKey(lambda x,y:x+y)
res1 = city_result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
print("需求1结果:" , res1.collect())# 需求2 对全部商品进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2结果:",category_rdd.collect())# 需求3 过滤北京市的数据
BJ_data_rdd = dict_rdd.filter(lambda x:x['areaName'] == '北京')
print("需求3结果:",BJ_data_rdd.collect())# 需求4 对北京市的商品类别进行商品类别去重
res2 = BJ_data_rdd.map(lambda x:x['category']).distinct()
print("需求4结果:",res2.collect())


总结

去重函数:

在 PySpark 框架下,distinct函数用于返回一个新的 RDD,其中包含原始 RDD 中的不同元素。

过滤函数:

filter函数用于从弹性分布式数据集(RDD)中筛选出满足特定条件的元素,返回一个新的 RDD 只包含满足条件的元素。

转换函数:

在 PySpark 中,map函数是对弹性分布式数据集(RDD)进行转换操作的一种重要方法。map函数对 RDD 中的每个元素应用一个函数,返回一个新的 RDD,其中包含应用函数后的结果。

排序函数:

sortBy 函数用于对RDD 中的元素进行排序,它接受一个函数或者一个字段名作为参数,根据这个参数来确定排序的依据。


http://www.ppmy.cn/server/140842.html

相关文章

10. java基础知识(下)

文章目录 一、一带而过二、字符串类型String1. 简单了解2. 关于结束符\03. 自动类型转换与强制类型转换 三、API文档与import导包1. API文档2. import导包 四、java中的数组1. 创建2. 遍历3. 补充4. Arrays类① 简单介绍② 练习 五、方法的重载六、规范约束七、内容出处 一、一…

订单分库分表

一、引言 在当今互联网时代,随着电商、金融等行业的快速发展,订单数量呈爆炸式增长。传统的单一数据库存储订单信息的方式面临着巨大的挑战,如数据存储容量有限、查询性能下降、数据备份和恢复困难等。为了解决这些问题,分库分表技…

讨论一个mysql事务问题

最近在阅读一篇关于隔离级别的文章,文章中提到了一种场景,我们下面来分析一下。 文章目录 1、实验环境2、两个实验的语句执行顺序3、关于start transaction和start transaction with consistent snapshot4、实验结果解释4.1、实验14.2、实验24.3、调整实…

torch.full函数介绍

torch.full 是 PyTorch 中用于创建一个具有指定形状、填充值和数据类型的张量的函数。它非常适用于需要初始化特定数值的张量的情况,比如将所有元素填充为一个常量值。 函数定义 torch.full(size, fill_value, *, dtypeNone, layouttorch.strided, deviceNone, re…

多平台编包动态引入依赖的解决方案

最近开发时遇到了这样的需求,A 平台需要引入一个 video.js,B 平台却是不需要的,那么面向 B 平台打包的时候把依赖装进去自然就不大合适。最好的方法是动态引入依赖,根据平台来判断要不要引入 动态引入依赖 很快啊,动…

剑指offer第九天

1.数组中只出现一次的两个数 #include <vector> class Solution { public:vector<int> FindNumsAppearOnce(vector<int>& nums) {// write code hereint v 0;for(int&x:nums)v^x;int cnt 0;while(v){v>>1;cnt;}int a 0,b 0;for(int&x…

UE5.1 控制台设置帧率

仅个人记录&#xff0c;未经过严格验证。 也可通过控制台命令蓝图节点&#xff0c;在运行时执行 锁帧&#xff1a; 0->120帧 1-》60帧

【报错解决】使用@SpringJunitConfig时报空指针异常

报错描述 具体的报错如下图所示&#xff1a; 我的测试代码如下&#xff1a; import config.StudentConfig; import controller.StudentController; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test…