Pyspark dataframe基本内置方法(5)

ops/2024/11/14 14:51:53/

文章目录

  • Pyspark sql DataFrame
    • 相关文章
    • toDF 设置新列名
    • toJSON row对象转换json字符串
    • toLocallterator 获取迭代器
    • toPandas 转换python dataframe
    • transform dataframe转换
    • union unionALL 并集不去重(按列顺序)
    • unionByName 并集不去重(按列名)
    • unpivot 反转表(宽表转长表)
    • withColumn 添加列操作
    • withColumns 添加多列操作
    • withColumnRenamed 列重命名
    • withColumnsRenamed 多列重命名
    • withMetadata 设置元数据
    • write 存储表
      • write.saveAsTable
      • insertInto

spark_sql_DataFrame_1">Pyspark sql DataFrame

相关文章

Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)

toDF 设置新列名

列名更新,将会按照新列名顺序的替换原列名返回新dataframe,更新列名数量需要跟原始列名数量一致。

from spark>pyspark.sql.functions import litdata.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
data.toDF(*['n1','n2','n3','n5','n4']).show()
+-----+---+---+---+---+
|   n1| n2| n3| n5| n4|
+-----+---+---+---+---+
| ldsx| 12|  1| 男|  1|
|test1| 20|  1| 女|  1|
|test2| 26|  1| 男|  1|
|test3| 19|  1| 女|  1|
|test4| 51|  1| 女|  1|
|test5| 13|  1| 男|  1|
+-----+---+---+---+---+

toJSON row对象转换json字符串

把dataframe的row对象转换为json字符串,返回rdd

data.rdd.first()
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
# data.toJSON()返回rdd类型
data.toJSON().first()
'{"name":"ldsx","age":"12","id":"1","gender":"男","new_id":"1"}'

toLocallterator 获取迭代器

返回一个迭代器,其中包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区一样多的内存。通过预取,它可能会消耗最多2个最大分区的内存。

d1 = data.toLocalIterator()
d1
<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f55c86e0570>
# 便利迭代器
for i in d1:print(i)Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
Row(name='test1', age='20', id='1', gender='女', new_id='1')
Row(name='test2', age='26', id='1', gender='男', new_id='1')
Row(name='test3', age='19', id='1', gender='女', new_id='1')
Row(name='test4', age='51', id='1', gender='女', new_id='1')
Row(name='test5', age='13', id='1', gender='男', new_id='1')

python_dataframe_73">toPandas 转换python dataframe

需要python环境安装pandas的前提下使用,且dataframe需要很小,因为所有数据都加载到driver的内存中。

data.toPandas()
type(data.toPandas())
<class 'pandas.core.frame.DataFrame'>name age id gender new_id
0   ldsx  12  1      男      1
1  test1  20  1      女      1
2  test2  26  1      男      1
3  test3  19  1      女      1
4  test4  51  1      女      1
5  test5  13  1      男      1

transform dataframe转换

参数为处理函数,返回值必须为dataframe

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+# 处理函数自定义最后返回了dataframe
def ldsx(spark_df):colums = [ str(i)+'_ldsx' for i in range(len(spark_df.columns)) ]return spark_df.toDF(*colums)data.transform(ldsx).show()
+------+------+------+------+------+
|0_ldsx|1_ldsx|2_ldsx|3_ldsx|4_ldsx|
+------+------+------+------+------+
|  ldsx|    12|     1|    男|     1|
| test1|    20|     1|    女|     1|
| test2|    26|     1|    男|     1|
| test3|    19|     1|    女|     1|
| test4|    51|     1|    女|     1|
| test5|    13|     1|    男|     1|
+------+------+------+------+------+

union unionALL 并集不去重(按列顺序)

获得新dataframe,unionall别名为union,如果要去重使用distinct方法,不会解析对应的列名合并,是按照列的顺序合并的,硬合

df2 = spark.createDataFrame([(3, 'C'), (4, 'D')], ['id', 'value'])
df1 = spark.createDataFrame([(1, 'A'), (2, 'B'),(3, 'C'),(3, 'C')], ['id', 'value'])
df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
+---+-----+
df2.show()
+---+-----+
| id|value|
+---+-----+
|  3|    C|
|  4|    D|
+---+-----+
df1.union(df2)
DataFrame[id: bigint, value: string]
df1.union(df2).show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
|  3|    C|
|  4|    D|
+---+-----+# 去重使用distinct
df1.union(df2).distinct().show()
+---+-----+
| id|value|
+---+-----+
|  2|    B|
|  1|    A|
|  3|    C|
|  4|    D|
+---+-----+

unionByName 并集不去重(按列名)

是否允许缺失列:allowMissingColumns,默认不允许

# 按照列名合并
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+# 对于不存在列进行填补
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6, 7]], ["col1", "col2", "col3", "col4"])
# allowMissingColumns True默认填补null
df1.unionByName(df2, allowMissingColumns=True).show()+----+----+----+----+----+
|col0|col1|col2|col3|col4|
+----+----+----+----+----+
|   1|   2|   3|NULL|NULL|
|NULL|   4|   5|   6|   7|
+----+----+----+----+----+

unpivot 反转表(宽表转长表)

ids: 标识列
values:选中的列(LIST)
variableColumnName: 列名
valueColumnName:对应列的值

宽表转长表,一行变多行,除了选中的ids是不变的,但是会把选中的values中的列由列变成行记录,variableColumnName记录了反转前的列名,

valueColumnName 对应 variableColumnName 存储值。

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
# 一行变成三行,id不变 'age','name','gender'由列转行,c_col依次记录'age','name','gender',c_value则记录对应的值
data.unpivot('id',['age','name','gender'],'c_col','c_value').show()
+---+------+-------+
| id| c_col|c_value|
+---+------+-------+
|  1|   age|     12|
|  1|  name|   ldsx|
|  1|gender|     男|
|  1|   age|     20|
|  1|  name|  test1|
|  1|gender|     女|
|  1|   age|     26|
|  1|  name|  test2|
|  1|gender|     男|
|  1|   age|     19|
|  1|  name|  test3|
|  1|gender|     女|
|  1|   age|     51|
|  1|  name|  test4|
|  1|gender|     女|
|  1|   age|     13|
|  1|  name|  test5|
|  1|gender|     男|
+---+------+-------+

withColumn 添加列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

# 使用d1上的列或者用常量列
d1.withColumn('c_value2',d1.c_value).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|      12|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|      男|
|  1|   age|     20|      20|
|  1|  name|  test1|   test1|
|  1|gender|     女|      女|
|  1|   age|     26|      26|
|  1|  name|  test2|   test2|
|  1|gender|     男|      男|
|  1|   age|     19|      19|
|  1|  name|  test3|   test3|
|  1|gender|     女|      女|
|  1|   age|     51|      51|
|  1|  name|  test4|   test4|
|  1|gender|     女|      女|
|  1|   age|     13|      13|
|  1|  name|  test5|   test5|
|  1|gender|     男|      男|
+---+------+-------+--------+
# 使用常量补充列
from spark>pyspark.sql.functions import lit
d1.withColumn('c_value2',lit('ldsx')).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|    ldsx|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     20|    ldsx|
|  1|  name|  test1|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     26|    ldsx|
|  1|  name|  test2|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     19|    ldsx|
|  1|  name|  test3|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     51|    ldsx|
|  1|  name|  test4|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     13|    ldsx|
|  1|  name|  test5|    ldsx|
|  1|gender|     男|    ldsx|
+---+------+-------+--------+
# 使用表达式设置列
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])
df.show()
+------+
|number|
+------+
|     1|
|     2|
|     3|
|     4|
+------+
from spark>pyspark.sql.functions import col, when
df.withColumn("new_number", when(df.number < 3, "Low").otherwise("High")).show()
------+----------+
|number|new_number|
+------+----------+
|     1|       Low|
|     2|       Low|
|     3|      High|
|     4|      High|
+------+----------+

withColumns 添加多列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
+---+-----+----+----+
|age| name|age2|age3|
+---+-----+----+----+
|  2|Alice|   4|   5|
|  5|  Bob|   7|   8|
+---+-----+----+----+# 可使用表达式
df.withColumns({'h1': when(df.age < 2, "Low").otherwise("High"), 'h2': df.age + 3}).show()
+---+-----+----+---+
|age| name|  h1| h2|
+---+-----+----+---+
|  2|Alice|High|  5|
|  5|  Bob|High|  8|
+---+-----+----+---+

withColumnRenamed 列重命名

不存在的列重命名报错,返回新dataframe。

列,重命名列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()
+----+-----+
|age2| name|
+----+-----+
|   2|Alice|
|   5|  Bob|
+----+-----+

withColumnsRenamed 多列重命名

字典,列名的映射

df.withColumnsRenamed({'age':'new_age','name':'new_name'}).show()
+-------+--------+
|new_age|new_name|
+-------+--------+
|      2|   Alice|
|      5|     Bob|
+-------+--------+

withMetadata 设置元数据

更新元数据,返回新dataframe

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
# 查看列的元数据
df.schema['age'].metadata
{}
# 设置元数据
df_meta = df.withMetadata('age', {'foo': 'bar'})
df_meta.schema['age'].metadata
{'foo': 'bar'}

write 存储表

write.saveAsTable

当追加插入的时候dataframe只需要scheam一致,会自动匹配

  • name: str, 表名

  • format: Optional[str] = None, 格式类型 hive,parquet…

  • mode: Optional[str] = None, 写入方式

    1. append:将this:class:DataFrame的内容附加到现有数据中,数据格式需要一致。
    2. “overwrite”:覆盖现有数据,数据格式不重要了,已此次覆盖为准。
    3. errorerrorifeists:如果数据已经存在,则抛出异常。
    4. ‘ignore’:如果数据已经存在,则自动忽略此操作。
  • partitionBy: Optional[Union[str, List[str]]] = None, 分区列表

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
# 覆盖重写
df.write.saveAsTable('ldsx_test','parquet','overwrite',['age'])# 追加写入
df.write.saveAsTable('ldsx_test','parquet','append',['age'])# 另一种写法
df.write.format('parquet').mode('append').partitionBy(['age']).saveAsTable('ldsx_test')

在这里插入图片描述

在这里插入图片描述

insertInto

不会对scheam进行校验,按位置插入

d2.show()
+-----+----+
|name1|age1|
+-----+----+
|ldsx1|   2|
|ldsx2|   3|
+-----+----+
d2.write.insertInto('ldsx_test')
d2.schema
StructType([StructField('name1', StringType(), True), StructField('age1', LongType(), True)])

http://www.ppmy.cn/ops/113739.html

相关文章

【Webpack--013】SourceMap源码映射设置

&#x1f913;&#x1f60d;Sam9029的CSDN博客主页:Sam9029的博客_CSDN博客-前端领域博主 &#x1f431;‍&#x1f409;若此文你认为写的不错&#xff0c;不要吝啬你的赞扬&#xff0c;求收藏&#xff0c;求评论&#xff0c;求一个大大的赞&#xff01;&#x1f44d;* &#x…

mysql笔记—sql性能分析

1.查看数据库各个语句的执行频次 show global/session status like ‘com__’ 2.慢查询 默认没有开启&#xff0c;需要手动开启&#xff08;在/etc/my.cnf中开启&#xff09; 开启后在localhost-slow.log中可以查询到慢查询的语句的相关信息&#xff1a; 3.explain 用法&…

ASP .NET CORE 6 项目实现WebSocket通信实践

一、简介 WebSocket 是一种计算机通信协议&#xff0c;提供了全双工通信通道&#xff0c;特别适用于需要频繁更新数据的应用&#xff0c;如实时聊天、在线游戏和股票行情等。它在 Web 应用中有着广泛的应用&#xff0c;因为它能够在客户端和服务器之间建立持久连接&#xff0c;…

第二百三十九节 JPA教程 - JPA一对一延迟加载示例

JPA教程 - JPA一对一延迟加载示例 以下代码显示如何使用延迟加载设置执行一对一映射。 OneToOne(fetchLAZY)private Department department;例子 下面的代码来自Person.java。 package cn.w3cschool.common; import javax.persistence.Entity; import javax.persistence.Gene…

Spring Boot校园管理系统:技术选型与架构设计

第2章相关技术 2.1 B/S架构 B/S结构的特点也非常多&#xff0c;例如在很多浏览器中都可以做出信号请求。并且可以适当的减轻用户的工作量&#xff0c;通过对客户端安装或者是配置少量的运行软件就能够逐步减少用户的工作量&#xff0c;这些功能的操作主要是由服务器来进行控制的…

Vue接入高德地图并实现基本的路线规划功能

目录 一、申请密钥 二、安装依赖 三、代码实现 四、运行截图 五、官方文档 一、申请密钥 登录高德开放平台&#xff0c;点击我的应用&#xff0c;先添加新应用&#xff0c;然后再添加Key。 如图所示填写对应的信息&#xff0c;系统就会自动生成。 二、安装依赖 npm i am…

PMP--二模--解题--41-50

文章目录 11.风险管理--风险代表对将来问题的预判&#xff0c;问题代表对过去问题事件的跟踪&#xff1b;两者联系&#xff1a;风险发生后会变成问题&#xff0c;而问题可能导致新的风险。41、 [单选] 在项目会议期间&#xff0c;一个团队发现三个月前关闭的问题仍然处于活跃状…

PyQt / PySide + Pywin32 + ctypes 自定义标题栏窗口 + 完全还原 Windows 原生窗口边框特效项目

项目地址&#xff1a; GitHub - github201014/PyQt-NativeWindow: A class of window include nativeEvent, use PySide or PyQt and Pywin32 and ctypesA class of window include nativeEvent, use PySide or PyQt and Pywin32 and ctypes - github201014/PyQt-NativeWindow…