Pyspark dataframe基本内置方法(4)

news/2024/9/24 10:54:35/

文章目录

  • Pyspark sql DataFrame
    • 相关文章
    • RDD
    • repartition 重新分区
    • replace 替换
    • sameSemantics dataframe是否相等
    • sample 采样
    • sampleBy 分层采样
    • schema 显示dataframe结构
    • select 查询
    • selectExpr 查询
    • semanticHash 获取哈希值
    • show 展示dataframe
    • sort 排序
    • sortWithinPartitions 分区按照指定列排序
    • stat 返回统计函数类型
    • storageLevel 获取存储级别
    • subtract 获取差集
    • summary 总览
    • tail 从结尾获取数据
    • take 返回记录
    • to 配合schema返回新结构的dataframe

spark_sql_DataFrame_1">Pyspark sql DataFrame

相关文章

Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)

RDD

返回包含ROW对象的rdd

data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.rdd
MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0data.rdd.foreach(lambda x : print(type(x),x))
<class 'pyspark.sql.types.Row'> Row(name='test3', age='19', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test4', age='51', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test5', age='13', id='1', gender='男')
<class 'pyspark.sql.types.Row'> Row(name='ldsx', age='12', id='1', gender='男')
<class 'pyspark.sql.types.Row'> Row(name='test1', age='20', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test2', age='26', id='1', gender='男')

repartition 重新分区

每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。

在物理存储中,每个分区指向一个存储在内存或者硬盘中的数据块 (Block) ,其实这个数据块就是每个 task 计算出的数据块,它们可以分布在不同的节点上。

RDD 只是抽象意义的数据集合,分区内部并不会存储具体的数据,只会存储它在该 RDD 中的 index,通过该 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,然后通过底层存储层的接口提取到数据进行处理。

data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
# 选择以某列进行分区
data.repartition('name').rdd.getNumPartitions()
1
# 指定分区数量进行分区(可以指定多列)
data.repartition(7,'name','age').rdd.getNumPartitions()
7data = data.repartition(5,'gender')
data.rdd.glom().collect()[[], [Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'),Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女'),Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')], [], [], []]# 直接操作rdd只能按数据分区不能按照列分区
data.rdd.repartition(1).glom().collect()
[[Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'), Row(name='test3', age='19', id='1', gender='女'), Row(name='test4', age='51', id='1', gender='女')]]data.repartition(2,'id').rdd.glom().collect()
[[Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'), Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女'), Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')], []]data.repartition(2).rdd.glom().collect()
[[Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女')], [Row(name='test1', age='20', id='1', gender='女'), Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')]]

replace 替换

当替换的值与原本列的数据类型不相同时会报错

df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|    10|  Tom|
|null|  null| null|
+----+------+-----+
df.fillna({'age':1,'height':'2','name':"sr"}).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|     2|  Bob|
|  1|    10|  Tom|
|  1|     2|   sr|
+---+------+-----+df.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|   A|
|   5|  null|   B|
|null|    10| Tom|
|null|  null|null|
+----+------+----+df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|    10|  Tom|
|null|  null| null|
+----+------+-----+df.na.replace(10,12).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  12|    80|Alice|
|   5|  null|  Bob|
|null|    12|  Tom|
|null|  null| null|
+----+------+-----+

sameSemantics dataframe是否相等

当两个 dataframe中的逻辑查询计划相等并因此返回相同的结果时,返回 True

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
data2.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|   2.0|
|test1| 20|  1|    女|   2.0|
|test2| 26|  1|    男|   2.0|
|test3| 19|  1|    女|   2.0|
|test4| 51|  1|    女|   2.0|
|test5| 13|  1|    男|   2.0|
+-----+---+---+------+------+data.sameSemantics(data2)
False
data.sameSemantics(data)
True

sample 采样

withReplacement:是否进行有放回采样,默认为False,表示进行无放回采样;设置为True时,表示进行有放回采样
fraction: 采样比例 float
seed: 随机种子值,值固定后采样获取固定默认为空

# 取样不固定
df.sample(0.1).show()
+---+
| id|
+---+
+---+
df.sample(0.1).show()
+---+
| id|
+---+
|  9|
+---+
df.sample(0.1).show()
+---+
| id|
+---+
|  1|
|  5|
+---+# 随机种子固定,取样固定
df.sample(0.1,1).show()
+---+
| id|
+---+
|  3|
+---+
df.sample(0.1,1).show()
+---+
| id|
+---+
|  3|
+---+

sampleBy 分层采样

col:列名

fractions: 采样字典

seed: 随机种子值,值固定后采样获取固定默认为空

ataset = spark.range(0, 100).select((col("id") % 3).alias("key"))
dataset.show()+---+
|key|
+---+
|  0|
|  1|
|  2|
|  0|
|  1|
|  2|
...
...
|  0|
|  1|
|  2|
|  0|
|  1|
+---+# 列为key,中值为0取样10%,值为1取样10%,值为2取样10%
dataset.sampleBy("key", fractions={0: 0.1, 1: 0.1,2:0.1}, seed=0).show()
+---+
|key|
+---+
|  2|
|  0|
|  1|
|  2|
|  1|
|  2|
|  2|
|  1|
|  2|
+---+
# 列为key,中值为0取样10%,值为2取样10%
dataset.sampleBy("key", fractions={0: 0.1,2:0.1}, seed=0).show()
+---+
|key|
+---+
|  2|
|  0|
|  2|
|  2|
|  2|
|  2|
+---+

schema 显示dataframe结构

将此DataFrame的架构作为pyspark.sql.types返回

df.schema
StructType([StructField('id', LongType(), False)])df.printSchema()
root|-- id: long (nullable = false)

select 查询

查询并返回新dataframe,可结合多方法使用是。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])df.select('*').show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+df.select(df.name, (df.age + 10).alias('age')).show()
+-----+---+
| name|age|
+-----+---+
|Alice| 12|
|  Bob| 15|
+-----+---+

selectExpr 查询

接受sql表达式并执行

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
df.selectExpr('age * 2','age+2').show()
+---------+---------+
|(age * 2)|(age + 2)|
+---------+---------+
|        4|        4|
|       10|        7|
+---------+---------+df.selectExpr('age * 2 as ldsx','age+2').show()
+----+---------+
|ldsx|(age + 2)|
+----+---------+
|   4|        4|
|  10|        7|
+----+---------+

semanticHash 获取哈希值

df.selectExpr('age * 2 as ldsx','age+2').semanticHash()
-2082183221
df.semanticHash()
1019336781

show 展示dataframe

展示前n行数据到控制台,默认展示20行

df.show(1)
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
+---+-----+
only showing top 1 row

sort 排序

按照指定列排序

from pyspark.sql.functions import desc, asc
# 下面方式效果一致
df.sort(desc('age')).show()
df.sort("age", ascending=False).show()
df.orderBy(df.age.desc()).show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|Alice|
|  2|  Bob|
+---+-----+# 使用两列排序,一列降序,一列默认(升序)
df.orderBy(desc("age"), "name").show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|Alice|
|  2|  Bob|
+---+-----+
# 使用两列排序,都为降序
df.orderBy(desc("age"), desc("name")).show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|  Bob|
|  2|Alice|
+---+-----+# 两列都为降序
df.orderBy(["age", "name"], ascending=[False, False]).show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|  Bob|
|  2|Alice|
+---+-----+

sortWithinPartitions 分区按照指定列排序

df.sortWithinPartitions('age').show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  5|  Bob|
+---+-----+

stat 返回统计函数类型

df.stat
<pyspark.sql.dataframe.DataFrameStatFunctions object at 0x7f55c87669e8>

storageLevel 获取存储级别

df.storageLevel
StorageLevel(False, False, False, False, 1)
df.cache().storageLevel
StorageLevel(True, True, False, True, 1)

subtract 获取差集

返回一个新的DataFrame,其中包含此DataFrame中的行,但不包含另一个DataFrame中。d1.subtarct(d2),获取d1的差集。

df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
|  c|  4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+
df1.subtract(df2).show()
+---+---+
| C1| C2|
+---+---+
|  c|  4|
+---+---+

summary 总览

计算数值列和字符串列的指定统计信息。可用的统计数据有:-count-mean-stddev-min-max-指定为百分比的任意近似百分位数
如果没有给出统计数据,此函数将计算计数、平均值、标准偏差、最小值、近似四分位数(百分位数分别为25%、50%和75%)和最大值。

df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bob| 13|  40.3| 150.5|
|Alice| 12|  37.8| 142.3|
|  Tom| 11|  44.1| 142.2|
+-----+---+------+------+df.summary().show()
24/09/19 11:24:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.+-------+-----+----+------------------+-----------------+
|summary| name| age|            weight|           height|
+-------+-----+----+------------------+-----------------+
|  count|    3|   3|                 3|                3|
|   mean| null|12.0|40.733333333333334|            145.0|
| stddev| null| 1.0| 3.172275734127371|4.763402145525822|
|    min|Alice|  11|              37.8|            142.2|
|    25%| null|  11|              37.8|            142.2|
|    50%| null|  12|              40.3|            142.3|
|    75%| null|  13|              44.1|            150.5|
|    max|  Tom|  13|              44.1|            150.5|
+-------+-----+----+------------------+-----------------+

tail 从结尾获取数据

运行尾部需要将数据移动到应用程序的驱动程序进程中,如果使用非常大的num,可能会导致驱动程序进程因OutOfMemoryError而崩溃。

df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bob| 13|  40.3| 150.5|
|Alice| 12|  37.8| 142.3|
|  Tom| 11|  44.1| 142.2|
+-----+---+------+------+
df.tail(2)
[Row(name='Alice', age=12, weight=37.8, height=142.3), Row(name='Tom', age=11, weight=44.1, height=142.2)]

take 返回记录

head 调用的就是taketake调用的limit

# 源码def take(self, num: int) -> List[Row]:"""Returns the first ``num`` rows as a :class:`list` of :class:`Row`... versionadded:: 1.3.0.. versionchanged:: 3.4.0Supports Spark Connect.Parameters----------num : intNumber of records to return. Will return this number of recordsor all records if the DataFrame contains less than this number of records..Returns-------listList of rowsExamples-------->>> df = spark.createDataFrame(...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])Return the first 2 rows of the :class:`DataFrame`.>>> df.take(2)[Row(age=14, name='Tom'), Row(age=23, name='Alice')]"""return self.limit(num).collect()

to 配合schema返回新结构的dataframe

from pyspark.sql.types import StructField, StringType
df = spark.createDataFrame([("a", 1)], ["i", "j"])
df.show()
+---+---+
|  i|  j|
+---+---+
|  a|  1|
+---+---+
df.schema
StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)])# 设置新的scheam
schema = StructType([StructField("j", StringType()), StructField("i", StringType())])
df.schema
StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)])# df使用新的scheam进行转换,查看scheam
df.to(schema).schema
# 顺序改变,字段类型改变
StructType([StructField('j', StringType(), True), StructField('i', StringType(), True)])
df.to(schema).show()
+---+---+
|  j|  i|
+---+---+
|  1|  a|
+---+---+# 当schema设置原df不存在的列,则会默认补充null
schema = StructType([StructField("q", StringType()), StructField("w", StringType()),StructField("i", StringType())])
df.to(schema).show()
+----+----+---+
|   q|   w|  i|
+----+----+---+
|null|null|  a|
+----+----+---+

http://www.ppmy.cn/news/1529783.html

相关文章

常见排序详解

1、常见的排序算法 插入排序&#xff1a;直接插入排序、希尔排序&#xff1b; 选择排序&#xff1a;选择排序、堆排序&#xff1b; 交换排序&#xff1a;冒泡排序、快速排序&#xff1b; 归并排序&#xff1a;归并排序&#xff1b; 2、常见排序算法的实现 2.1 插入排序 2…

【Oauth2整合gateway网关实现微服务单点登录】

文章目录 一.什么是单点登录&#xff1f;二.Oauth2整合网关实现微服务单点登录三.时序图四.代码实现思路1.基于OAuth2独立一个认证中心服务出来2.网关微服务3产品微服务4.订单微服务5.开始测试单点登录 一.什么是单点登录&#xff1f; 单点登录&#xff08;Single Sign On&…

Python记录

1.冒泡排序 时间复杂度O&#xff08;n^2) 选择、插入都是 def bubble(data, reverse):for i in range(len(data)-1):for j in range(len(data)-i-1):if data[j] > data[j1]:data[j], data[j1] data[j1], data[j]if reverse:data.reverse()return data 2.快速排序 时间…

vue中高德地图使用 Marker 标点 - 标点数据快到 1000 时页面卡顿问题解决(已解决 - 多方面原因)+ 海量点功能实现解决

目录 1.业务需求2.最初实现及出现的问题3.解决 - 1000 个标点时页面就出现 卡顿 问题4.使用海量点、聚合标点后还有卡顿&#xff0c;排查其他原因5.最终解决5.1页面中list数据渲染问题解决5.2地图相关实例不要放在 vue 的可响应数据中 页面展示 1.业务需求 需要在 高德地图 中标…

nodejs安装

下载安装包 https://nodejs.cn/download/ 配置环境变量&#xff1a; PATH编辑-新建- D:\node-v20.17.0 检测安装是否成功&#xff1a; npm -v 设置缓存与包安装位置 在nodejs安装目录下新建缓存与模块安装位置node_cache 、node_modules 更改全局模块路径&#xff1a; 运…

Parallels Desktop 20 for Mac 推出:完美兼容 macOS Sequoia 与 Win11 24H2

Parallels Desktop 20 for Mac 近日正式发布&#xff0c;这一新版本不仅全面支持 macOS Sequoia 和 Windows 11 24H2&#xff0c;还在企业版中引入了一个全新的管理门户。新版本针对 Windows、macOS 和 Linux 虚拟机进行了多项改进&#xff0c;其中最引人注目的当属 Parallels …

shell linux cut 切割字符串

shell linux 切割字符串 在Shell脚本中&#xff0c;可以使用内置的cut命令来切割字符串。cut命令主要有三个选项 -c、-f和-d&#xff0c;分别表示按字符、按字段和指定分隔符来切割字符串。 按字符切割&#xff1a; echo "Hello World" | cut -c 1-5 # 输出&#…

开源模型应用落地-Qwen2.5-Coder模型小试-码无止境(一)

一、前言 代码专家模型是一种基于人工智能的先进技术&#xff0c;旨在自动分析和理解大量代码库&#xff0c;并从中学习常见的编码模式和最佳实践。这种模型通过深度学习和自然语言处理&#xff0c;能够提供准确而高效的代码建议&#xff0c;帮助开发人员在编写代码时有效地避免…