Pyspark下操作dataframe方法(3)

ops/2024/9/20 1:27:01/ 标签: python, pyspark, hive

文章目录

  • Pyspark dataframe操作方式3
    • df.foreach 逐条执行
    • foreachPartition 按分区逐条执行
    • freqltems
    • groupBy 分组
    • head 获取指定数量开头
    • hint 查询优化
    • intersect 获取交集(去重)
    • isEmpty 判断dataframe是否为空
    • join 关联
    • limit 限定数量
    • mapInPandas 迭代处理
    • maplnArrow 迭代处理
    • fill 填充
    • orderBy 排序
    • persist 持久化缓存
    • printSchema 打印架构

Pyspark dataframe操作方式3

df.foreach 逐条执行

df.foreach() == df.rdd.foreach()

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
def func(row):print(row.name)# row对象进入func执行
df.foreach(func)
Alice
Bob

foreachPartition 按分区逐条执行

df.show()
+---+-----+
|age| name|
+---+-----+
| 14|  Tom|
| 23|Alice|
| 16|  Bob|
+---+-----+
def func(itr):for person in itr:print(person.name)df.foreachPartition(func)
Tom
Alice
Bob

freqltems

df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
|  1| 11|
|  1| 11|
|  3| 10|
|  4|  8|
|  4|  8|
+---+---+
df.freqItems(["c1", "c2"]).show()
+------------+------------+
|c1_freqItems|c2_freqItems|
+------------+------------+
|   [1, 3, 4]| [8, 10, 11]|
+------------+------------+

groupBy 分组

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  2|  Bob|
|  5|  Bob|
+---+-----+df.groupBy("name").agg({"age": "sum"}).show()
+-----+--------+
| name|sum(age)|
+-----+--------+
|  Bob|       9|
|Alice|       2|
+-----+--------+df.groupBy("name").agg({"age": "max"}).withColumnRenamed('max(age)','new_age').sort('new_age').show()
+-----+-------+
| name|new_age|
+-----+-------+
|Alice|      2|
|  Bob|      5|
+-----+-------+

head 获取指定数量开头

df.head(2)
[Row(age=2, name='Alice'), Row(age=2, name='Bob')]

hint 查询优化

处理大表join时,spark默认策略可能不是最优解,通过hint 可以设置join类型

其他hints: merge,shuffle,coalesce

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])                                                                                             
df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
df.join(df2, "name").explain()  
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]+- SortMergeJoin [name#1641], [name#1645], Inner:- Sort [name#1641 ASC NULLS FIRST], false, 0:  +- Exchange hashpartitioning(name#1641, 200), ENSURE_REQUIREMENTS, [plan_id=1916]:     +- Filter isnotnull(name#1641):        +- Scan ExistingRDD[age#1640L,name#1641]+- Sort [name#1645 ASC NULLS FIRST], false, 0+- Exchange hashpartitioning(name#1645, 200), ENSURE_REQUIREMENTS, [plan_id=1917]+- Filter isnotnull(name#1645)+- Scan ExistingRDD[height#1644L,name#1645]df.join(df2.hint("broadcast"), "name").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]+- BroadcastHashJoin [name#1641], [name#1645], Inner, BuildRight, false:- Filter isnotnull(name#1641):  +- Scan ExistingRDD[age#1640L,name#1641]+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=1946]+- Filter isnotnull(name#1645)+- Scan ExistingRDD[height#1644L,name#1645]

intersect 获取交集(去重)

df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
PyDev console: starting.
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
|  c|  4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+
df1.intersect(df2).show()+---+---+
| C1| C2|
+---+---+
|  b|  3|
|  a|  1|
+---+---+

intersectAll 获取交集(保留重复项)

df1.intersectAll(df2).show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+

isEmpty 判断dataframe是否为空

# 空返回True 非空返回False
df1.isEmpty()
False

join 关联

注意聚合方式可能会影响show出来的列

单列聚合

df2.show()
+------+----+
|height|name|
+------+----+
|    80| Tom|
|    85| Bob|
+------+----+
df4.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+
df4.join(df2,df4.name == df2.name,how='left').show()
+----+------+-----+------+----+
| age|height| name|height|name|
+----+------+-----+------+----+
|   5|  null|  Bob|    85| Bob|
|  10|    80|Alice|  null|null|
|null|  null|  Tom|    80| Tom|
|null|  null| null|  null|null|
+----+------+-----+------+----+
df4.join(df2,df4.name == df2.name).show()
+----+------+----+------+----+
| age|height|name|height|name|
+----+------+----+------+----+
|   5|  null| Bob|    85| Bob|
|null|  null| Tom|    80| Tom|
+----+------+----+------+----+# 会合并同列名
df4.join(df2,'name').show()
+-----+----+------+------+
| name| age|height|height|
+-----+----+------+------+
|Alice|  10|    80|    80|
|  Bob|   5|  null|    85|
|  Tom|null|  null|    80|
+-----+----+------+------+

多列聚合

df2.show()
+------+-----+
|height| name|
+------+-----+
|    80|  Tom|
|    85|  Bob|
|    80|Alice|
+------+-----+
df4.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+
df4.join(df2,[df4.name == df2.name,df4.age==df2.age]).show()
+---+------+-----+------+-----+
|age|height| name|height| name|
+---+------+-----+------+-----+
| 10|    80|Alice|    80|Alice|
+---+------+-----+------+-----+# 会合并同列名
df4.join(df2,['name','height']).show()
+-----+------+---+
| name|height|age|
+-----+------+---+
|Alice|    80| 10|
+-----+------+---+df4.join(df2,[df4.name == df2.name,df4.height==df2.height],how='left').show()
+----+------+-----+------+-----+
| age|height| name|height| name|
+----+------+-----+------+-----+
|  10|    80|Alice|    80|Alice|
|   5|  null|  Bob|  null| null|
|null|  null|  Tom|  null| null|
|null|  null| null|  null| null|
+----+------+-----+------+-----+df4.join(df2,'name').show()
+-----+----+------+------+
| name| age|height|height|
+-----+----+------+------+
|Alice|  10|    80|    80|
|  Bob|   5|  null|    85|
|  Tom|null|  null|    80|
+-----+----+------+------+
df4.join(df2,'name').select(df4.height).show()
+------+
|height|
+------+
|    80|
|  null|
|  null|
+------+
df4.join(df2,'name').select(df4.height,df2.height).show()
+------+------+
|height|height|
+------+------+
|    80|    80|
|  null|    85|
|  null|    80|
+------+------+

limit 限定数量

df = spark.createDataFrame( [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
df.limit(1).show()
+---+----+
|age|name|
+---+----+
| 14| Tom|
+---+----+
df.limit(0).show()
+---+----+
|age|name|
+---+----+
+---+----+

mapInPandas 迭代处理

使用pandas dataframe的迭代器

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):for pdf in iterator:print(pdf,type(pdf))yield pdf[pdf.id == 1]df.mapInPandas(filter_func, df.schema).show()  
# 进入filter_func变成了dataframe处理id  age
0   1   21 <class 'pandas.core.frame.DataFrame'>id  age
0   2   30 <class 'pandas.core.frame.DataFrame'>
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+

maplnArrow 迭代处理

该函数应采用pyarrow的迭代器

import pyarrow  
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):for batch in iterator:print(batch,type(batch))pdf = batch.to_pandas()print(pdf,type(pdf))yield pyarrow.RecordBatch.from_pandas(pdf[pdf.id == 1])df.mapInArrow(filter_func, df.schema).show()
pyarrow.RecordBatch
id: int64
age: int64 <class 'pyarrow.lib.RecordBatch'>id  age
0   1   21 <class 'pandas.core.frame.DataFrame'>
pyarrow.RecordBatch
id: int64
age: int64 <class 'pyarrow.lib.RecordBatch'>id  age
0   2   30 <class 'pandas.core.frame.DataFrame'>
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+

fill 填充

d1 = spark.sql("SELECT 1 AS c1, int(NULL) AS c2")
d1.show()
+---+----+
| c1|  c2|
+---+----+
|  1|null|
+---+----+
d1.na.fill(2).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

orderBy 排序

df.orderBy('age').show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  5|  Bob|
+---+-----+

persist 持久化缓存

from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.DISK_ONLY)

printSchema 打印架构

以树格式打印出架构

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  5|  Bob|
+---+-----+df.printSchema()
root|-- age: long (nullable = true)|-- name: string (nullable = true)

http://www.ppmy.cn/ops/111258.html

相关文章

PHP 环境搭建教程

搭建一个稳定的PHP开发环境是开发Web应用的基础。在Linux系统上&#xff0c;LAMP&#xff08;Linux, Apache, MySQL/MariaDB, PHP&#xff09;堆栈是最广泛使用的组合。本文将详细介绍如何在Linux上搭建PHP开发环境&#xff0c;涵盖安装步骤、配置和测试。更多内容&#xff0c;…

0911(绘制事件,qt中的网络通信)

一、实现一个时钟 1)代码 头文件&#xff1a; #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPainter> #include <QPaintEvent> #include <QTimer> #include <QTime> #include <QTimerEvent>QT_BEGIN_NAMESPACE nam…

大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

Linux 文件查找命令which,find详解

which which命令可以查找可执行文件的路径&#xff0c;根据环境变量PATH的值搜索指令命令的路径 例如此时要搜索ls命令文件的路径&#xff0c;则执行 which ls 返回 /usr/bin/ls 若此时要查找python的路径&#xff0c;则执行 which python 返回 /root/anaconda3/bin/py…

Qt事件处理机制

用qt实现简单闹钟 widget.h #ifndef WIDGET_H #define WIDGET_H #include<QPushButton> #include<QTextEdit> #include<QLabel> #include <QWidget> #include<QMouseEvent> #include<QPoint> #include<QTime> #include<QTimer&…

NASA数据集:ASTER L2 地表辐射率 VNIR 和 SWIR V003

目录 简介 摘要 代码 引用 网址推荐 0代码在线构建地图应用 机器学习 ASTER L2 地表辐射率 VNIR 和 SWIR V003 简介 ASTER 地表辐照度可见近红外和短波红外(AST_09)是一个多文件产品(https://lpdaac.usgs.gov/documents/996/ASTER_Earthdata_Search_Order_Instruct…

GlusterFS分布式存储

目录 一、GlusterFS分布式存储概述 1、GFS概念 2、GFS特点 3、GFS术语 4、GFS架构 5、GFS工作流程 6、GlusterFs的卷类型 6.1、 分布式卷&#xff08;Distributed Volume&#xff09; 6.2、条带卷&#xff08;Striped Volume&#xff09; 6.3、复制卷&#xff08;Rep…

laravel 11 区分多模块的token

数据表&#xff1a;用户表&#xff08;users&#xff09;、管理员表&#xff08;admin_user&#xff09;&#xff0c; 配置bootstrap/app.php guards > [web > [driver > session,provider > admin_users,],home > [driver > sanctum,provider > users,]…

android os 多用户介绍

AOSP (Android Open Source Project) 中的 multi-user 支持允许设备上存在多个用户账户&#xff0c;每个用户都有自己的环境、应用和数据。这种特性对于平板电脑或家庭娱乐设备尤其有用&#xff0c;因为它允许多个家庭成员或朋友共享同一设备而不互相干扰。 下面是一些与 AOSP…

前端网络请求库:Axios

目录 1. 网络请求的基本概念 1.1 网络请求的基础HTTP协议 1.2 HTTP工作原理 1.3 TCP连接 1.31 建立TCP连接 1.31 关闭TCP连接 1.4 HTTP的请求方法 1.5 HTTP的响应状态码分类 1.6. 前端网络请求方式 2. Axios在在vue项目中的使用 2.1安装与使用 2.2 Axios使用HTTP请求…

erlang学习: Mnesia Erlang数据库2

Mnesia数据库增加与查询学习 -module(test_mnesia).-record(shop, {item, quantity, cost}). -record(cost, {name, price}). -record(design, {info, plan}). %% API -export([insert/3,select/1,start/0]). start() ->mnesia:start().insert(Name, Quantity, Cost) ->…

QT 中使用QXlsx 读写文件

PC 工具中需要操作xlsx格式的数据&#xff0c;前面使用的是QAxObject&#xff0c;需要一些设置&#xff0c;还要安装office的控件或者wps的控件&#xff0c;在一些电脑上面有的时候还会有异常&#xff0c;后面发现一个好像的开源库QXls&#xff0c;发现读写的速度比以前快&…

在Flask中实现跨域请求(CORS)

在Flask中实现跨域请求&#xff08;CORS&#xff0c;Cross-Origin Resource Sharing&#xff09;主要涉及到对Flask应用的配置&#xff0c;以允许来自不同源的请求访问服务器上的资源。以下是在Flask中实现CORS的详细步骤和方法&#xff1a; 一、理解CORS CORS是一种机制&…

Vue与React的Diff算法

虚拟DOM 定义 虚拟DOM是一种用于在前端开发中模拟真实DOM的技术。它是一种抽象的数据结构&#xff08;简单来说就是一个Javascript对象&#xff09;&#xff0c;用于描述HTML或XML文档的结构和内容。通过将页面的状态和结构保存在内存中&#xff0c;而不是直接操作真实的DOM&am…

智能路口安全预警系统:精准提醒降低事故发生率

路口安全预警系统的应用&#xff0c;无疑是提升道路交通安全管理水平、保障公众出行安全的重要措施。其综合了多种先进技术和设备&#xff0c;形成了一个高效、智能的安全防护网&#xff0c;具体优势体现在以下几个方面&#xff1a; 智能识别与预警&#xff1a;利用AI智能识别…

Linux学习-ELK(一)

配置三台elasticsearch服务器 安装包 elasticsearch.j2 报错 #---执行rsync命令报以下错误 [rootes1 ~]# rsync -av /etc/hosts 192.168.29.172:/etc/hosts root192.168.29.172s password: bash: rsync: 未找到命令 rsync: connection unexpectedly closed (0 bytes receive…

【React Native】第三方组件

WebView Picker mode {‘dropdown’} 只在 android 生效 Swiper 搭配 ScrollView 使用 AsyncStorage AsyncStorage.setItem()AsyncStorage.getItem()AsyncStorage.removeItem()AsyncStorage.clear() Geolocation 配置添加获取定位信息的授权许可&#xff0c;在 androi…

自定义封装输入框组件时的一些默认样式问题处理、原生input标签样式边线等处理 之 appearance: none 魔法 真的记住了

appearance: none 魔法 appearance: none; /* 确保覆盖所有默认样式 */-moz-appearance: textfield; /* 移除 Firefox 默认的外观 */-webkit-appearance: none; /* 移除 WebKit 浏览器的默认样式 */然后就如你所愿了&#xff1a;

K8S - Volume - NFS 卷的简介和使用

在之前的文章里已经介绍了 K8S 中两个简单卷类型 hostpath 和 emptydir k8s - Volume 简介和HostPath的使用 K8S - Emptydir - 取代ELK 使用fluentd 构建logging saidcar 但是这两种卷都有同1个限制&#xff0c; 就是依赖于 k8s nodes的空间 如果某个service pod中需要的vol…

Android C++ Binder 的两种实现方式

Binder 机制是 Android 中使用最广泛的进程间通信机制&#xff0c;借助 Binder 开发者可以方便的开发各种实现应用间信息传输、分享的应用。对于 Android 应用开发者来说&#xff0c;开发都是基于 Java 语言进行的&#xff0c;但其实 Android 系统提供了实现 C 语言层的 Binder…