Python大数据处理利器,PySpark的入门实战

news/2025/1/30 13:40:16/

PySpark极速入门

一:Pyspark简介与安装

什么是Pyspark?

PySpark是Spark的Python语言接口,通过它,可以使用Python API编写Spark应用程序,目前支持绝大多数Spark功能。目前Spark官方在其支持的所有语言中,将Python置于首位。

如何安装?

在终端输入

pip intsall pyspark

或者使用pycharm,在GUI界面安装

二:编程实践

加载、转换数据

# 导入pyspark
# 导入pandas, 稍后与pyspark中的数据结构做对比
import pyspark
import pandas as pd

在编写spark程序前,我们要创建一个SparkSession对象

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark极速入门").getOrCreate()

可以看到会话的一些信息:使用的Spark版本、运行模式、应用程序名字

演示环境用的是local本地模式, * 代表的是使用全部线程 如果想用集群模式的话,可以去查看集群搭建的相关教程 届时pyspark程序作为spark的客户端,设置连接集群,就是真正的分布式计算了 目前只是本地模式,用多线程去模拟分布式计算。

spark

看看我们将用到的test1数据吧

使用read方法,用option设置是否读取csv的头,再指定路径就可以读取数据了

df_spark = spark.read.option("header", "true").csv("./data/test1.csv")

看看是什么类型

type(df_spark)
pyspark.sql.dataframe.DataFrame

再看看用pandas读取是什么类型

type(pd.read_csv("./data/test1.csv"))
pandas.core.frame.DataFrame

可以发现Spark读取这种结构化数据时,用的也是和pandas类似的dataframe结构 这也是Spark应用最广泛的数据结构

使用show方法打印数据

df_spark.show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

使用printSchema方法打印元数据信息,发现明明是数值类型的,它却读取为了字符串类型

df_spark.printSchema()
root|-- Name: string (nullable = true)|-- age: string (nullable = true)|-- Experience: string (nullable = true)|-- Salary: string (nullable = true)

在读取时,加上类型推断,发现此时已经能正确读取了

df_spark = spark.read.option("header", "true").csv("./data/test1.csv",inferSchema=True)
df_spark.printSchema()
root|-- Name: string (nullable = true)|-- age: integer (nullable = true)|-- Experience: integer (nullable = true)|-- Salary: integer (nullable = true)

选择某些列, 可以发现不管选多列还是选单列,返回的都是dataframe 返回的也同样可以printSchema、show等dataframe使用的方法,做到了结构的统一

df_spark.select(["Name", "age"])
DataFrame[Name: string, age: int]
df_spark.select("Name")
DataFrame[Name: string]
df_spark.select(["Name", "age", "Salary"]).printSchema()
root|-- Name: string (nullable = true)|-- age: integer (nullable = true)|-- Salary: integer (nullable = true)

不用select,而用[]直接选取,就有点类似与pandas的series了

df_spark["Name"]
Column<'Name'>

column就不能直接show了

df_spark["age"].show()
---------------------------------------------------------------------------TypeError                                 Traceback (most recent call last)Input In [15], in <cell line: 1>()
----> 1 df_spark["age"].show()TypeError: 'Column' object is not callable

用describe方法可以对dataframe做一些简单的统计

df_spark.describe().show()
+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+

用withColumn方法给dataframe加上一列

df_spark = df_spark.withColumn("Experience After 3 year", df_spark["Experience"] + 3)
df_spark.show()
+---------+---+----------+------+-----------------------+
|     Name|age|Experience|Salary|Experience After 3 year|
+---------+---+----------+------+-----------------------+
|    Krish| 31|        10| 30000|                     13|
|Sudhanshu| 30|         8| 25000|                     11|
|    Sunny| 29|         4| 20000|                      7|
|     Paul| 24|         3| 20000|                      6|
|   Harsha| 21|         1| 15000|                      4|
|  Shubham| 23|         2| 18000|                      5|
+---------+---+----------+------+-----------------------+

用drop方法删除列

df_spark = df_spark.drop("Experience After 3 year")
df_spark.show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

用withColumnRename方法重命名列

df_spark.withColumnRenamed("Name", "New Name").show()
+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

处理缺失值

看看接下来要带缺失值的test2数据吧

CSeoe.png

df_spark = spark.read.csv("./data/test2.csv", header=True, inferSchema=True)
df_spark.show()
+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+

用na.drop删除缺失值 how参数设置策略,any意思是只要一行里有缺失值,那就删了 any也是how的默认参数

df_spark.na.drop(how="any").show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

可以通过thresh参数设置阈值,代表超过一行中缺失值的数量超过这个值,才会被删除

df_spark.na.drop(how="any", thresh=2).show()
+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+

也可以用subset参数设置关注的列 下面代码意思是,在Experience列中,只要有缺失值就删掉

df_spark.na.drop(how="any", subset=["Experience"]).show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+

用fillna填充缺失值, 可以用字典对各列的填充值进行设置

df_spark.fillna({'Name': 'unknown', 'age': 18, 'Experience': 0, 'Salary': 0}).show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 18|         0| 40000|
|  unknown| 34|        10| 38000|
|  unknown| 36|         0|     0|
+---------+---+----------+------+

还可以调用机器学习模块的相关方法, 通过设置策略,可以用平均数、众数等方式填充

from pyspark.ml.feature import Imputerimputer = Imputer(inputCols = ['age', 'Experience', 'Salary'],outputCols = [f"{c}_imputed" for c in ['age', 'Experience', 'Salary']]
).setStrategy("mean")
imputer.fit(df_spark).transform(df_spark).show()
+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+------+-----------+------------------+--------------+

过滤操作

还是切换到test1数据

df_spark = spark.read.csv("./data/test1.csv", header=True, inferSchema=True)
df_spark.show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

可以使用filter方法对数据进行过滤操作,类似于SQL中的where 可以使用字符串的方式,也可以利用column方式去传递条件

df_spark.filter("Salary <= 20000").show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

df_spark.filter(df_spark["Salary"]<=20000).show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

如果是字符串,用 and 表示同时满足多个条件 如果是用column,用( & ) 连接多个条件

df_spark.filter("Salary <= 20000 and age <= 24").show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

df_spark.filter((df_spark["Salary"]<=20000)& (df_spark["age"]<=24)
).show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

column中,用|表示或, ~表示取反
df_spark.filter((df_spark["Salary"]<=20000)| (df_spark["age"]<=24)
).show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

df_spark.filter((df_spark["Salary"]<=20000)| ~(df_spark["age"]<=24)
).show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

分组聚合

换一个数据集test3

df_spark = spark.read.csv("./data/test3.csv", header=True, inferSchema=True)
df_spark.show()
+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+

使用groupby方法对dataframe某些列进行分组

df_spark.groupBy("Name")
<pyspark.sql.group.GroupedData at 0x227454d4be0>

可以看到分组的结果是GroupedData对象,它不能使用show等方法打印 GroupedData对象需要进行聚合操作,才能重新转换为dataframe 聚合函数有sum、count、avg、max、min等

df_spark.groupBy("Departments").sum().show()
+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+

三:总结

Pandas的dataframe与PySpark的dataframe有许多相似之处,熟悉Pandas的同学可以很快适应它的API。目前可以粗浅地把PySpark理解为”分布式的Pandas“,不过,PySpark还有分布式机器学习的功能——Spark MLlib(可以理解为分布式的Sklearn、TensorFlow等),后续会给大家介绍。在集群中,它的dataframe可以分布在不同的机器上,以此处理海量数据。有兴趣的小伙伴可以通过虚拟机搭建一个Spark集群,进一步学习Spark。

Apache Spark™ - 用于大规模数据分析的统一引擎


http://www.ppmy.cn/news/21192.html

相关文章

推荐系统实战6——EasyRec 搭建WideAndDeep排序模型实现CTR点击平台

推荐系统实战6——EasyRec 搭建WideAndDeep排序模型实现CTR点击平台学习前言EasyRec仓库地址WideAndDeep实现思路一、WideAndDeep整体结构解析二、网络结构解析1、Embedding层的构建a、字符串形式的输入b、连续值&#xff08;特定范围值&#xff09;的输入c、Wide网络和Deep网络…

【splishsplash】Houdini粒子的导入与导出

Houdini粒子的导入与导出 Houdini导入到splish 在Houdini中使用file导入任意几何模型 使用points from volume采样点&#xff0c;使其粒子化 使用file导出粒子化之后的模型&#xff0c;后缀写bhclassic 创建json场景文件&#xff08;建议放到MyScences文件夹&#xff09; …

python小游戏——像素鸟代码开源

♥️作者&#xff1a;小刘在这里 ♥️每天分享云计算网络运维课堂笔记&#xff0c;努力不一定有收获&#xff0c;但一定会有收获加油&#xff01;一起努力&#xff0c;共赴美好人生&#xff01; ♥️夕阳下&#xff0c;是最美的&#xff0c;绽放&#xff0c;愿所有的美好&#…

rust结构体

结构体 和大多数语言一样&#xff0c;rust也提供了结构体。一个结构体定义如下所示&#xff1a; struct User {active: bool,username: String,email: String,sign_in_count: u64, }其中&#xff0c;struct是定义结构体的关键字&#xff0c;User是该结构体的名称&#xff0c;…

HCIA-Datacom题库2023最新放送,能答对60%就拿下证书

HCIA的学习是网络工程师这条路的开始。如果你准备好了&#xff0c;就往下看&#xff01;HCIA认证是华为公司认证体系中的初级认证&#xff0c;是一个入门认证&#xff0c;它包含的技术很简单&#xff0c;只是一个单核心的小网络&#xff0c;距离一个合格的网络工程师还有一段距…

【c语言进阶】动态通讯录

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a; c语言学习 &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0c;是对我…

【青训营】规则引擎概述和入门

本文内容总结自 字节跳动青年训练营 第五届后端组 一、规则引擎是什么 规则引擎是一种嵌入在应用程序中的组件&#xff0c;实现了将业务决策从应用程序代码中分离出来&#xff0c;并且使用预定义语义模块编写业务决策。接受数据输入&#xff0c;解释业务规则&#xff0c;并且…

Leetcode:343. 整数拆分(C++)

目录 问题描述&#xff1a; 实现代码与解析&#xff1a; 动态规划&#xff1a; 原理思路&#xff1a; 优化版&#xff1a; 原理思路&#xff1a; 数学方法&#xff1a; 原理思路&#xff1a; 问题描述&#xff1a; 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数…