Pyspark下操作dataframe方法(1)

devtools/2024/9/20 1:26:38/ 标签: pyspark, linux, python, spark, hadoop

文章目录

  • Pyspark dataframe
    • 创建DataFrame
      • 使用Row对象
      • 使用元组与scheam
      • 使用字典与scheam
      • 注意
    • agg 聚合操作
    • alias 设置别名
      • 字段设置别名
      • 设置dataframe别名
    • cache 缓存
    • checkpoint RDD持久化到外部存储
    • coalesce 设置dataframe分区数量
    • collect 拉取数据
    • columns 获取dataframe列

spark_dataframe_1">Pyspark dataframe

创建DataFrame

from spark>pyspark.sql import  SparkSession,Row
from spark>pyspark.sql.types import *def init_spark():spark  = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \.config('hive.metastore.uris', 'thrift://hadoop01:9083') \.config('spark.master', "local[2]") \.enableHiveSupport().getOrCreate()return spark
spark = init_spark()# 设置字段类型
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("gender", StringType(), True),
])

使用Row对象

cs = Row('name','age','id','gender')
row_list = [ cs('ldsx','12','1','男'),cs('test1','20','1','女'),cs('test2','26','1','男'),cs('test3','19','1','女'),cs('test4','51','1','女'),cs('test5','13','1','男')]
data = spark.createDataFrame(row_list)
data.show()+-----+---+---+---+
| name|age| id|gender|
+-----+---+---+---+
| ldsx| 12|  1| 男|
|test1| 20|  1| 女|
|test2| 26|  1| 男|
|test3| 19|  1| 女|
|test4| 51|  1| 女|
|test5| 13|  1| 男|
+-----+---+---+---+
data.printSchema()
root|-- name: string (nullable = true)|-- age: string (nullable = true)|-- id: string (nullable = true)|-- gender: string (nullable = true)

使用元组与scheam

park.createDataFrame([('ldsx1','12','1','男'),('ldsx2','12','1','男')],schema).show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|ldsx1| 12|  1|    男|
|ldsx2| 12|  1|    男|
+-----+---+---+------+

使用字典与scheam

spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}]).show()
+---+------+---+----+
|age|gender| id|name|
+---+------+---+----+
| 12|    女|  1|ldsx|
+---+------+---+----+

注意

scheam设置优先级高于row设置,dict设置的key

schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("测试", StringType(), True),
])
spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}],schema).show()
+----+---+---+----+
|name|age| id|测试|
+----+---+---+----+
|ldsx| 12|  1|null|
+----+---+---+----+

agg 聚合操作

在 PySpark 中,agg(aggregate)函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。可以结合groupby使用。

from spark>pyspark.sql import functions as sf
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.agg({'age':'max'}).show()
+--------+
|max(age)|
+--------+
|      51|
+--------+
data.agg({'age':'max','gender':"max"}).show()
+-----------+--------+
|max(gender)|max(age)|
+-----------+--------+
|         男|      51|
+-----------+--------+data.agg(sf.min(data.age)).show()
+--------+
|min(age)|
+--------+
|      12|
+--------+
data.agg(sf.min(data.age),sf.min(data.name)).show()
+--------+---------+
|min(age)|min(name)|
+--------+---------+
|      12|     ldsx|
+--------+---------+

结合groupby使用

data.groupBy('gender').agg(sf.min('age')).show()+------+--------+
|gender|min(age)|
+------+--------+
|    女|      19|
|    男|      12|
+------+--------+
data.groupBy('gender').agg(sf.min('age'),sf.max('name')).show()
+------+--------+---------+
|gender|min(age)|max(name)|
+------+--------+---------+
|    女|      19|    test4|
|    男|      12|    test5|
+------+--------+---------+

alias 设置别名

字段设置别名

#字段设置别名
data.select(data['name'].alias('rename_name')).show()
+-----------+
|rename_name|
+-----------+
|       ldsx|
|      test1|
|      test2|
|      test3|
|      test4|
|      test5|
+-----------+

设置dataframe别名

d1 = data.alias('ldsx1')
d2 = data2.alias('ldsx2')
d1.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
d2.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|测试1| 12|  1|    男|
|测试2| 20|  1|    男|
+-----+---+---+------+d3 = d1.join(d2,col('ldsx1.gender')==col('ldsx2.gender'),'inner')
d3.show()
+-----+---+---+------+-----+---+---+------+
| name|age| id|gender| name|age| id|gender|
+-----+---+---+------+-----+---+---+------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|
|test2| 26|  1|    男|测试1| 12|  1|    男|
|test2| 26|  1|    男|测试2| 20|  1|    男|
|test5| 13|  1|    男|测试1| 12|  1|    男|
|test5| 13|  1|    男|测试2| 20|  1|    男|
+-----+---+---+------+-----+---+---+------+d3[['name']].show()
#报错提示
spark>pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`ldsx1`.`name`, `ldsx2`.`name`].
# 使用别名前缀获取
d3[['ldsx1.name']].show()
+-----+
| name|
+-----+
| ldsx|
| ldsx|
|test2|
|test2|
|test5|
|test5|
+-----+
>>> d3[['ldsx2.name']].show()
+-----+
| name|
+-----+
|测试1|
|测试2|
|测试1|
|测试2|
|测试1|
|测试2|
+-----+
d3.select('ldsx1.name','ldsx2.name').show()
+-----+-----+
| name| name|
+-----+-----+
| ldsx|测试1|
| ldsx|测试2|
|test2|测试1|
|test2|测试2|
|test5|测试1|
|test5|测试2|
+-----+-----+

cache 缓存

dataframe缓存默认缓存级别MEMORY_AND_DISK_DESER

df.cache()
# 查看逻辑计划和物理计划
df.explain()

checkpoint RDD持久化到外部存储

Checkpoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用Checkpoint比较合适,或者数据量很大的时候,采用Checkpoint比较合适。如果数据量小,或者RDD重新计算也是非常快的,直接使用缓存即可。
CheckPoint支持写入HDFS。CheckPoint被认为是安全的

sc = spark.sparkContext
# 设置检查存储目录
sc.setCheckpointDir('hdfs:///ldsx_checkpoint')
d3.count()
# 保存会在hdfs上进行存储
d3.checkpoint()
# 从hdfs读取
d3.count()

在这里插入图片描述

coalesce 设置dataframe分区数量

# 设置dataframe分区数量
d3 = d3.coalesce(3)
# 获取分区数量
d3.rdd.getNumPartitions()

collect 拉取数据

当任务提交到集群的时候collect()操作是用来将所有结点中的数据收集到dirver节点,数据量很大慎用防止dirver炸掉。

d3.collect()
[Row(name='ldsx', age='12', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='ldsx', age='12', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试2', age='20', id='1', gender='男')]

columns 获取dataframe列

>>> d3.columns
['name', 'age', 'id', 'gender', 'name', 'age', 'id', 'gender']d3.withColumn('ldsx1.name_1',col('ldsx1.name')).show()
+-----+---+---+------+-----+---+---+------+------------+
| name|age| id|gender| name|age| id|gender|ldsx1.name_1|
+-----+---+---+------+-----+---+---+------+------------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|        ldsx|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|        ldsx|
|test2| 26|  1|    男|测试1| 12|  1|    男|       test2|
|test2| 26|  1|    男|测试2| 20|  1|    男|       test2|
|test5| 13|  1|    男|测试1| 12|  1|    男|       test5|
|test5| 13|  1|    男|测试2| 20|  1|    男|       test5|
+-----+---+---+------+-----+---+---+------+------------+# 重命名列名
d3.withColumnRenamed('ldsx1.name_1',col('ldsx1.name')).show()

http://www.ppmy.cn/devtools/111130.html

相关文章

苹果宣布iOS 18正式版9月17日推送:支持27款iPhone升级

9月10日消息,在苹果秋季发布会结束后, 苹果宣布将于9月17日(下周二)推送iOS 18正式版系统。 苹果官网显示,iOS 18正式版将兼容第二代iPhone SE及之后的所有机型,加上刚发布的iPhone 16系列,共兼容27款iPhone。 iOS 18升…

【数据获取与读取】JSON CSV

数据分析流程 获取数据-读取数据-评估数据-清洗数据-整理数据-分析数据-可视化数据 公开数据集 飞桨(百度旗下深度学习平台)数据集:https:/aistudio.baidu.com/aistudio/datasetoverview 天池(阿里云旗下开发者竞赛平台&#xf…

​了解MySQL 的二进制日志文件​Binlog

1. SQL 语句的几种类型 首先介绍一下,对于一个 SQL 语句,它常常被分为以下几种类型: DDL(Data Definition Language,数据定义语言):用来操作数据库、表、列等,比如 CREATE、ALTER…

现在有一台ubuntu22.04 的工作站机器,现在想通过RDP的方式进行远程开发

在 Ubuntu 22.04 工作站上通过 RDP(远程桌面协议)进行连接的具体步骤如下: 1. 安装 RDP 服务 Ubuntu 默认不支持 RDP 连接,因此你需要安装一个 RDP 服务器,通常使用 xrdp 这个软件包。 步骤: 打开终端&a…

核心知识点合集

不断补充... 知识模块文章详情心得&资料JVM相关 Java:简述JDK,JRE,JVM之间的关系JVM:简述JVM内存分配模型JVM:浅谈垃圾回收GC机制JVM:浅谈内存溢出的原因JVM:浅谈JVM调优策略 线程相关资料…

Spring-bean的生命周期-尾篇

上回说到阶段9,现在我们接着往下说 阶段10:所有单例bean初始化完成后阶段 所有单例bean实例化完成之后,spring会回调下面这个接口: package org.springframework.beans.factory;public interface SmartInitializingSingleton {…

策略抉择:左右为难,交易方向要如何破局?

交易决策的核心往往围绕着一个关键问题:是采取左侧交易策略还是右侧交易策略?左侧交易,亦称逆向交易,与右侧交易(顺势交易)形成鲜明对比,两者路径迥异,所以让很多交易员不知道该如何…

iOS——持久化

iOS的数据存储机制 沙盒机制 应用沙盒文件夹包含了: Application(应用程序包):包含了所有的资源文件和和可执行文件,上架前经过数字签名,上架后不可修改。 Documents:文档目录,要保存程序生成的数据&…

Leetcode 最大子数组和

使用“Kadane’s Algorithm”来解决。 Kadane’s Algorithm 在每个步骤中都保持着一个局部最优解,即以当前元素为结尾的最大子数组和(也就是局部最优解),并通过比较这些局部最优解和当前的全局最优解来找到最终的全局最优解。 Kadane’s Algorithm的核…

使用hutool工具发送request 请求模板

使用hutool工具发送request 请求模板 private JSONArray fetchData(){String url "https://www.baidu.com";String token "asdfasdfsdf";JSONObject requestBody new JSONObject();JSONObject filterOption new JSONObject();JSONObject filters new …

【Unity C#】如何计算小球与地面的投影面积

在 3D 空间中移动的圆与平面地板的投影区域 问题简述分步计算:在 Unity 中的实现思路:蒙特卡罗法计算交集面积 Unity 代码示例1解决方案思路2Unity 代码实现优化空间 问题简述 在 3D 空间中移动的圆与平面地板的投影区域。我们可以在 3D 空间中使用数学…

Redis 主从复制的原理详解

引言 Redis 作为一种高性能的内存数据库,广泛应用于高并发、低延迟的场景中。然而,单机版的 Redis 存在一定的局限性,尤其是在高可用性和负载均衡方面。为了应对这些挑战,Redis 提供了主从复制(Replication&#xff0…

TypeScript系列:初篇 - 类型系统

TypeScript 是 JavaScript 的一个超集,添加了类型系统和编译期错误检查等功能 > 静态类型检查。 类型指的是一组具有相同特征的值。 静态类型系统描述了运行程序时的值的形状和行为。 TypeScript 支持块级类型声明,即类型可以声明在代码块&#xff0…

C#中的方法上部

方法重载 在C#中,方法重载(Method Overloading)是一种允许在同一个类中定义多个同名方法,但参数列表不同的特性。参数列表不同可以是参数的类型不同、参数的数量不同,或者是两者都不同。方法重载使得调用者可以根据需…

单例模式对比:静态内部类 vs. 饿汉式

单例模式是一种设计模式,旨在确保一个类只有一个实例,并提供全局访问点。Java 中有多种实现单例模式的方式,其中静态内部类实现和饿汉式实现是两种常见的方法。本文将对这两种单例模式进行详细对比,说明它们在延迟加载方面的区别&…

SQL进阶技巧:如何获取数组中前N个元素?

目录 0 场景描述 1 数据准备 2 问题分析 3 小结 0 场景描述 表数据如下: id arr 1 [a,b,c,d,e,f] 2 [e,d,s,d,g,w,s] 3 [a] 4 [] 5 NULL 目的是按顺序选择每行“arr”中的前 3 个元素并将其作为数组返回,输出如下: id output_…

vue3+ts+vite搭建脚手架(二)配置eslintprettier

我们用vite创建好的脚手架是十分纯净的,我们可以自由配置一些自己想要的东西 1.安装 eslint 依赖 npm i eslint eslint-plugin-vue typescript-eslint/parser typescript-eslint/eslint-plugin vue/eslint-config-typescript -Deslint:ESLint 的核心包…

JAVA基础:抽象类,接口,instanceof,类关系,克隆

1 JDK中的包 JDK JRE 开发工具集(javac.exe) JRE JVM java类库 JVM java 虚拟机 jdk中自带了许多的包(类) , 常用的有 java.lang 该包中的类,不需要引用,可以直接使用。 例如&#xff1…

Linux: network: TCP: errno: EWOULDBLOCK

https://mzhan017.blog.csdn.net/article/details/108010013 这个errno的意思: 如果是send接口函数返回的错误,代表tcp socket的sending buffer满了,让应用程序等上一段时间重试send。 所以,这个产生的原因就不固定了: 可能是当前系统太忙,导致系统发包慢,buffer累积; 可…

集成 Logrus 到 Gin:打造高效的 Go Web 日志系统

在 Go 语言的 Web 开发中,Gin 是一个轻量级且高性能的 Web 框架,而 Logrus 则是一个非常流行的日志库,它提供了结构化的日志记录功能。将 Logrus 集成到 Gin 框架中,可以极大地增强你的 Web 应用程序的日志管理能力。本文将详细介…