spark的学习-05

news/2024/11/16 2:22:24/

SparkSql

结构化数据与非结构化数据

结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据

结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc

结构化的表:数据库中表的数据:MySQL、Oracle、Hive

我们在sparkcore中导入数据使用的是textFile,而在sparksql中怎么导入数据呢

使用的是DataFrame进行数据的导入

将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema

一个经典的wordcount案例:

代码如下:(里面有sql和dsl两种写法)

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()spark.stop()

以上的代码还可以使用with进行优化

补充:

with的作用: 我们在创建对象的时候,经常需要关闭(close、stop) 如果忘记关闭,太多对象的话就会影响性能,使用with自动帮我们关闭

什么时候可以使用with呢

源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化

优化过后的代码: (此时就不需要在手动stop关闭了)

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()

一个案例:

需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。

  • 电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】

数据如下:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
  • 电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

首先,给定的数据不是我们所经常使用的格式化数据,所以需要先将数据进行格式化

可以使用RDD的算子将数据改为我们想要的格式化数据

也可以直接利用sql,将非格式化的数据修改为我们需要的格式的数据

写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例

使用RDD+SparkSQL

代码如下:

import os
import refrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("MovieTop10").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:print(spark)rating_df = spark.sparkContext.textFile("../../datas/movie/ratings.dat").map(lambda line:re.split("::",line)) \.filter(lambda item:len(item) == 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF(["user_id","movie_id","score","score_time"]).createOrReplaceTempView("rating")# spark.sql("""#     select * from rating# """).show()movie_df = spark.sparkContext.textFile("../../datas/movie/movies.dat") \.map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2])) \.toDF(["movie_id", "movie_name", "movie_categry"]).createOrReplaceTempView("movie")# spark.sql("""#     select * from movie# """).show(truncate=False)#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数spark.sql("""select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000 order by avgRate desc limit 10""").show(truncate=False)# 保留两位小数后,结果可能有重复的,想要获取重复排名也只算一位的可以使用排名函数,dense_rank()spark.sql("""with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming <= 10""").show()
复习 排名函数:
1、row_number()

row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列

效果如下:
98                1
97                2
97                3
96                4
95                5
95                6没有并列名次情况,顺序递增
2、rank()

生成数据项在分组中的排名,排名相等会在名次中留下空位

效果如下:
98                1
97                2
97                2
96                4
95                5
95                5
94                7
有并列名次情况,顺序跳跃递增
3、dense_rank()

生成数据项在分组中的排名,排名相等会在名次中不会留下空位

效果如下:
98                1
97                2
97                2
96                3
95                4
95                4
94                5
有并列名次情况,顺序递增
只使用 SparkSQL:

以上是RDD + sparkSQL的写法, 还可以通过 sparkSQL的写法硬写出来

通过split()方法,根据非格式化数据的分隔符,将数据切成我们需要的DataFrame类型的数据

df1 = spark.read.text("../../datas/movie/movies.dat").createOrReplaceTempView("movie1")
df2 = spark.read.text("../../datas/movie/ratings.dat").createOrReplaceTempView("rating1")#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000 order by avgRote desc limit 10
""").show(truncate=False)# 同样也可以写成排名函数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming <= 10
""").show(truncate=False)

http://www.ppmy.cn/news/1547325.html

相关文章

C语言实现3D动态爱心图形的绘制与动画效果

**标题&#xff1a;C语言实现3D动态爱心图形的绘制与动画效果** --- ### 一、引言 在计算机图形学中&#xff0c;三维图形的绘制和动画处理是一个重要且有趣的研究方向。通过数学公式描述的几何体可以在计算机屏幕上展示出丰富多彩的动态效果&#xff0c;其中“爱心”图形作…

ssm103宠物领养系统+vue(论文+源码)_kaic

毕业设计&#xff08;论文&#xff09; 宠物领养系统的设计与实现 学生姓名&#xff1a; 二级学院&#xff1a; 班级名称&#xff1a; 指导教师&#xff1a; 年 月 日 录 摘 …

STM32CubeMX学习笔记33---芯片因未选serial debug被锁住

利用STM32CubeMX配置了一个工程&#xff0c;下载到芯片后&#xff0c;芯片能够正常工作&#xff0c;但是在第二次通过SWD却怎么都连接不上芯片&#xff0c;偶尔按住复位键能够连接上芯片&#xff0c;但是无法读取也无法擦除flash。找了一些资料后才知道是由于STM32CubeMX没有配…

二:基于ABNF语义定义的HTTP消息格式

引言 超文本传输协议(HTTP,Hypertext Transfer Protocol)是网络通信中应用最广泛的协议之一。随着互联网技术的进步,HTTP协议逐步发展和规范化,形成了众多版本,如HTTP/1.0、HTTP/1.1、HTTP/2,以及最新的HTTP/3。HTTP协议规范的一个关键部分是对消息格式的定义,而其中广…

若依前后端分离版部署(超详细)

一、简介 有些特殊情况需要部署到子路径下,例如:https://www.jzjtest.cn/admin-hb,可以按照下面流程修改。 二、实现步骤 2.1 后端部署 自定义后端端口 # 开发环境配置 server:# 服务器的HTTP端口,默认为8080port: 10081通过maven:package一键打包成jar 将jar上传到服务器…

VBA即用型代码手册:设置PDF中标题行Set Header Row in Output PDF

我给VBA下的定义&#xff1a;VBA是个人小型自动化处理的有效工具。可以大大提高自己的劳动效率&#xff0c;而且可以提高数据的准确性。我这里专注VBA,将我多年的经验汇集在VBA系列九套教程中。 作为我的学员要利用我的积木编程思想&#xff0c;积木编程最重要的是积木如何搭建…

python包管理工具pip和conda的使用对比

python包管理工具pip和conda的使用对比 总述1. pip使用2. conda注意虚拟环境之间的嵌套&#xff0c;这个会导致安装包后看不到包&#xff0c;实际是安装到了base环境里 未完待续 总述 pip相对于conda,对应包的依赖关系管理不强&#xff0c;坏处是容易造成包冲突&#xff0c;好…

2024算法基础公选课练习三(DFS1)(2)

一、前言 dfs是初学者的重点&#xff0c;也是难点&#xff0c;这次的有些题目也不好写。&#xff08;2&#xff09;的中难题很多 二、题目总览 三、具体题目 3.1 问题 H: 卡片 思路 跟卡片&#xff08;2&#xff09;相似&#xff0c;不过这次dfs的时候用string拼接放入set中…