SparkSQL数据源与数据存储

ops/2025/1/18 7:30:07/

文章目录

  • 1. 大数据分析流程
  • 2. Spark SQL数据源
  • 3. 本地文件系统加载数据
    • 3.1 本地文件系统加载JSON格式数据
      • 3.1.1 概述
      • 3.1.2 案例演示
    • 3.2 本地文件系统加载CSV格式数据
      • 3.2.1 概述
      • 3.2.2 案例演示
    • 3.3 本地文件系统加载TEXT格式数据
      • 3.3.1 概述
      • 3.3.2 案例演示
    • 3.4 本地文件系统加载Parquet格式数据
      • 3.4.1 概述
      • 3.4.2 案例演示
    • 3.5 通用加载文件方式加载各种格式数据
      • 3.5.1 概述
      • 3.5.2 案例演示
  • 4. 大数据存储概述
    • 4.1 数据存储的重要性
    • 4.2 常见的数据持久化外部系统
    • 4.3 大数据计算框架的基石
  • 5. 数据存储核心API使用
    • 5.1 持久化数据到外部文件系统步骤
    • 5.2 将数据帧保存到本地文件
    • 5.3 将数据帧保存到HDFS文件
  • 6. 数据源数据存储小结

1. 大数据分析流程

  • 在互联网产业中大数据生态体系的主要作用就是存储、处理海量数据为企业创造价值、推动社会进步,数据分析流程存在三个主要流程:
    • 计算系统可以加载外部数据源
    • 资源系统可以为计算系统分配运行资源
    • 计算系统数据分析最终结果可以持久化到外部系统
      在这里插入图片描述
  • 通过图片可以得知存储系统才是大数据计算体系中的基石,学习一个计算框架应该先从如何使用当前计算框架加载外部数据源开始。

2. Spark SQL数据源

  • Spark SQL 是 Apache Spark 的模块之一,提供对结构化数据的查询能力。它支持多种数据源,包括 HDFS、S3、Hive、Parquet、JSON 等,允许用户通过 SQL 语句或 DataFrame API 访问和处理数据。Spark SQL 的优化器可以自动优化查询计划,提高执行效率。此外,它还支持外部数据源的集成,使得在不同存储系统间进行数据交换和分析变得简单快捷。

SparkSQL_10">2.1 SparkSQL常见数据源

  • Hive 数据仓库
  • MySQL 关系型数据库
  • FileSystem 文件系统:本地文件系统、分布式文件系统
  • 由 RDD 生成 SparkSQL 数据源

SparkSQL_16">2.2 SparkSQL支持的文本格式

数据格式描述
csvCSV(字段与字段之间的分隔符为逗号)
jsonJSON(是一种轻量级的数据交换格式,采用完全独立于编程语言的文本格式来存储和表示数据。
简洁和清晰的层次结构、易于人阅读和编写,同时也易于机器解析和生成)
textText(文本数据,字段与字段之间的分隔符没有限制)
parquetParquet(Parquet是一种面向列存储的文件格式,主要用于Hadoop生态系统。
对数据处理框架、数据模型和编程语言无关)

2.3 加载外部数据源步骤

  • 创建 SparkSession 实例对象
  • 通过 SparkSession 实例对象提供的方法加载外部数据

3. 本地文件系统加载数据

3.1 本地文件系统加载JSON格式数据

3.1.1 概述

  • JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,也易于机器解析和生成。在本地文件系统中加载JSON格式数据时,可以使用DataFrameReaderjson()方法或通过format("json")指定格式。

3.1.2 案例演示

  • 在项目根目录创建data目录
    在这里插入图片描述
  • data里创建users.json文件
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}

在这里插入图片描述

  • net.huawei.sql包里创建LoadJSON对象
    在这里插入图片描述
package net.huawei.sqlimport org.apache.spark.sql.SparkSession/*** 功能:加载JSON数据* 作者:华卫* 日期:2025年01月17日*/
object LoadJSON {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("LoadJSON") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象// 使用json()方法加载本地JSON文件val df_json = spark.read.json("data/users.json")// 显示数据df_json.show()// 关闭会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述

3.2 本地文件系统加载CSV格式数据

3.2.1 概述

  • CSV(Comma-Separated Values)是一种常用的表格数据存储格式,数据以纯文本形式存储,字段间用逗号分隔。加载CSV格式数据时,可以使用DataFrameReadercsv()方法或通过format("csv")指定格式。

3.2.2 案例演示

  • data里创建users.csv文件
    在这里插入图片描述
name,gender,age
李小玲,女,45
童安格,男,26
陈燕文,女,18
王晓明,男,32
张丽华,女,29
刘伟强,男,40
赵静怡,女,22
孙强东,男,35
  • net.huawei.sql包里创建loadCSV对象
    在这里插入图片描述
package net.huawei.sqlimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.csv.MultiLineCSVDataSource.inferSchema
import org.json4s.scalap.scalasig.ClassFileParser.header/*** 功能:加载CSV数据* 作者:华卫* 日期:2025年01月17日*/
object LoadCSV {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("LoadCSV") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象// 使用csv()方法加载本地CSV文件val df_csv = spark.read.option("header", "true").option("inferSchema", "true").csv("data/users.csv")// 显示数据df_csv.show()// 关闭会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述

3.3 本地文件系统加载TEXT格式数据

3.3.1 概述

  • TEXT格式数据通常指纯文本文件,每行数据作为一个字符串处理。加载TEXT格式数据时,可以使用DataFrameReadertext()方法或通过format("text")指定格式。

3.3.2 案例演示

  • data里创建users.txt文件
    在这里插入图片描述
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35
  • net.huawei.sql包里创建LoadText对象
    在这里插入图片描述
package net.huawei.sqlimport org.apache.spark.sql.{DataFrame, SparkSession}/*** 功能:加载TEXT数据* 作者:华卫* 日期:2025年01月17日*/
object LoadText {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("LoadTEXT") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象// 使用text()方法加载本地TEXT文件val df_text = spark.read.text("data/users.txt")// 显示数据df_text.show()// 关闭会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述

3.4 本地文件系统加载Parquet格式数据

3.4.1 概述

  • Parquet 是一种列式存储格式,广泛用于大数据处理。相比行式存储(如 CSV),Parquet 具有高效压缩、高性能查询和广泛兼容性(支持 Spark、Hive 等)。在 Spark 中,可通过 parquet()format("parquet") 加载 Parquet 文件,适合大规模数据存储与处理。

3.4.2 案例演示

  • 将CSV格式数据转换成Parquet格式数据
  • net.huawei.sql包里创建CSVToParquet对象
    在这里插入图片描述
package net.huawei.sql/*** 功能:将CSV转成Parquet* 作者:华卫* 日期:2025年01月17日*/
import org.apache.spark.sql.SparkSessionobject CSVToParquet {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("CSV To Parquet") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象// 读取CSV文件val df = spark.read.option("header", true) // 第一行作为列名.option("inferSchema", true) // 自动推断数据类型.csv("data/users.csv") // CSV文件路径// 打印Schema和数据println("===模式===")df.printSchema()println("===数据===")df.show()// 将DataFrame保存为Parquet文件df.write.parquet("data/users.parquet")println("成功生成users.parquet文件~")// 关闭Spark会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述
  • 查看users.parquetParquet文件是二进制格式,无法直接查看,但可以通过Spark或其他工具读取。
    在这里插入图片描述
  • net.huawei.sql包里创建LoadParequet对象
    在这里插入图片描述
package net.huawei.sqlimport org.apache.spark.sql.SparkSession/*** 功能:加载Parquet数据* 作者:华卫* 日期:2025年01月17日*/
object LoadParquet {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("LoadParquet") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象// 使用parquet()方法加载本地Parquet文件val df_parquet = spark.read.parquet("data/users.parquet")// 显示数据df_parquet.show()// 显示数据结构df_parquet.printSchema()// 关闭会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述

3.5 通用加载文件方式加载各种格式数据

3.5.1 概述

  • 通过DataFrameReaderformat()load()方法,可以灵活地加载不同格式的数据文件。这种方式不仅适用于JSON格式,还可以用于CSV、TEXT、Parquet等其他格式。

3.5.2 案例演示

  • net.huawei.sql包里创建LoadData对象
    在这里插入图片描述
package net.huawei.sqlimport org.apache.spark.sql.SparkSession/*** 功能:加载各种格式数据* 作者:华卫* 日期:2025年01月17日*/
object LoadData {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("LoadData") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象//使用format()和load()方法加载本地JSON文件val df_json = spark.read.format("json").load("data/users.json")//使用format()和load()方法加载本地CSV文件val df_csv = spark.read.format("csv").option("header", true).option("inferSchema",true).load("data/users.csv")//使用format()和load()方法加载本地TEXT文件val df_text = spark.read.format("text").load("data/users.txt")//使用format()和load()方法加载本地Parquet文件val df_parquet = spark.read.format("parquet").load("data/users.parquet")// 显示数据println("===显示加载的JSON数据===")df_json.show()println("===显示加载的CSV数据===")df_csv.show()println("===显示加载的TEXT数据===")df_text.show()println("===显示加载的Parquet数据===")df_parquet.show()// 关闭会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述

4. 大数据存储概述

4.1 数据存储的重要性

  • 在大数据生态系统中,存储系统是核心组成部分。无论是数据采集、数据处理,还是数据分析,都离不开高效、可靠的存储系统。存储系统不仅需要保存原始数据,还需要存储经过分析后的有价值的结果,以供各部门使用。

4.2 常见的数据持久化外部系统

  • 文件系统:包括本地文件系统和分布式文件系统(如HDFS)。文件系统适合存储大规模的非结构化或半结构化数据。
  • 关系型数据库:适用于结构化数据的存储和高效查询,常用于事务处理和复杂查询。
  • Hive数据仓库:基于Hadoop的数据仓库工具,适合大规模数据的批处理和分析。
  • 其他存储系统:如果以上系统不能满足业务需求,我们可以将DataFrame或DataSet转换为RDD,利用RDD支持的多种外部存储系统。

4.3 大数据计算框架的基石

  • 存储系统是大数据计算框架的基石。一个计算框架首先需要从存储系统中加载数据,形成可处理的数据模型(如DataFrameDataSetRDD)。基于这些数据模型,我们可以进行各种数据分析操作。最终,分析结果需要持久化到外部存储系统,以便各部门使用。

5. 数据存储核心API使用

5.1 持久化数据到外部文件系统步骤

  • 创建SparkSession实例对象
  • 通过SparkSession实例对象提供的方法加载外部数据
  • 数据分析
  • 对数据分析结果进行持久化

5.2 将数据帧保存到本地文件

  • net.huawei.sql包里创建SaveData对象
    在这里插入图片描述
package net.huawei.sqlimport org.apache.spark.sql.SparkSession/*** 功能:保存数据到本地文件* 作者:华卫* 日期:2025年01月17日*/
// 声明用户样例类
case class User(name: String, gender: String, age: Long)object SaveData {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("SaveData") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.getOrCreate() // 获取或创建Spark会话对象// 导入隐式转换import spark.implicits._// 基于序列创建数据帧val userDF = Seq(User("陈燕文", "女", 20),User("张小文", "男", 27),User("王丽霞", "女", 18)).toDF()// 显示数据userDF.show()// 保存数据到本地文件userDF.write.mode("overwrite").save("log/users.parquet")println("users.parquet保存成功~")userDF.write.mode("overwrite").csv("log/users.csv")println("users.csv保存成功~")userDF.write.mode("overwrite").json("log/users.json")println("users.json保存成功~")// 关闭会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述
  • 查看保存在本地的各种格式的数据文件
    在这里插入图片描述

5.3 将数据帧保存到HDFS文件

  • net.huawei.sql包里创建SaveDataToHDFS对象
    在这里插入图片描述
package net.huawei.sqlimport org.apache.spark.sql.SparkSession/*** 功能:保存数据到HDFS* 作者:华卫* 日期:2025年01月17日*/
object SaveDataToHDFS {def main(args: Array[String]): Unit = {// 获取或创建Spark会话对象val spark = SparkSession.builder() // 创建Builder对象.appName("SaveDataToHDFS") // 设置应用程序名称.master("local[*]") // 运行模式:本地运行.config("dfs.client.use.datanode.hostname", "true") // 设置HDFS节点名称.getOrCreate() // 获取或创建Spark会话对象// 导入隐式转换import spark.implicits._// 基于序列创建数据帧val userDF = Seq(User("陈燕文", "女", 20),User("张小文", "男", 27),User("王丽霞", "女", 18)).toDF()// 显示数据userDF.show()// 保存数据到HDFS文件userDF.write.mode("overwrite").json("hdfs://bigdata1:9000/log/users.json")println("hdfs://bigdata1:9000/log/users.json保存成功~")// 关闭会话对象spark.stop()}
}
  • 运行程序,查看结果
    在这里插入图片描述

  • 执行命令:hdfs dfs -ls /log/users.json
    在这里插入图片描述

  • 执行命令:hdfs dfs -cat /log/users.json/*
    在这里插入图片描述

6. 数据源数据存储小结

  • 在大数据生态系统中,数据源数据存储是核心组成部分。Spark SQL 支持多种数据源,包括 HDFS、S3、Hive、Parquet、JSON 等,能够灵活加载和处理结构化数据。通过 DataFrameReaderDataFrameWriter,用户可以轻松地从本地文件系统或分布式文件系统加载数据,并将分析结果持久化到外部存储系统。常见的存储格式如 CSV、JSON、Parquet 各有优势:CSV 适合人类阅读,JSON 灵活易用,而 Parquet 则以高效的列式存储和压缩性能著称。数据存储不仅是数据处理的起点,也是分析结果的归宿,选择合适的存储格式和系统对提升数据处理效率至关重要。

http://www.ppmy.cn/ops/151034.html

相关文章

每打开一个chrome页面都会【自动打开F12开发者模式】,原因是 使用HBuilderX会影响谷歌浏览器的浏览模式

打开 HBuilderX,点击 运行 -> 运行到浏览器 -> 设置web服务器 -> 添加chrome浏览器安装路径 chrome谷歌浏览器插件 B站视频下载助手插件: 参考地址:Chrome插件 - B站下载助手(轻松下载bilibili哔哩哔哩视频&#xff09…

近红外数据预处理和简单分析matlab

近红外的数据,预处理过程很多开源的工具包可以使用,像homer3、NIRS_SPM、NIRS_KIT等等,B站资源也很多,都可以学习。本次主要记录下,分析的一个fNIRS数据。首先,使用NIRS_KIT做了下预处理,然后再…

数据库的DML

1.insert 数据库于表创建成功后,需要向数据库的表中插入数据。在MySQL中可以使用insert语句向数据库已有的表中插入一行或者多行元组数据 基本语法: insert 语句有两种语法形式,分别是insert…values语句和insert…set语句 insert into&l…

剑指offer第2版:树系列(一)

一、p62-重建二叉树 重建二叉树_牛客题霸_牛客网 我们可以通过前序遍历来定位根,然后去中序遍历里面找根,然后他左边的数字就是他的左子树,右边的数字就是他的右子树,然后再转化成子问题 class Solution {public:TreeNode* buil…

css‘s hover VS mobile

.animation {animation: 30s move infinite linear;/* &:hover {animation-play-state: paused;*/ }原本写的好好的,测试说:“移动端点击滚动条,跳转到其他页面后,返回当前页面,滚动条不滚动;可以优化位…

20250117在Ubuntu20.04.6下使用灵思FPGA的刷机工具efinity刷机

20250117在Ubuntu20.04.6下使用灵思FPGA的刷机工具efinity刷机 2025/1/17 18:30 缘起:做Rockchip的项目RK3566/RK3588,由于编译服务器是ubuntu,RK3566/RK3588有Linux/Ubuntu下的刷机工具。 就顺手要了一下易灵思的FPGA的刷机工具,…

Oracle数据库diag目录下 incident、trace等文件详解

1.什么是ADR ADR是Automatic Diagnostic Repository首字母缩写,它是一个数据库外的基于文件的、并且可以通过事件编号检索和分析的存储库。 故障诊断基础设施有助于预防、检测、诊断和解决问题。 特别针对的问题是严重错误,例如由代码错误、元数据损坏…

中电金信:源启AI开发与服务平台:大模型能力服务化,推动企业智能化转型

导语:日前,源启AI开发与服务平台发布了最新版本。在本次升级中,源启AI开发与服务平台在原有机器学习能力的基础上,提升了深度学习建模能力,优化了模型服务能力,新增了大模型开发工具链能力,全面…