【小贪】大数据处理:Pyspark, Pandas对比及常用语法

news/2024/9/20 1:24:58/ 标签: python, 大数据处理, Pyspark, Pandas

近期致力于总结科研或者工作中用到的主要技术栈,从技术原理到常用语法,这次查缺补漏当作我的小百科。主要技术包括:

  • ✅数据库常用:MySQL, Hive SQL, Spark SQL
  • 大数据处理常用:Pyspark, Pandas
  • ⚪ 图像处理常用:OpenCV, matplotlib
  • ⚪ 机器学习常用:SciPy, Sklearn
  • ⚪ 深度学习常用:Pytorch, numpy
  • ⚪ 常用数据结构语法糖:itertools, collections
  • ⚪ 常用命令: Shell, Git, Vim

以下整理错误或者缺少的部分欢迎指正!!!

Pyspark_Pandas_12">大数据处理常用:Pyspark, Pandas

性能对比

PysparkPandas
运行环境分布式计算集群(Hadoop/Apache Spark集群)单个计算机
数据规模亿级大规模百万级小规模
优势分布式计算->并行处理,处理速度快API简单->数据处理简单
延迟机制lazy execution, 执行动作之前不执行任务eager execution, 任务立即被执行
内存缓存persist()/cache()将转换的RDDs保存在内存单机缓存
DataFrame可变性不可变,修改则返回一个新的DataFrame可变
可扩展性
列名允许重复×

常用语法对比

python"># 头文件
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType  # 或者直接导入*
import pandas as pd# 创建SparkSession对象
spark = SparkSession.builder \.appName("username") \.getOrCreate()# 创建空表
schema = StructType([StructField('id', LongType()),StructField('type', StringType()),])  # spark需要指定列名和类型
spark_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=schema)
pandas_df = pd.DataFrame(columns=['id', 'type'], index=[0, 1, 2])# 根据现有数据创建
data = [(1, "Alice", 2000), (2, "Bob", 2001), (3, "Charlie", 2002)]
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("birth_year", IntegerType(), True)
])
spark_df = spark.createDataFrame(data, ["id", "name", "birth_year"])
spark_df = spark.createDataFrame(data, schema)
pandas_df = pd.DataFrame(data=data, columns=["id", "name", "birth_year"])# 读取csv文件
spark_df = spark.read.csv("data.csv", header=True, inferSchema=True)
pandas_df = pd.read_csv("data.csv", sep="\t")  # read_excel
# 保存数据到csv
spark_df.write.csv('data.csv', header=True)
pandas_df.to_csv("data.csv", index=False)# 读取hive表数据
spark_df = spark.sql('select * from tab')
# 保存数据到hive表
spark_df.write.mode('overwrite').saveAsTable('db_name.tab_name')# 相互转换
spark_df = SQLContext.createDataFrame(pandas_df)
pandas_df = spark_df.toPandas()# 转换数据类型
spark_df = spark_df.withColumn("A", col("age").cast(StringType))
pandas_df["A"] = pandas_df['A'].astype("int")# 重置索引
spark_df = spark_df.withColumn("id", monotonically_increasing_id())  # 生成一个增长的id列
pandas_df.reset_index()# 切片
pandas_df['a':'c']  # a-c三行
pandas_df.iloc[1:3, 0:2]  # 1-2行,0-1列。左闭右开
pandas_df.iloc[[0, 2], [1, 2]] # 第0,2行第0,2列
pandas_df.loc['a':'c', ['A', 'B']] # 第a-c行A,B列# 选择列
spark_df.select('A', 'B')
pandas_df[['A', 'B']]# 删除列
spark_df.drop('A', 'B')
pandas_df.drop(['A', 'B'], axis=1, inplace=True)  # inplace表示是否创建新对象# 新增列,设置列值
spark_df = spark_df.withColumn('name', F.lit(0))
pandas_df['name'] = 0# 修改列值
spark_df.withColumn('name', 1)
pandas_df['name'] = 1
# 使用函数修改列值
spark_df = spark_df.withColumn('code', F.when(F.isnull(spark_df.code), 0).otherwise(spark_df.code))# 修改列名
spark_df.withColumnRenamed('old_name', 'new_name')
pandas_df.rename(columns={'old_name1': 'new_name1', 'old_name1': 'new_name2'}, inplace=True)# 显示数据
spark_df.limit(10) # 前10行
spark_df.show/take(10)  # collect()返回全部数据
spark_df/pandas_df.first/head/tail(10)# 表格遍历
saprk_df.collect()[:10]
spark_df.foreach(lambda row: print(row['c1'], row['c2']))
for i, row in pandas_df.iterrows():print(row["c1"], row["c2"])# 排序
spark/pandas_df.sort()  # 按列值排序
pandas_df.sort_index()  # 按轴排序
pandas_df.sort_values(by=["A", "B"], axis=0, ascending=[True, False], inplace=True)  # 指定列升序/降序排序# 过滤
spark_df.filter(df['col_name'] > 1)     # spark_df.where(df['col_name'] > 1)
pandas_df[pandas_df['col_name'] > 1]
pandas_df_new = pandas_df[pandas_df["code"].apply(lambda x: len(x) == 11)]# 去重
spark_df.select('col_name').distinct()
spark_df_filter = spark_df.drop_duplicates(["col_name"])
pandas_df.drop_duplicates(["col_name"], keep='first', inplace=True)# 缺失数据处理
spark_df.na.fill()
spark_df.na.drop(subset=['A', "B"])  # 同dropna
pandas_df.fillna()
pandas_df.dropna(subset=['A', "B"], how="any", inplace=True)# 空值过滤 filter=choose
spark_df.filter(~(F.isnull(spark_df.d)))
spark_df.filter(~(spark_df['A'].isNull() | spark_df['B'].isNull()))   # 选出列值不为空的行  isnan()=isNull()<->isNOtnan()
pandas_df[pandas_df['A'].isna()]  # 选出列值为空的行
pandas_df[pandas_df['A'].notna()] # 选出列值不为空的行# 统计
spark/pandas_df.count()  # spark返回总行数,pandas返回列非空总数
spark/pandas_df.describe() # 描述列的count, mean, min, max...# 计算某一列均值
average_value = spark_df.select("col_name").agg({"col_name": "avg"}).collect()[0][0]
average_value = pandas_df["col_name"].mean()# 表合并
# 按行合并,相当于追加
spark_df = spark_df.unionAll(spark_df1)
pandas_df = pd.concat([df_up, df_down], axis=0)
# 按列合并
spark_df = spark_df.join(df1, df1.id==spark_df.id, 'inner').drop(df1.id)  # df1.id==spark_df.id也可写成['id](当且仅当列名相同)
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left|right|inner|outer")  # 聚合函数
spark_df_collect = spark_df.groupBy('number').agg(F.collect_set('province').alias('set_province'),F.first('city').alias('set_city'),F.collect_list('district').alias('set_district'),F.max('report_user').alias('set_report_user'),F.min('first_type').alias('set_first_type'))
# 分组聚合
spark_df.groupBy('A').agg(F.avg('B'), F.min('B'))
spark/pandas_df.groupby('A').avg('B')# 根据函数分组聚合
def func(x):return pd.DataFrame({"A": x["A"].tolist()[0],"B": sum(x["B"])}, index=[0])
pandas_df_result = pandas_df.groupby(["A"]).apply(func)# spark udf函数和pandas apply函数
def func1(a, b):return a + b
spark_df.withColumn("col_name", F.udf(func1, IntegerType())(spark_df.a, spark_df.b))  # spark_df['a']或F.col("a")))
def func2(x,y):return 1 if x > np.mean(y) else 0
pandas_df['A'].apply(func2, args=(pandas_df['B'],))
pandas_df['C'] = pandas_df.apply(lambda x: 1 if x['A'] > (x['B']*0.5) else 0, axis=1)# spark创建临时表
spark_df.createOrReplaceTempView('tmp_table')  # 用sql API
res1 = spark.sql('select * from tmp_table')
spark_df.registerTempTable('tmp_table') # 用dataframe API
res2 = spark.table('tmp_table') 

其他常用设置

python">class SparkUtils:def __init__(self):self.spark = Nonedef get_spark(self):if self.spark is None:self.spark = SparkSession.builder.appName("username") \.enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \.config("spark.sql.broadcastTimeout", "3600") \.config("spark.driver.memory", "200g") \.config("spark.executor.memory", "40g") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.getOrCreate()self.spark.sql('SET hive.exec.dynamic.partition=true')self.spark.sql('SET hive.exec.dynamic.partition.mode=nonstrict')return self.sparkspark = SparkUtils()# 生成dataframe
spark_data = spark.sql("""select id, usernamefrom tab1where status in (1, 2, 3)and dt = '{}'""".format(date))# pandas常用显示设置
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width',1000)
pd.set_option('display.max_colwidth',1000)

http://www.ppmy.cn/news/1425711.html

相关文章

css设置文字撑满盒子

效果如上&#xff1a; <div style"width: 250px;background-color:red;text-align-last:justify;word-break: keep-all;">为中国崛起而读书</div>

整除分块(上下取整)

参考&#xff1a; 整除分块 - 知乎 董晓算法 G33 整除分块&#xff08;数论分块&#xff09; 图都是摘的上面的。 整除分块 整除分块是数论中的一个知识点。一个整除式子在分母不固定的时候&#xff0c;得到的结果也有可能不同&#xff0c;但是因为是整除&#xff0c;所以…

【C++打怪之路】-- C++开篇

&#x1f308; 个人主页&#xff1a;白子寰 &#x1f525; 分类专栏&#xff1a;C打怪之路&#xff0c;python从入门到精通&#xff0c;魔法指针&#xff0c;进阶C&#xff0c;C语言&#xff0c;C语言题集&#xff0c;C语言实现游戏&#x1f448; 希望得到您的订阅和支持~ &…

关于ERA5气压和温度垂直补偿公式的对比情况

1. 气压和温度垂直补偿对比 「谨代表给个人观点&#xff0c;杠精请自测&#xff0c;对对对&#xff0c;好好好&#xff0c;你说啥都对」。 使用2020-2022陆态网GNSS与探空站并址的48个站点实验&#xff0c;以探空站为真值&#xff0c;验证ERA5精度。怎么确定并址请看前面文章…

我们一起看看《看漫画学C++》中如何讲解对象的动态创建与销毁

《看漫画学C》这本书中会用图文的方式生动地解释对象的动态创建与销毁。在C中&#xff0c;动态创建对象是通过new运算符来实现的&#xff0c;而销毁对象则是通过delete运算符来完成的。这种方式可以让程序在需要时分配内存给对象&#xff0c;并在对象不再需要时释放内存&#x…

「51媒体」媒体邀约新闻稿件发布应该如何筛选媒体?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 在媒体邀约新闻稿件发布的过程中&#xff0c;筛选媒体是一个至关重要的环节。我们需要考虑以下因素&#xff1a; 目标受众匹配度&#xff1a;首先&#xff0c;需要明确新闻稿件的目标受众…

ChatGPT API和llm的GUI。支持代理,基于文件的QA, GPT微调和web搜索查询

ChatGPT API和llm的GUI。支持代理&#xff0c;基于文件的QA, GPT微调和web搜索查询 文章目录 ChatGPT API和llm的GUI。支持代理&#xff0c;基于文件的QA, GPT微调和web搜索查询一、 强力功能二、 System Prompt三、 基础对话四、 对话历史五、 小而美的体验六、 极客功能七、 …

buuctf——[ZJCTF 2019]NiZhuanSiWei

buuctf——[ZJCTF 2019]NiZhuanSiWei 1.绕过file_get_contents()函数 file_get_contents函数介绍 定义和用法 file_get_contents() 把整个文件读入一个字符串中。 该函数是用于把文件的内容读入到一个字符串中的首选方法。如果服务器操作系统支持&#xff0c;还会使用内存映射…

网站备案期间怎么关闭首页显示无法访问-文章及其它页面正常访问

自从做了开发者之后才发现每个人博主的需求都是不同的&#xff0c;的的确确颠覆了我的观点&#xff0c;无论是页面布局还是SEO相关的设置&#xff0c;可能是因为站点属性不同所以需求不同&#xff0c;慢慢的就会在主题加入一些自定接口来满足不同人的需求&#xff0c;有人需要P…

软考136-上午题-【软件工程】-风险管理

一、风险管理 般认为软件风险包含两个特性&#xff1a;不确定性、损失。不确定性是指风险可能发生也可能不发生&#xff1b;损失是指如果风险发生&#xff0c;就会产生恶性后果。 在进行风险分析时&#xff0c;重要的是量化每个风险的不确定程度和损失程度。为了实现这一点&a…

麦克斯韦方程简单理解波粒二象性粒子退相干(Quantum Decoherence):微观的动态,宏观的静态量子

目录 麦克斯韦方程简单理解 波粒二象性 粒子退相干(Quantum Decoherence):微观的动态,宏观的静

MyBatis 框架学习(I)

MyBatis 框架学习(I) 文章目录 MyBatis 框架学习(I)1. 介绍2. 准备&测试3. MyBatis 注解基础操作3.1 日志输出3.2 Insert 操作3.3 Delete 操作3.4 Update 操作3.5 Select 操作 总结 1. 介绍 之前我们学习过利用JDBC操作数据库进行项目开发&#xff0c;但我们发现它操作起来…

Flutter 插件站新升级: 加入优秀 GitHub 开源项目

Flutter 插件站新升级: 加入优秀 GitHub 开源项目 视频 https://youtu.be/qa49W6FaDGs https://www.bilibili.com/video/BV1L1421o7fV/ 前言 原文 https://ducafecat.com/blog/flutter-awesome-github-repo-download 这几天晚上抽空把 Flutter 插件站升级&#xff0c;现在支…

Java语言中字符串处理最常见的操作以及注意事项

0. 前言 由于最近线上连续出现跟字符串处理相关的故障&#xff0c;实属不应该。也从这些故障中看到大家对常见的字符串处理API有一些淡忘&#xff0c;希望通过收集整体并总结常见的处理方法&#xff0c;大家温故而知新。 1. 创建和初始化&#xff1a; a. 使用双引号直接创建…

(四)相关性分析 学习简要笔记 #统计学 #CDA学习打卡

目录 一. 相关性分析简介 二. 相关性分析方法 1&#xff09;连续型变量vs连续型变量&#xff1a;Pearson/Spearman &#xff08;a&#xff09;Pearson &#xff08;b&#xff09;Spearman等级相关系数 2&#xff09;二分类变量&#xff08;自然&#xff09;vs连续型变量&…

计算机网络---第十一天

生成树协议 stp作用&#xff1a; 作用&#xff1a;stp用于解决二层环路问题。 BPDU&#xff1a; 含义&#xff1a;桥协议数据单元&#xff0c;用于传递stp协议相关报文 分类&#xff1a;配置bpdu---用于传递stp的配置信息 tcn bpdu---用于通告拓扑变更信息 包含信息&…

解决方案:用决策树算法如何生成决策树图及生成SQL规则

文章目录 一、现象二、解决方案 一、现象 一开始没有做过生成决策树图及生成SQL规则&#xff0c;一时犯了难&#xff0c;百度很多找到了解决方法&#xff0c;于是乎&#xff0c;写一篇博客&#xff0c;这样下次就能直接拿来使用咯 二、解决方案 见最后三块代码&#xff0c;为…

C++ 类和对象(终篇)

初始化列表 就是给我们每一个成员变量找了一个定义的位置&#xff0c;不然像const这样的成员不好处理 所有的成员能在初始化列表初始化的都在里面初始化 拷贝构造函数和构造函数都允许初始化 构造函数体中的语句只能将其称作为赋初值&#xff0c;而不能称作初始化。 因为初始…

[大模型]Qwen-7B-hat Transformers 部署调用

Qwen-7B-hat Transformers 部署调用 环境准备 在autodl平台中租一个3090等24G显存的显卡机器&#xff0c;如下图所示镜像选择PyTorch–>2.0.0–>3.8(ubuntu20.04)–>11.8 接下来打开刚刚租用服务器的JupyterLab&#xff0c;并且打开其中的终端开始环境配置、模型下…

ubuntu-22.04 tenda U9安装wifi驱动

背景 前面写过《ubuntu-18.04 tenda U9安装wifi驱动》&#xff0c;当然20.04也是支持的。 但是当系统升级为22.04.1之后&#xff0c;之前的已经不能用了&#xff0c;22.04.1的内核版本是6.5.0-26-generic&#xff0c;github上的代码(https://github.com/brektrou/rtl8821CU.g…