SparkSQL数据源与数据存储综合实践

news/2025/1/19 8:44:10/

文章目录

  • 1. 打开项目
  • 2. 查看数据集
    • 2.1 查看JSON格式数据
    • 2.2 查看CSV格式数据
    • 2.3 查看TXT格式数据
  • 3. 添加单元测试依赖
  • 4. 创建数据加载与保存对象
    • 4.1 创建Spark会话对象
    • 4.2 创建加载JSON数据方法
    • 4.3 创建加载CSV数据方法
    • 4.4 创建加载Text数据方法
    • 4.5 创建加载JSON数据扩展方法
    • 4.6 创建加载CSV数据扩展方法
    • 4.7 创建加载Text数据扩展方法
    • 4.8 创建保存文本文件方法
    • 4.9 查看程序完整代码
  • 5. 实战小结

1. 打开项目

2. 查看数据集

2.1 查看JSON格式数据

  • 查看users.json文件
    在这里插入图片描述
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}

2.2 查看CSV格式数据

  • 查看users.csv文件
    在这里插入图片描述
name,gender,age
李小玲,,45
童安格,,26
陈燕文,,18
王晓明,,32
张丽华,,29
刘伟强,,40
赵静怡,,22
孙强东,,35

2.3 查看TXT格式数据

  • 查看users.txt文件
    在这里插入图片描述
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35

3. 添加单元测试依赖

  • pom.xml里添加单元测试框架依赖
    在这里插入图片描述
<dependency>                                    <groupId>junit</groupId>                    <artifactId>junit</artifactId>              <version>4.13.2</version>                   
</dependency>                                   
  • 刷新项目依赖
    在这里插入图片描述

4. 创建数据加载与保存对象

  • 创建net.huawei.practice
    在这里插入图片描述
  • practice子包里创建DataLoadAndSave对象
    在这里插入图片描述
  • 创建DataLoadAndSave伴生类
    在这里插入图片描述

4.1 创建Spark会话对象

  • 创建spark常量
    在这里插入图片描述
// 获取或创建Spark会话对象                                  
val spark = SparkSession.builder() // 创建Builder对象  .appName("DataLoadAndSave") // 设置应用程序名称          .master("local[*]") // 运行模式:本地运行                 .getOrCreate() // 获取或创建Spark会话对象                 

4.2 创建加载JSON数据方法

  • 创建loadJSONData()方法
    在这里插入图片描述
// 加载JSON数据方法                                       
def loadJSONData(filePath: String): DataFrame = {   spark.read.json(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadJSONData()方法
    在这里插入图片描述
@Test                                                      
def testLoadJSONData(): Unit = {                           // 加载JSON数据                                              val df = DataLoadAndSave.loadJSONData("data/users.json") // 显示数据                                                  df.show()                                                
}                                                          
  • 运行testLoadJSONData()测试方法,查看结果
    在这里插入图片描述

4.3 创建加载CSV数据方法

  • 创建loadCSVData()方法
    在这里插入图片描述
// 加载CSV数据方法                                           
def loadCSVData(filePath: String): DataFrame = {       spark.read                                           .option("header", "true")                          .option("inferSchema", "true")                     .csv(filePath)                                     
}                                                      
  • 在伴生类里创建单元测试方法testLoadCSVData()方法
    在这里插入图片描述
@Test                                                       
def testLoadCSVData(): Unit = {                             // 加载CSV数据                                                val df = DataLoadAndSave.loadCSVData("data/users.csv")    // 显示数据                                                   df.show()                                                 
}                                                           
  • 运行testLoadCSVData()测试方法,查看结果
    在这里插入图片描述

4.4 创建加载Text数据方法

  • 创建loadTextData()方法
    在这里插入图片描述
// 加载TEXT数据方法                                       
def loadTextData(filePath: String): DataFrame = {   spark.read.text(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadTextData()方法
    在这里插入图片描述
  • 运行testLoadTextData()测试方法,查看结果
    在这里插入图片描述

4.5 创建加载JSON数据扩展方法

  • 创建loadJSONDataExpand()方法
    在这里插入图片描述
// 加载JSON数据扩展方法                                         
def loadJSONDataExpand(filePath: String): DataFrame = { spark.read.format("json").load(filePath)              
}                                                       
  • 在伴生类里创建单元测试方法testLoadJSONDataExpand()方法
    在这里插入图片描述
  • 运行testLoadJSONDataExpand()测试方法,查看结果
    在这里插入图片描述

4.6 创建加载CSV数据扩展方法

  • 创建loadCSVDataExpand()方法
    在这里插入图片描述
// 加载CSV数据扩展方法                                            
def loadCSVDataExpand(filePath: String): DataFrame = {    spark.read.format("csv")                                .option("header", "true")                             .option("inferSchema", "true")                        .load(filePath)                                       
}                                                         
  • 在伴生类里创建单元测试方法testLoadCSVDataExpand()方法
    在这里插入图片描述
  • 运行testLoadCSVDataExpand()测试方法,查看结果
    在这里插入图片描述

4.7 创建加载Text数据扩展方法

  • 创建loadTextDataExpand()方法
    在这里插入图片描述
//  加载TEXT数据扩展方法                                          
def loadTextDataExpand(filePath: String): DataFrame = {   spark.read.format("text").load(filePath)                
}                                                         
  • 在伴生类里创建单元测试方法testLoadTextDataExpand()方法
    在这里插入图片描述
  • 运行testLoadTextDataExpand()测试方法,查看结果
    在这里插入图片描述

4.8 创建保存文本文件方法

  • 创建saveTextFile()方法
    在这里插入图片描述
// 保存数据到文本文件方法                                                   
def saveTextFile(inputPath: String, outputPath: String): Unit = {// 加载文本数据                                                      val df = spark.read.format("text").load(inputPath)             // 保存文本数据                                                      df.write.mode("overwrite").format("text").save(outputPath)     
}                                                                
  • 在伴生类里创建单元测试方法testSaveTextFile()方法
    在这里插入图片描述
  • 运行testSaveTextFile()测试方法,查看结果
    在这里插入图片描述

4.9 查看程序完整代码

package net.huawei.practiceimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.Test/*** 功能:数据加载与保存* 作者:华卫* 日期:2025年01月18日*/
object DataLoadAndSave {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("DataLoadAndSave") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象// 加载JSON数据方法def loadJSONData(filePath: String): DataFrame = {spark.read.json(filePath)}// 加载CSV数据方法def loadCSVData(filePath: String): DataFrame = {spark.read.option("header", "true").option("inferSchema", "true").csv(filePath)}// 加载TEXT数据方法def loadTextData(filePath: String): DataFrame = {spark.read.text(filePath)}// 加载JSON数据扩展方法def loadJSONDataExpand(filePath: String): DataFrame = {spark.read.format("json").load(filePath)}// 加载CSV数据扩展方法def loadCSVDataExpand(filePath: String): DataFrame = {spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(filePath)}//  加载TEXT数据扩展方法def loadTextDataExpand(filePath: String): DataFrame = {spark.read.format("text").load(filePath)}// 保存数据到文本文件方法def saveTextFile(inputPath: String, outputPath: String): Unit = {// 加载文本数据val df = spark.read.format("text").load(inputPath)// 保存文本数据df.write.mode("overwrite").format("text").save(outputPath)}
}// 伴生类
class DataLoadAndSave {@Testdef testLoadJSONData(): Unit = {// 加载JSON数据val df = DataLoadAndSave.loadJSONData("data/users.json")// 显示数据df.show()}@Testdef testLoadCSVData(): Unit = {// 加载CSV数据val df = DataLoadAndSave.loadCSVData("data/users.csv")// 显示数据df.show()}@Testdef testLoadTextData(): Unit = {// 加载TEXT数据val df = DataLoadAndSave.loadTextData("data/users.txt")// 显示数据df.show()}@Testdef testLoadJSONDataExpand(): Unit = {// 加载JSON数据val df = DataLoadAndSave.loadJSONDataExpand("data/users.json")// 显示数据df.show()}@Testdef testLoadCSVDataExpand(): Unit = {// 加载CSV数据val df = DataLoadAndSave.loadCSVDataExpand("data/users.csv")// 显示数据df.show()}@Testdef testLoadTextDataExpand(): Unit = {// 加载TEXT数据val df = DataLoadAndSave.loadTextDataExpand("data/users.txt")// 显示数据df.show()}@Testdef testSaveTextFile(): Unit = {// 保存数据到文本文件DataLoadAndSave.saveTextFile("data/users.txt", "result/users")}
}

5. 实战小结

  • 在本次实战中,我们通过SparkSQLDataSource项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()loadCSVData()loadTextData(),分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()方法灵活加载数据,并实现了数据保存功能,如saveTextFile()方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。

http://www.ppmy.cn/news/1564360.html

相关文章

第五章:VRRP和HSRP的网关冗余配置与管理

一、HRSP 1、简介 在骨干网的设备连接中&#xff0c;单一的设备容易出现故障造成网络的中断&#xff0c;可靠性较差&#xff0c;如图所示&#xff0c;如果核心交换机出现问题&#xff0c;不能正常工作&#xff0c;会影响整个网络的通信&#xff0c;因为整个网络的数据转发是通…

软件测试 —— Selenium(等待)

软件测试 —— Selenium&#xff08;等待&#xff09; 一个例子强制等待使用示例&#xff1a;为什么不推荐使用强制等待&#xff1f;更好的选择 隐式等待 implicitly_wait&#xff08;&#xff09;隐式等待和强制等待的区别隐式等待&#xff08;Implicit Wait&#xff09;强制等…

Jenkins-基于Role的鉴权机制

jenkins自带了一些全局性的安全配置。 但无法通过job等相对细粒度的来控制使用者的权限。但它可以借助相关的插件实现细颗粒的权限控制。 插件&#xff1a; Role-based Authorization Strategy 需要在configure global security中配置授权策略如下&#xff1a; 保存后&#x…

游戏引擎学习第81天

仓库:https://gitee.com/mrxiao_com/2d_game 或许我们应该尝试在地面上添加一些绘图 在这段时间的工作中&#xff0c;讨论了如何改进地面渲染的问题。虽然之前并没有专注于渲染部分&#xff0c;因为当时主要的工作重心不在这里&#xff0c;但在实现过程中&#xff0c;发现地面…

疫苗预约小程序ssm+论文源码调试讲解

第4章 系统设计 一个成功设计的系统在内容上必定是丰富的&#xff0c;在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值&#xff0c;吸引更多的访问者访问系统&#xff0c;以及让来访用户可以花费更多时间停留在系统上&#xff0c;则表明该系统设计得比较专…

Spring Boot经典面试题及答案

一、Spring Boot基础知识 什么是Spring Boot&#xff1f; 答案&#xff1a; Spring Boot是Spring开源组织下的子项目&#xff0c;是Spring组件一站式解决方案。它简化了Spring应用程序的初始化和开发过程&#xff0c;通过“约定大于配置”的原则&#xff0c;减少了手动配置的繁…

leetcode300.最长递增子序列

给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7] 的子序列。 示例 1&…

ReaderLM v2:HTML 转 Markdown 和 JSON 的前沿小型语言模型

2024 年 4 月&#xff0c;我们发布了 Jina Reader(https://jina.ai/reader)&#xff0c;这是一个非常实用的 API&#xff0c;用户只需在 URL 前添加 r.jina.ai 前缀&#xff0c;就能将任何网页转换为大模型友好的 Markdown。紧接着&#xff0c;在同年 9 月&#xff0c;我们又发…