【小贪】大数据处理:Pyspark, Pandas对比及常用语法

ops/2024/9/20 1:26:09/ 标签: python, 大数据处理, Pyspark, Pandas

近期致力于总结科研或者工作中用到的主要技术栈,从技术原理到常用语法,这次查缺补漏当作我的小百科。主要技术包括:

  • ✅数据库常用:MySQL, Hive SQL, Spark SQL
  • 大数据处理常用:Pyspark, Pandas
  • ⚪ 图像处理常用:OpenCV, matplotlib
  • ⚪ 机器学习常用:SciPy, Sklearn
  • ⚪ 深度学习常用:Pytorch, numpy
  • ⚪ 常用数据结构语法糖:itertools, collections
  • ⚪ 常用命令: Shell, Git, Vim

以下整理错误或者缺少的部分欢迎指正!!!

Pyspark_Pandas_12">大数据处理常用:Pyspark, Pandas

性能对比

PysparkPandas
运行环境分布式计算集群(Hadoop/Apache Spark集群)单个计算机
数据规模亿级大规模百万级小规模
优势分布式计算->并行处理,处理速度快API简单->数据处理简单
延迟机制lazy execution, 执行动作之前不执行任务eager execution, 任务立即被执行
内存缓存persist()/cache()将转换的RDDs保存在内存单机缓存
DataFrame可变性不可变,修改则返回一个新的DataFrame可变
可扩展性
列名允许重复×

常用语法对比

python"># 头文件
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType  # 或者直接导入*
import pandas as pd# 创建SparkSession对象
spark = SparkSession.builder \.appName("username") \.getOrCreate()# 创建空表
schema = StructType([StructField('id', LongType()),StructField('type', StringType()),])  # spark需要指定列名和类型
spark_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=schema)
pandas_df = pd.DataFrame(columns=['id', 'type'], index=[0, 1, 2])# 根据现有数据创建
data = [(1, "Alice", 2000), (2, "Bob", 2001), (3, "Charlie", 2002)]
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("birth_year", IntegerType(), True)
])
spark_df = spark.createDataFrame(data, ["id", "name", "birth_year"])
spark_df = spark.createDataFrame(data, schema)
pandas_df = pd.DataFrame(data=data, columns=["id", "name", "birth_year"])# 读取csv文件
spark_df = spark.read.csv("data.csv", header=True, inferSchema=True)
pandas_df = pd.read_csv("data.csv", sep="\t")  # read_excel
# 保存数据到csv
spark_df.write.csv('data.csv', header=True)
pandas_df.to_csv("data.csv", index=False)# 读取hive表数据
spark_df = spark.sql('select * from tab')
# 保存数据到hive表
spark_df.write.mode('overwrite').saveAsTable('db_name.tab_name')# 相互转换
spark_df = SQLContext.createDataFrame(pandas_df)
pandas_df = spark_df.toPandas()# 转换数据类型
spark_df = spark_df.withColumn("A", col("age").cast(StringType))
pandas_df["A"] = pandas_df['A'].astype("int")# 重置索引
spark_df = spark_df.withColumn("id", monotonically_increasing_id())  # 生成一个增长的id列
pandas_df.reset_index()# 切片
pandas_df['a':'c']  # a-c三行
pandas_df.iloc[1:3, 0:2]  # 1-2行,0-1列。左闭右开
pandas_df.iloc[[0, 2], [1, 2]] # 第0,2行第0,2列
pandas_df.loc['a':'c', ['A', 'B']] # 第a-c行A,B列# 选择列
spark_df.select('A', 'B')
pandas_df[['A', 'B']]# 删除列
spark_df.drop('A', 'B')
pandas_df.drop(['A', 'B'], axis=1, inplace=True)  # inplace表示是否创建新对象# 新增列,设置列值
spark_df = spark_df.withColumn('name', F.lit(0))
pandas_df['name'] = 0# 修改列值
spark_df.withColumn('name', 1)
pandas_df['name'] = 1
# 使用函数修改列值
spark_df = spark_df.withColumn('code', F.when(F.isnull(spark_df.code), 0).otherwise(spark_df.code))# 修改列名
spark_df.withColumnRenamed('old_name', 'new_name')
pandas_df.rename(columns={'old_name1': 'new_name1', 'old_name1': 'new_name2'}, inplace=True)# 显示数据
spark_df.limit(10) # 前10行
spark_df.show/take(10)  # collect()返回全部数据
spark_df/pandas_df.first/head/tail(10)# 表格遍历
saprk_df.collect()[:10]
spark_df.foreach(lambda row: print(row['c1'], row['c2']))
for i, row in pandas_df.iterrows():print(row["c1"], row["c2"])# 排序
spark/pandas_df.sort()  # 按列值排序
pandas_df.sort_index()  # 按轴排序
pandas_df.sort_values(by=["A", "B"], axis=0, ascending=[True, False], inplace=True)  # 指定列升序/降序排序# 过滤
spark_df.filter(df['col_name'] > 1)     # spark_df.where(df['col_name'] > 1)
pandas_df[pandas_df['col_name'] > 1]
pandas_df_new = pandas_df[pandas_df["code"].apply(lambda x: len(x) == 11)]# 去重
spark_df.select('col_name').distinct()
spark_df_filter = spark_df.drop_duplicates(["col_name"])
pandas_df.drop_duplicates(["col_name"], keep='first', inplace=True)# 缺失数据处理
spark_df.na.fill()
spark_df.na.drop(subset=['A', "B"])  # 同dropna
pandas_df.fillna()
pandas_df.dropna(subset=['A', "B"], how="any", inplace=True)# 空值过滤 filter=choose
spark_df.filter(~(F.isnull(spark_df.d)))
spark_df.filter(~(spark_df['A'].isNull() | spark_df['B'].isNull()))   # 选出列值不为空的行  isnan()=isNull()<->isNOtnan()
pandas_df[pandas_df['A'].isna()]  # 选出列值为空的行
pandas_df[pandas_df['A'].notna()] # 选出列值不为空的行# 统计
spark/pandas_df.count()  # spark返回总行数,pandas返回列非空总数
spark/pandas_df.describe() # 描述列的count, mean, min, max...# 计算某一列均值
average_value = spark_df.select("col_name").agg({"col_name": "avg"}).collect()[0][0]
average_value = pandas_df["col_name"].mean()# 表合并
# 按行合并,相当于追加
spark_df = spark_df.unionAll(spark_df1)
pandas_df = pd.concat([df_up, df_down], axis=0)
# 按列合并
spark_df = spark_df.join(df1, df1.id==spark_df.id, 'inner').drop(df1.id)  # df1.id==spark_df.id也可写成['id](当且仅当列名相同)
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left|right|inner|outer")  # 聚合函数
spark_df_collect = spark_df.groupBy('number').agg(F.collect_set('province').alias('set_province'),F.first('city').alias('set_city'),F.collect_list('district').alias('set_district'),F.max('report_user').alias('set_report_user'),F.min('first_type').alias('set_first_type'))
# 分组聚合
spark_df.groupBy('A').agg(F.avg('B'), F.min('B'))
spark/pandas_df.groupby('A').avg('B')# 根据函数分组聚合
def func(x):return pd.DataFrame({"A": x["A"].tolist()[0],"B": sum(x["B"])}, index=[0])
pandas_df_result = pandas_df.groupby(["A"]).apply(func)# spark udf函数和pandas apply函数
def func1(a, b):return a + b
spark_df.withColumn("col_name", F.udf(func1, IntegerType())(spark_df.a, spark_df.b))  # spark_df['a']或F.col("a")))
def func2(x,y):return 1 if x > np.mean(y) else 0
pandas_df['A'].apply(func2, args=(pandas_df['B'],))
pandas_df['C'] = pandas_df.apply(lambda x: 1 if x['A'] > (x['B']*0.5) else 0, axis=1)# spark创建临时表
spark_df.createOrReplaceTempView('tmp_table')  # 用sql API
res1 = spark.sql('select * from tmp_table')
spark_df.registerTempTable('tmp_table') # 用dataframe API
res2 = spark.table('tmp_table') 

其他常用设置

python">class SparkUtils:def __init__(self):self.spark = Nonedef get_spark(self):if self.spark is None:self.spark = SparkSession.builder.appName("username") \.enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \.config("spark.sql.broadcastTimeout", "3600") \.config("spark.driver.memory", "200g") \.config("spark.executor.memory", "40g") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.getOrCreate()self.spark.sql('SET hive.exec.dynamic.partition=true')self.spark.sql('SET hive.exec.dynamic.partition.mode=nonstrict')return self.sparkspark = SparkUtils()# 生成dataframe
spark_data = spark.sql("""select id, usernamefrom tab1where status in (1, 2, 3)and dt = '{}'""".format(date))# pandas常用显示设置
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width',1000)
pd.set_option('display.max_colwidth',1000)

http://www.ppmy.cn/ops/3200.html

相关文章

Spring Boot 经典面试题(五)

1.Spring Boot的事务管理是如何实现的&#xff1f; Spring Boot 使用 Spring Framework 中的事务管理功能来实现事务管理。Spring Framework 提供了几种不同的事务管理方式&#xff0c;其中最常用的是基于注解的声明式事务管理。 在 Spring Boot 中&#xff0c;你可以通过 Tr…

【行为型模式】策略模式

一、策略模式概述 策略模式(又叫政策Policy模式)&#xff0c;属于对象行为模式下的&#xff1a;Strategy类提供了可插入式(Pluggable)算法的实现方案。 策略模式的定义-意图&#xff1a;定义一系列算法&#xff0c;将每一个算法封装起来&#xff0c;并让它们互相替换。策略模式…

Flume在大数据集群下的配置以及监控工具Ganglia的部署安装

前提&#xff1a;需要有三台虚拟机&#xff08;hadoop102,103,104&#xff09;配置好相关基础环境 安装 将安装包上传到/opt/software中 tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/修改 apache-flume-1.9.0-bin 的名称为 flume mv /opt/module/…

动态规划(Dynamic Programming,简称 DP)

动态规划(Dynamic Programming,简称 DP)是一种在数学、计算机科学和经济学中使用的,通过把原问题分解为相对简单的子问题的方式来求解复杂问题的方法。动态规划常常适用于有重叠子问题和最优子结构性质的问题。通过保存和重用已经解决的子问题的解,来避免重复计算,从而大…

设计模式代码实战-责任链模式

1、问题描述 小明所在的公司请假需要在OA系统上发布申请&#xff0c;整个请求流程包括多个处理者&#xff0c;每个处理者负责处理不同范围的请假天数&#xff0c;如果一个处理者不能处理请求&#xff0c;就会将请求传递给下一个处理者&#xff0c;请你实现责任链模式&#xff…

后台管理系统加水印(react)

效果 代码图片 代码 window.waterMark function (config) {var defaultConfig {content: 我是水印,fontSize: 16px,opacity: 0.3,rotate: -15,color: #ADADAD,modalId: J_waterMarkModalByXHMAndDHL,};config Object.assign({}, defaultConfig, config);var existMarkModal…

draw.io使用心得

使用draw.io的体验通常是积极且高效的&#xff0c;许多用户发现它对于创建和分享各类图表非常有用。以下是一些使用draw.io的心得&#xff1a; 界面简洁直观&#xff1a;用户通常首先会注意到draw.io界面的简洁性&#xff0c;这使得即使是初次使用者也能快速上手并开始绘制图表…

web server apache tomcat11-09-JNDI Datasource

前言 整理这个官方翻译的系列&#xff0c;原因是网上大部分的 tomcat 版本比较旧&#xff0c;此版本为 v11 最新的版本。 开源项目 从零手写实现 tomcat minicat 别称【嗅虎】心有猛虎&#xff0c;轻嗅蔷薇。 系列文章 web server apache tomcat11-01-官方文档入门介绍 web…

C++:STL-list模拟实现:迭代器的封装

STL-list模拟实现细节 一. 模拟实现的思想细节1.迭代器实现&#xff1a;用类进行封装2.和--的重载3.奇怪的->重载4.const迭代器 二.实现源码 一. 模拟实现的思想细节 1.迭代器实现&#xff1a;用类进行封装 为什么不使用原生指针&#xff1a; ​ 相比于vector和string&am…

中国社科院-新加坡社科大学工商管理博士入学面试主要考核哪些内容?

中国社科院-新加坡社科大学工商管理博士入学面试主要考核哪些内容&#xff1f; 主要侧重申请人的管理经验&#xff0c;及时考察管理学知识也是结合实际管理经验。只有面试&#xff0c;无笔试。&#xff08;中文交流&#xff09; 入学考试是申请本项目的重要环节之一&#xff0c…

90年代女神返港行李失踪 怒斥国泰航空

现年51岁的童爱玲在1993年拍摄电影《火蝴蝶》入行&#xff0c;外形出众的她当年曾与梁朝伟、黎明等男神合作&#xff0c;因而被封为「男神磁石」。虽然童爱玲与台湾富商王敦民结婚诞下一子后&#xff0c;便淡出演艺圈&#xff0c;但她曾在2022年复出拍摄ViuTV剧集《季前赛》&am…

SAP SD学习笔记06 - 受注的据否,受注的理由,简易变更(一括处理)

上文讲了一括处理和Block&#xff08;冻结&#xff09;处理。 SAP SD学习笔记05 - SD中的一括处理&#xff08;集中处理&#xff09;&#xff0c;出荷和请求的冻结&#xff08;替代实现承认功能&#xff09;-CSDN博客 本章继续讲SAP的流程中一些常用的操作。 1&#xff0c;受注…

面经学习(上海旭千实习)

个人总结 难度中等&#xff0c;Java的基础部分有点忘了&#xff0c;还有那个token的实现流程也有点给忘了&#xff0c;项目问的比较少&#xff0c;还是要注重Java基础部分的复习。 1.jdk8和jdk7有什么区别呢&#xff1f; jdk8的新特性主要就是流式编程和lambda&#xff0c;区…

RabbitMQ项目实战(二)

文章目录 项目改造实现步骤 项目改造 以前把任务提交到线程池&#xff0c;然后在线程池提交中编写处理程序的代码&#xff0c;线程池内排队。 如果程序中断了&#xff0c;任务就没了&#xff0c;就丢了。 改造后的流程&#xff1a; 把任务提交改为向队列发送消息写一个专门接…

PyTorch如何保存验证集上效果最好的模型

PyTorch如何保存验证集上效果最好的模型 验证集的作用是在训练过程中监测是否出现过拟合。通常情况下&#xff0c;我们期望验证集的损失函数值在训练过程中首先下降&#xff0c;然后趋于稳定或上升。当损失函数值达到最小值时&#xff0c;表示模型在验证集上的泛化能力最佳&am…

实时数据同步之Maxwell和Canal

文章目录 一、概述1、实时同步工具概述1.1 Maxwell 概述1.2 Canal概述 2、数据同步工作原理2.1 MySQL 主从复制过程2.2 两种工具工作原理 3、MySQL 的 binlog详解3.1 什么是 binlog3.2 binlog 的开启3.3 binlog 的分类设置 4、Maxwell和Canal对比5、环境安装 二、Maxwell 使用1…

notepad++快捷键和宏录制

想要一个删除行的快捷键, 发现没有 有个现成的快捷键 CtrlL : 剪切当前行 , 不是我想要的效果 宏录制很方便,于是就录制一个删除行的宏,设置快捷键为: CtrlD 录制宏: 1, 点击"开始录制" 2, 依次按键: End , Space , Shift Home , Delete , Delete 3, 点击"停止…

Create2024百度AI开发者大会记录

去年2023.3.16日百度文心大模型发布&#xff0c;今天2024.4.16日 代码智能体&#xff1a;思考模型代码解释器 思考模型整合提示&#xff0c;输入给代码解释器 代码解释器出结果&#xff0c;返回给思考模型&#xff0c;然后迭代 智能代码助手 baidu Comate 多模型推理 种子模型…

设计模式(022)行为型之解释器模式

解释器模式是一种行为型设计模式&#xff0c;用于定义一种语言的文法&#xff0c;并且在该语言中解释句子的意义。这种模式通常用于实现编程语言解释器、正则表达式引擎等场景。 在解释器模式中&#xff0c;主要有以下几个角色&#xff1a;① 抽象表达式&#xff08;AbstractEx…

深入探讨虚拟现实中的新型安全威胁:“盗梦攻击”及其防御策略

随着虚拟现实&#xff08;VR&#xff09;技术的飞速发展&#xff0c;用户体验达到了前所未有的沉浸水平&#xff0c;但也暴露在一系列新的安全威胁之下。本文着重介绍了近期出现的一种高度隐秘且影响深远的攻击手段——“盗梦攻击”。这一概念由芝加哥大学的研究人员提出&#…