(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API

news/2024/11/2 0:49:03/

目录

一、PySpark

二、数据介绍

三、PySpark大数据开发实战

1、数据文件上传HDFS

2、导入模块及数据

3、数据统计与分析

①、计算演员参演电影数

②、依次罗列电影番位前十的演员

③、按照番位计算演员参演电影数

④、求每位演员所有参演电影中的最早、最晚上映时间及其相隔天数、年数

⑤、求每位演员所有电影中的评分最高值、最低值、电影数量、评分均值、标准差、方差、最高最低评分之差值

⑥、求参演大于等于10部电影的每位演员的平均评分,计算规则:去掉一个最高分和一个最低分,然后再计算电影平均分

⑦、求投票数在所有电影中排前80%、评分在所有电影中排前20%的电影信息

⑧、求美国、中国(含港澳台)、英国、法国、俄罗斯5个国家各个电影类型的上映电影数目

⑨、求各个地区评分最高的电影,若并列第一则以“|”拼接

⑩、统计从数据中最早年份到最晚年份的每月上映电影数量,若某个月份无电影上映则数量为0

4、下载统计结果

四、总结


一、PySpark

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。

PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

本文软件环境如下:

操作系统:CentOS Linux 7

Hadoop版本:3.1.3,安装教程可见我另一篇博客内容:Linux CentOS安装Hadoop3.1.3(单机版)详细教程

Spark版本:3.5.2,安装教程可见我另一篇博客内容:Linux CentOS安装PySpark3.5(单机版)详细教程及机器学习实战

Python版本:Python(Anaconda)3.11.4

PySpark基础学习可看 PySpark系列文章:

(一)PySpark3:安装教程及RDD编程

(二)PySpark3:SparkSQL编程

(三)PySpark3:SparkSQL40题

(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测


二、数据介绍

本文数据来自采集豆瓣网分类排行榜 (“https://movie.douban.com/chart”)中各分类类别所有电影的相关信息并存储为csv文件。

爬虫代码在我另一篇博客:豆瓣电影信息爬取与可视化分析

数据放在了百度云上:https://pan.baidu.com/s/1YWB2iEOsMmXHkEUFpY2_TA?pwd=ej3z

数据如下图所示,包含电影名、上映日期、上映地区、类型、豆瓣链接、参演演员、演员数、评分、打分人数,共有3357部电影:

三、PySpark大数据开发实战

1、数据文件上传HDFS

首先通过xftp上传linux服务器,然后通过以下命令上传至HDFS:

hdfs dfs -mkdir /data
hdfs dfs -mkdir /output
hdfs dfs -put film_info.csv /data

2、导入模块及数据

使用SparkSession.builder.config(conf=SparkConf()).getOrCreate()创建Spark会话。使用spark.read.csv()读取CSV文件,并设置header=True以识别首行为列名,inferSchema=True自动推断数据类型。

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window# 主程序:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()df = spark.read.csv("/data/film_info.csv", header=True, inferSchema=True)

3、数据统计与分析

①、计算演员参演电影数

以下代码中使用了spark sql进行统计,也可以通过DataFrame API进行统计:

df.groupBy("actor").agg(count("title").alias("act_film_num"))
# 按分隔符切分列表
df_split = df.withColumn("actors", F.split(df["actors"], "\|")) \.withColumn("types", F.split(df["types"], "\|")) \.withColumn("regions", F.split(df["regions"], "\|"))# 演员:拆分多行
df_exploded = df_split.withColumn("actor", F.explode(F.col("actors")))
df_exploded.drop(*["actors", "regions", "types"]).createOrReplaceTempView("actor_exploded")df1 = spark.sql('''select actor,count(*) as act_film_numfrom actor_exploded group by actor''')
df1.sort(df1["act_film_num"].desc()).repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result1.csv")

结果如下:

+-------------+------------+
|        actor|act_film_num|
+-------------+------------+
|       童自荣|          43|
|     户田惠子|          37|
|         林雪|          33|
|       张国荣|          32|
|       刘德华|          31|
|       周星驰|          31|
|         成龙|          31|
|       任达华|          31|
|         刘洵|          30|
|塞缪尔·杰克逊|          29|
|  汤姆·汉克斯|          29|
|       梁家辉|          28|
|       吴孟达|          28|
|       梁朝伟|          27|
|      斯坦·李|          27|
|       吴君如|          27|
|    威廉·达福|          27|
|       黄秋生|          27|
|       胡立成|          27|
|  布拉德·皮特|          26|
+-------------+------------+
only showing top 20 rows
②、依次罗列电影番位前十的演员

这一题考察了窗口函数、行转列等等。

# 定义窗口函数,按电影标题和演员顺序排序
windowSpec = Window.partitionBy("title").orderBy("actors")
# 添加序号列
df2 = df_exploded.withColumn("rank", F.row_number().over(windowSpec))
# 过滤出前10个演员
rank_num = 10
rank_num_list = [str(i + 1) for i in range(rank_num)]
# 将演员重新组合成单行
df2_tmp1 = df2.groupBy("title").pivot("rank", rank_num_list).agg(F.collect_list("actor"))
df2_tmp2 = df2_tmp1.select("title", *[F.col(f"{i + 1}")[0].alias(f"actor{i + 1}") for i in range(rank_num)])
df2_tmp2.repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result2.csv")

结果如下:

+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                   title|             actor1|               actor2|            actor3|         actor4|           actor5|                actor6|             actor7|               actor8|           actor9|          actor10|
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                 101忠狗|          罗德·泰勒|            凯特·鲍尔|           本·怀特|    丽莎·戴维斯| 贝蒂·洛乌·格尔森|         J·帕特·奥马利|      玛莎·温特沃思|      大卫·弗兰克海姆|弗莱德里克·沃洛克|        汤姆·康威|
|                   11:14|        亨利·托马斯|          布莱克·赫伦|       芭芭拉·赫希|  克拉克·格雷格|    希拉里·斯万克|           肖恩·海托西|      斯塔克·桑德斯|          科林·汉克斯|        本·福斯特|  帕特里克·斯威兹|
|                    2012|        约翰·库萨克|          阿曼达·皮特|     切瓦特·埃加福|      坦迪·牛顿|    奥利弗·普莱特|           汤姆·麦卡锡|        伍迪·哈里森|          丹尼·格洛弗|      连姆·詹姆斯|        摩根·莉莉|
|                    2046|             梁朝伟|               章子怡|              王菲|       木村拓哉|             巩俐|                刘嘉玲|               张震|               张曼玉|             董洁|      通猜·麦金泰|
|                    21克|            西恩·潘|          娜奥米·沃茨|本尼西奥·德尔·托罗|  夏洛特·甘斯布|      梅丽莎·里奥|         迈克尔·芬内尔|     

http://www.ppmy.cn/news/1543733.html

相关文章

(9)位运算

1. 位运算的概念 位运算操作的是整数在内存中的二进制位。C 语言提供了以下几种位运算操作符: 按位与(&) 运算规则:将两个操作数对应的二进制位进行与运算。只有当两个对应位都为 1 时,结果位才为 1&#xff0c…

UE 引入 IOS framework库的坑

一、我明明已经把framework库进行签名的却在 上传到开发者后台时一直报错 90034 签章遗失 或者 未签 这个问题我最近遇到 极其坑爹 我是这个情况 这是我的framework库的目录 关键就在这了 多出了这个文件 就影响了 上传到开发者后台 就报错 90034 将其删除就好 &…

Gradio DataFrame分页功能详解:从入门到实战

Gradio DataFrame分页功能详解:从入门到实战 1. 引言2. 为什么需要分页?3. 环境准备4. 基础知识准备5. 代码实现5.1 创建示例数据5.2 分页状态管理5.3 分页核心逻辑5.4 创建Gradio界面 6. 关键功能解析6.1 页码计算6.2 数据切片 7. 使用示例8. 实用技巧9…

LeetCode994. 腐烂的橘子(2024秋季每日一题 54)

在给定的 m x n 网格 grid 中,每个单元格可以有以下三个值之一: 值 0 代表空单元格;值 1 代表新鲜橘子;值 2 代表腐烂的橘子。 每分钟,腐烂的橘子 周围 4 个方向上相邻 的新鲜橘子都会腐烂。 返回 直到单元格中没有…

系统安全架构的深度解析与实践:Java代码实现

引言 系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。本文将详细介绍系统安全架构的概念,并从前后分层、业务切割、…

Flink CDC系列之:学习理解核心概念——Data Pipeline

Flink CDC系列之:学习理解核心概念——Data Pipeline 数据管道sourcesink管道配置Table IDroutetransform案例 数据管道 由于 Flink CDC 中的事件以管道方式从上游流向下游,因此整个 ETL 任务被称为数据管道。 管道对应于 Flink 中的一系列操作。 要描…

【STM32+HAL】STM32CubeMX学习目录

一、基础配置篇 【STM32HAL】微秒级延时函数汇总-CSDN博客 【STM32HAL】CUBEMX初始化配置 【STM32HAL】定时器功能小记-CSDN博客 【STM32HAL】PWM呼吸灯实现 【STM32HAL】DACDMA输出波形实现-CSDN博客 【STM32HAL】ADCDMA采集(单通道多通道)-CSDN博客 【STM32HAL】三重A…

TS 项目中给常用的路径定义一个别名 tsconfig.json

TS 项目中给常用的路径定义一个别名 tsconfig.json 在 TS 项目中,可以定义一些自定义的别名,来取代经常需要引用的一些文件路径。 比如 Vue 项目中你可以需要经常从 /src 中取文件,在每个层级的文件中引用时的相对路径 ../../src ../src 都不…