Pyspark下操作dataframe方法(1)

server/2024/9/20 1:21:46/ 标签: pyspark, linux, python, spark, hadoop

文章目录

  • Pyspark dataframe
    • 创建DataFrame
      • 使用Row对象
      • 使用元组与scheam
      • 使用字典与scheam
      • 注意
    • agg 聚合操作
    • alias 设置别名
      • 字段设置别名
      • 设置dataframe别名
    • cache 缓存
    • checkpoint RDD持久化到外部存储
    • coalesce 设置dataframe分区数量
    • collect 拉取数据
    • columns 获取dataframe列

spark_dataframe_1">Pyspark dataframe

创建DataFrame

from spark>pyspark.sql import  SparkSession,Row
from spark>pyspark.sql.types import *def init_spark():spark  = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \.config('hive.metastore.uris', 'thrift://hadoop01:9083') \.config('spark.master', "local[2]") \.enableHiveSupport().getOrCreate()return spark
spark = init_spark()# 设置字段类型
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("gender", StringType(), True),
])

使用Row对象

cs = Row('name','age','id','gender')
row_list = [ cs('ldsx','12','1','男'),cs('test1','20','1','女'),cs('test2','26','1','男'),cs('test3','19','1','女'),cs('test4','51','1','女'),cs('test5','13','1','男')]
data = spark.createDataFrame(row_list)
data.show()+-----+---+---+---+
| name|age| id|gender|
+-----+---+---+---+
| ldsx| 12|  1| 男|
|test1| 20|  1| 女|
|test2| 26|  1| 男|
|test3| 19|  1| 女|
|test4| 51|  1| 女|
|test5| 13|  1| 男|
+-----+---+---+---+
data.printSchema()
root|-- name: string (nullable = true)|-- age: string (nullable = true)|-- id: string (nullable = true)|-- gender: string (nullable = true)

使用元组与scheam

park.createDataFrame([('ldsx1','12','1','男'),('ldsx2','12','1','男')],schema).show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|ldsx1| 12|  1|    男|
|ldsx2| 12|  1|    男|
+-----+---+---+------+

使用字典与scheam

spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}]).show()
+---+------+---+----+
|age|gender| id|name|
+---+------+---+----+
| 12|    女|  1|ldsx|
+---+------+---+----+

注意

scheam设置优先级高于row设置,dict设置的key

schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("测试", StringType(), True),
])
spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}],schema).show()
+----+---+---+----+
|name|age| id|测试|
+----+---+---+----+
|ldsx| 12|  1|null|
+----+---+---+----+

agg 聚合操作

在 PySpark 中,agg(aggregate)函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。可以结合groupby使用。

from spark>pyspark.sql import functions as sf
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.agg({'age':'max'}).show()
+--------+
|max(age)|
+--------+
|      51|
+--------+
data.agg({'age':'max','gender':"max"}).show()
+-----------+--------+
|max(gender)|max(age)|
+-----------+--------+
|         男|      51|
+-----------+--------+data.agg(sf.min(data.age)).show()
+--------+
|min(age)|
+--------+
|      12|
+--------+
data.agg(sf.min(data.age),sf.min(data.name)).show()
+--------+---------+
|min(age)|min(name)|
+--------+---------+
|      12|     ldsx|
+--------+---------+

结合groupby使用

data.groupBy('gender').agg(sf.min('age')).show()+------+--------+
|gender|min(age)|
+------+--------+
|    女|      19|
|    男|      12|
+------+--------+
data.groupBy('gender').agg(sf.min('age'),sf.max('name')).show()
+------+--------+---------+
|gender|min(age)|max(name)|
+------+--------+---------+
|    女|      19|    test4|
|    男|      12|    test5|
+------+--------+---------+

alias 设置别名

字段设置别名

#字段设置别名
data.select(data['name'].alias('rename_name')).show()
+-----------+
|rename_name|
+-----------+
|       ldsx|
|      test1|
|      test2|
|      test3|
|      test4|
|      test5|
+-----------+

设置dataframe别名

d1 = data.alias('ldsx1')
d2 = data2.alias('ldsx2')
d1.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
d2.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|测试1| 12|  1|    男|
|测试2| 20|  1|    男|
+-----+---+---+------+d3 = d1.join(d2,col('ldsx1.gender')==col('ldsx2.gender'),'inner')
d3.show()
+-----+---+---+------+-----+---+---+------+
| name|age| id|gender| name|age| id|gender|
+-----+---+---+------+-----+---+---+------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|
|test2| 26|  1|    男|测试1| 12|  1|    男|
|test2| 26|  1|    男|测试2| 20|  1|    男|
|test5| 13|  1|    男|测试1| 12|  1|    男|
|test5| 13|  1|    男|测试2| 20|  1|    男|
+-----+---+---+------+-----+---+---+------+d3[['name']].show()
#报错提示
spark>pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`ldsx1`.`name`, `ldsx2`.`name`].
# 使用别名前缀获取
d3[['ldsx1.name']].show()
+-----+
| name|
+-----+
| ldsx|
| ldsx|
|test2|
|test2|
|test5|
|test5|
+-----+
>>> d3[['ldsx2.name']].show()
+-----+
| name|
+-----+
|测试1|
|测试2|
|测试1|
|测试2|
|测试1|
|测试2|
+-----+
d3.select('ldsx1.name','ldsx2.name').show()
+-----+-----+
| name| name|
+-----+-----+
| ldsx|测试1|
| ldsx|测试2|
|test2|测试1|
|test2|测试2|
|test5|测试1|
|test5|测试2|
+-----+-----+

cache 缓存

dataframe缓存默认缓存级别MEMORY_AND_DISK_DESER

df.cache()
# 查看逻辑计划和物理计划
df.explain()

checkpoint RDD持久化到外部存储

Checkpoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用Checkpoint比较合适,或者数据量很大的时候,采用Checkpoint比较合适。如果数据量小,或者RDD重新计算也是非常快的,直接使用缓存即可。
CheckPoint支持写入HDFS。CheckPoint被认为是安全的

sc = spark.sparkContext
# 设置检查存储目录
sc.setCheckpointDir('hdfs:///ldsx_checkpoint')
d3.count()
# 保存会在hdfs上进行存储
d3.checkpoint()
# 从hdfs读取
d3.count()

在这里插入图片描述

coalesce 设置dataframe分区数量

# 设置dataframe分区数量
d3 = d3.coalesce(3)
# 获取分区数量
d3.rdd.getNumPartitions()

collect 拉取数据

当任务提交到集群的时候collect()操作是用来将所有结点中的数据收集到dirver节点,数据量很大慎用防止dirver炸掉。

d3.collect()
[Row(name='ldsx', age='12', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='ldsx', age='12', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试2', age='20', id='1', gender='男')]

columns 获取dataframe列

>>> d3.columns
['name', 'age', 'id', 'gender', 'name', 'age', 'id', 'gender']d3.withColumn('ldsx1.name_1',col('ldsx1.name')).show()
+-----+---+---+------+-----+---+---+------+------------+
| name|age| id|gender| name|age| id|gender|ldsx1.name_1|
+-----+---+---+------+-----+---+---+------+------------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|        ldsx|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|        ldsx|
|test2| 26|  1|    男|测试1| 12|  1|    男|       test2|
|test2| 26|  1|    男|测试2| 20|  1|    男|       test2|
|test5| 13|  1|    男|测试1| 12|  1|    男|       test5|
|test5| 13|  1|    男|测试2| 20|  1|    男|       test5|
+-----+---+---+------+-----+---+---+------+------------+# 重命名列名
d3.withColumnRenamed('ldsx1.name_1',col('ldsx1.name')).show()

http://www.ppmy.cn/server/115858.html

相关文章

Web开发详解

要做Web开发,就好像你在厨房里要做一顿丰盛的晚餐,从准备食材到最后上桌,整个过程得协调得当。Web开发的流程有前端、后端、数据库、API,以及其他的工具和技术来共同组成。别担心,听起来复杂,但我会给你讲得…

C# 通过拖控件移动窗体

目录 引言一、通过控件事件移动窗体1、创建窗体界面2、添加控件事件3、添加代码 二、通过windowsAPI移动窗体1、 构建窗体和添加事件2、代码展示 三、其它方式 引言 在C#Form窗体设计中,如果我们不需要使用默认边框设计自己个性化的窗体(FromBorderStyl…

Qt Base64数据保存成QImage 踏坑(data:image/png;base64)

项目场景 项目中,Qt程序收到JS发过来的base64数据,按理说,下面代码就可以直接保存成本地图片。 //str是base64数据 void RemoteDataManager::base64_string_2_img(const QString & str) {QImage img;QByteArray arr_base64 str.toLat…

Dcoker 运行es

1,创建network docker network create my-network 2,docker运行es容器 docker run -d --name es-container --net my-network -p 9200:9200 -p 9300:9300 -e "discovery.typesingle-node" docker.elastic.co/elasticsearch/elasticsearch:7…

常见 HTTP 状态码详解与Nginx 文件上传大小限制

在我们日常使用 Nginx 搭建网站或应用服务时,可能会遇到很多与文件上传和请求响应相关的问题。今天我们就来聊聊 如何限制文件上传的大小,并介绍一些常见的 HTTP 状态码 及其在 Nginx 中的处理方式。 一、文件上传大小限制 有时,我们需要限…

lxml官方入门教程(The lxml.etree Tutorial)翻译

lxml官方入门教程(The lxml.etree Tutorial)翻译 说明: 首次发表日期:2024-09-05官方教程链接: https://lxml.de/tutorial.html使用KIMI和豆包机翻水平有限,如有错误请不吝指出 这是一个关于使用lxml.et…

OpenObserve云原生可观测平台本地Docker部署与远程访问实战教程

文章目录 前言1. 安装Docker2. Docker镜像源添加方法3. 创建并启动OpenObserve容器4. 本地访问测试5. 公网访问本地部署的OpenObserve5.1 内网穿透工具安装5.2 创建公网地址 6. 配置固定公网地址 前言 本文主要介绍如何在Linux系统使用Docker快速本地化部署OpenObserve云原生可…

1.1 IP地址与端口

欢迎大家订阅【网络安全】学习专栏,开启你的网络安全学习之旅! 文章目录 前言一、什么是IP地址?1. 定义2. 类型 二、如何判断IP地址是公网还是内网?三、什么是端口? 前言 随着信息技术的迅猛发展,网络安全成为了企业…

基于百度AIStudio飞桨paddleRS-develop版道路模型开发训练

基于百度AIStudio飞桨paddleRS-develop版道路模型开发训练 参考地址:https://aistudio.baidu.com/projectdetail/8271882 基于python35paddle120env环境 预测可视化结果: (一)安装环境: 先上传本地下载的源代码Pad…

使用matlab的热门问题

MATLAB广泛应用于科学计算、数据分析、信号处理、图像处理、机器学习等多个领域,因此热门问题也涵盖了这些方面。以下是一些可能被认为当前最热门的MATLAB问题: 深度学习与神经网络: 如何使用MATLAB的深度学习工具箱(Deep Learni…

C++ namespace(域)

namespace的价值 避免命名冲突:在大型项目或使用多个库的情况下,不同部分可能会定义相同名称的实体(如变量、函数、类等)。使用命名空间可以有效地避免这些命名冲突。【这一点的尤其关键的,在C语言里面,是没…

面对Redis数据量庞大时的应对策略

面对Redis数据量庞大时的应对策略,我们可以从多个维度出发,包括数据分片、内存优化、持久化策略、使用集群、硬件升级、数据淘汰策略、以及数据结构选择等。以下是对这些策略的详细探讨: 一、数据分片(Sharding) 当R…

Nginx反向代理功能及动静分离实现

一:Nginx支持正向代理和反向代理 1.正向代理 正向代理,指的是通过代理服务器 代理浏览器/客户端去重定向请求访问到目标服务器 的一种代理服务。 正向代理服务的特点是代理服务器 代理的对象是浏览器/客户端,也就是对于目标服务器 来说浏览…

sql中索引查看是否生效

在pg数据库中有多种索引存在,在一般情况下我们取使用普通索引 以下是一些常见导致索引未命中的原因和优化策略 1.如果查询中的条件与索引字段的顺序不匹配,或者索引字段没有完全包含在查询条件中,索引可能不会被使用。 2.在查询中使用函数…

Leetcode 109.有序链表转换二叉搜索树(Medium)

给定一个单链表的头节点 head ,其中的元素 按升序排序 ,将其转换为 平衡 二叉搜索树。 示例 1: 输入: head [-10,-3,0,5,9] 输出: [0,-3,9,-10,null,5] 解释: 一个可能的答案是[0,-3,9,-10,null,5],它表示所示的高度…

SpringBoot中使用log遇到的一些问题

1、springboot logger.level与<logger>优先级&#xff1f; 百度结果如下&#xff1a; 所以得出结论&#xff0c;如果在yml种配置&#xff1a; logging:level:com.ruoyi: debugorg.springframework: warn 同时在logback-spring.xml中配置了 <!-- 系统模块日志级别控…

C++多态讲解

多态 多态的概念 通俗来说&#xff0c;就是多种形态。多态分为编译时多态(静态多态)和运行时多态(动态多态)这里重点讲运行时多态。 运行时多态 运行时多态&#xff0c;具体点就是去完成某个行为(函数)&#xff0c;可以传不同的对象就会完成不同的行为&#xff0c;就达到多种形…

智能对决:提示词攻防中的AI安全博弈

智能对决&#xff1a;提示词攻防中的AI安全博弈 在2024年上海AIGC开发者大会上&#xff0c;知名提示词爱好者工程师云中嘉树发表了关于AI提示词攻防与安全博弈的精彩演讲。他深入探讨了当前AI产品的安全现状&#xff0c;提示词攻击的常见手段及其应对策略。本文将对他的演讲进…

【Android】使用和风天气API获取天气数据吧!(天气预报系列之一)

【Android】使用和风天气API获取天气数据吧&#xff01;&#xff08;天气预报系列之一&#xff09; 古话说得好&#xff0c;要有天气预报&#xff0c;首先需要有天气&#xff0c;和预报。 今天给大家介绍一个好用的天气预报API&#xff1a;和风天气。以及webAPI的使用方法~&a…

使用Cerbot---Let’s Encrypt生成免费的ssl证书,并设置自动更新证书

安装Certbot客户端 yum install certbot 获取证书 certbot certonly --webroot -w /var/www/demo.com -d demo.com 按照步骤 输入邮箱 同意条例 成功申请证书 修改对应的nginx的conf文件 server {listen 80;listen [::]:80;server_name demo.com;# 将 HTTP 请求重定向到 H…