spark:数据的关联与合并、缓存和checkpoint

server/2024/10/18 12:16:27/

文章目录

  • 1. 数据的关联与合并
    • 1.1 join关联
      • 1.1.1 内关联
      • 1.1.2 左关联
      • 1.1.3 右关联
    • 1.2 Union合并
  • 2. 缓存和checkpoint

1. 数据的关联与合并

1.1 join关联

students表数据:
在这里插入图片描述

1.1.1 内关联

内关联只返回两个 DataFrame 中在连接键上匹配的行。

# join 关联
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')#join 关联
df_join = df1.join(df2,'id') #默认时内关联
df_join.show()

运行结果:
在这里插入图片描述

1.1.2 左关联

左关联以左 DataFrame 为基础,返回左 DataFrame 的所有行以及在右 DataFrame 中与左 DataFrame 连接键匹配的行。如果右 DataFrame 中没有匹配的行,则相应的列将填充为 null

# join 关联
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')
#左关联
df_left_join = df1.join(df2,'id','left')
df_left_join.show()

运行结果:
在这里插入图片描述

1.1.3 右关联

右关联以右 DataFrame 为基础,返回右 DataFrame 的所有行以及在左 DataFrame 中与右 DataFrame 连接键匹配的行。如果左 DataFrame 中没有匹配的行,则相应的列将填充为 null

# join 关联
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')#右关联
df_right_join = df1.join(df2,'id','right')
df_right_join.show()

运行结果
在这里插入图片描述

1.2 Union合并

在 Spark 中,union用于合并两个或多个相同数据结构的数据集(DataFrame 或 Dataset)。

# union合并  上下行合并要保证字段数量和类型一致
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 读取文件数据转为df
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=True,sep=',')# 合并
df_union = df1.union(df2)
df_union.show(100)df_unionAll = df1.unionAll(df2)  # 和union效果一样
df_unionAll.show(100)# 合并后去重
df_distinct =  df_union.distinct()
df_distinct.show(100)

注意:union合并时,上下行合并要保证字段数量和类型一致。

2. 缓存和checkpoint

# 缓存和checkpoint
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# 指定checkpoint位置
sc = ss.sparkContext
sc.setCheckpointDir('hdfs://node1:8020/df_checpoint')# 读取文件数据转为df
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')# df1数据缓存
df1.persist()# df1数据checkpoint
df1.checkpoint()# df中的缓存和checkpoint不需要触发执行,内部会自动触发

在这里插入图片描述


http://www.ppmy.cn/server/132756.html

相关文章

JavaWeb技术支持的Spring Boot在线考试系统详解

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统,它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等,非常…

探索 ffmpeg-python:Python 中的多媒体处理神器

🎬 探索 ffmpeg-python:Python 中的多媒体处理神器 一、背景介绍 在多媒体处理领域,尤其是视频和音频处理,Python 社区一直缺乏一个强大且易用的库。幸运的是,ffmpeg-python 库的出现填补了这一空白。它是一个 Python…

分享一个IDEA里面的Debug调试设置

1.问题来源 其实我们在这个IDEA里面的这个进行调试的时候,这个是只有步入,出去的选项的; 之前学习这个sort的底层源码的时候,进不去,我们是设置了一个取消java*什么的选项,然后使用这个step into就可以进…

掌握 C# 设计模式:从基础到依赖注入

设计模式是一种可以在开发中重复使用的解决方案,能够提高代码的可维护性、扩展性和复用性。C# 中常见的设计模式包括单例模式、工厂模式、观察者模式、策略模式等。本文将介绍这些常见的设计模式,并探讨 SOLID 原则和依赖注入(Dependency Inj…

[实时计算flink]日志实时入仓快速入门

Flink全托管产品提供丰富强大的日志数据实时入仓能力。本文为您介绍如何在Flink全托管控制台上快速构建一个从Kafka到Hologres的数据同步作业。 背景信息 假设消息队列Kafka实例中有一个名称为users的Topic,其中有100条JSON数据,代表通过日志文件采集工…

艾体宝干货丨网络安全指南:如何使用Profishark和IOTA检测Blast-RADIUS

随着网络安全威胁的不断增加,了解并预防可能的攻击变得至关重要。Blast-RADIUS 是一种严重影响 RADIUS 协议的安全漏洞,能够让攻击者绕过身份验证获取未经授权的访问权限。本篇文章将深入探讨该漏洞的工作原理、检测方法及应对措施,帮助您有效…

SQL Server LocalDB 表数据中文乱码问题

--查看数据库设置 SELECT name, collation_name FROM sys.databases;--出现了The database could not be exclusively locked to perform the operation这个错误, --无法修改字符集为Chinese_PRC_CI_AS;所以需要先设置为单用户模式 ALTER DATABASE MySma…

环境变量(Linux)

文章目录 一、什么是环境变量?二、环境变量的作用1. 方便命令执行:2.配置系统和应用程序:3.用户自定义环境变量: 三、Linux 常见环境变量四、设置环境变量1.临时设置:2.永久设置: 五、环境变量的优先级六、…