Spark学习(6)-Spark SQL

news/2024/11/16 21:39:23/

1 快速入门

SparkSQL是Spark的一个模块, 用于处理海量结构化数据
SparkSQL是非常成熟的 海量结构化数据处理框架.
学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等。
  • 企业大面积在使用SparkSQL处理业务数据。
    • 离线开发
    • 数仓搭建
    • 科学计算
    • 数据分析

特点:
在这里插入图片描述

2 SparkSQL概述

2.1 SparkSQL和Hive的异同

在这里插入图片描述

2.2 SparkSQL的数据抽象

在这里插入图片描述
在这里插入图片描述

2.3 SparkSession对象

在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:

  • 用于SparkSQL编程作为入口对象。
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

所以,后续执行环境入口对象,统一变更为SparkSession对象。

在这里插入图片描述
2.4 SparkSession对象

# coding:utf8
# SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建SparkSession对象, 这个对象是 构建器模式 通过builder方法来构建
spark = SparkSession.builder.\
appName("local[*]").\
config("spark.sql.shuffle.partitions", "4").\
getOrCreate()
# appName 设置程序名称, config设置一些常用属性
# 最后通过getOrCreate()方法 创建SparkSession对象

3 DataFrame入门和操作

3.1 DataFrame的组成

在结构层面:

  • StructType对象描述整个DataFrame的表结构。
  • StructField对象描述一个列的信息。

在数据层面:

  • Row对象记录一行数据。
  • Column对象记录一列数据并包含列的信。

StructType描述,如下图:
在这里插入图片描述
一个StructField记录:列名、列类型、列是否运行为空。
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。

3.2 DataFrame的代码构建 - 基于RDD方式1

DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构。

# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

3.3 DataFrame的代码构建 - 基于RDD方式2

通过StructType对象来定义DataFrame的“表结构”转换RDD

# 创建DF , 首先创建RDD 将RDD转DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
map(lambda x:x.split(',')).\
map(lambda x:(int(x[0]), x[1], int(x[2])))
# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
df = spark.createDataFrame(rdd, schema)

3.4 DataFrame的代码构建 - 基于RDD方式3

使用RDD的toDF方法转换RDD

# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
# 方式1: 只传列名, 类型靠推断, 是否允许为空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()
# 方式2: 传入完整的Schema描述对象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()

3.5 DataFrame的代码构建 - 基于Pandas的DataFrame

将Pandas的DataFrame对象,转变为分布式的SparkSQL

# 构建Pandas的DF
pdf = pd.DataFrame({
"id": [1, 2, 3],
"name": ["张大仙", '王晓晓', '王大锤'],
"age": [11, 11, 11]
})
# 将Pandas的DF对象转换成Spark的DF
df = spark.createDataFrame(pdf)

3.6 DataFrame的代码构建 - 读取外部数据

通过SparkSQL的统一API进行数据读取构建DataFrame

sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可选
.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
.load("被读取文件的路径, 支持本地文件系统和HDFS")

读取text数据源

使用format(“text”)读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value。

schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
.schema(schema)\
.load("../data/sql/people.txt")

读取json数据源

使用format(“json”)读取json数据

df = spark.read.format("json").\
load("../data/sql/people.json")
# JSON 类型 一般不用写.schema, json自带, json带有列名 和列类型(字符串和数字)
df.printSchema()
df.show()

读取csv数据源

使用format(“csv”)读取csv数据

df = spark.read.format("csv")\
.option("sep", ";")\ # 列分隔符
.option("header", False)\ # 是否有CSV标头
.option("encoding", "utf-8")\ # 编码
.schema("name STRING, age INT, job STRING")\ # 指定列名和类型
.load("../data/sql/people.csv") # 路径
df.printSchema()
df.show()

读取parquet数据源

使用format(“parquet”)读取parquet数据

# parquet 自带schema, 直接load啥也不需要了
df = spark.read.format("parquet").\
load("../data/sql/users.parquet")
df.printSchema()
df.show()

注意:
parquet: 是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多, 他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema (列名\ 列类型\ 是否为空)。
  • 存储是以列作为存储格式。
  • 存储是序列化存储在文件中的(有压缩属性体积小)。

Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:
在这里插入图片描述

3.7 DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:DSL风格和SQL风格。

  • DSL语法风格:
    DSL称之为:领域特定语言。其实就是指DataFrame的特有API,DSL风格意思就是以调用API的方式来处理Data。比如:df.where().limit()
  • SQL语法风格
    SQL风格就是使用SQL语句处理DataFrame的数据,比如:spark.sql(“SELECT * FROM xxx)

DSL - show 方法

功能:展示DataFrame中的数据, 默认展示20条。
语法:

df.show(参数1, 参数2)
- 参数1: 默认是20, 控制展示多少条
- 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True

DSL - printSchema方法

功能:打印输出df的schema信息
语法:

df.printSchema()

例如:
在这里插入图片描述

DSL - select

功能:选择DataFrame中的指定列(通过传入参数进行指定)
语法:
在这里插入图片描述
可传递:

  • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
    列名来指定列。
  • List[Column]对象或者List[str]对象, 用来选择多个列。

在这里插入图片描述

DSL - filter和where

功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
语法:

df.filter()
df.where()

where和filter功能上是等价的。
在这里插入图片描述

DSL - groupBy 分组

功能:按照指定的列进行数据的分组, 返回值是GroupedData对象
语法:

df.groupBy()

传入参数和select一样,支持多种形式,不管怎么传意思就是告诉spark
按照哪个列分组。

在这里插入图片描述

GroupedData对象

GroupedData对象是一个特殊的DataFrame数据集,其类全名:<class 'pyspark.sql.group.GroupedData'>,这个对象是经过groupBy后得到的返回值, 内部记录了 以分组形式存储的数据。
GroupedData对象其实也有很多API,像:min、max、avg、sum、等等许多方法都存在。

SQL风格语法 - 注册DataFrame成为表

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中,使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:
在这里插入图片描述

SQL风格语法 - 使用SQL查询

在这里插入图片描述

pyspark.sql.functions 包

PySpark提供了一个包: pyspark.sql.functions,这个包里面提供了 一系列的计算函数供SparkSQL使用。

导包
from pyspark.sql import functions as F

3.8 SparkSQL Shuffle 分区数目

在这里插入图片描述

3.9 SparkSQL 数据清洗API

在大数据处理之前,首先要对数据进行清洗,有去重,删除缺值,填充缺值等等。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.10 DataFrame数据写出

DataFrame数据写出
在这里插入图片描述

3.11 DataFrame 通过JDBC读写数据库(MySQL示例)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


http://www.ppmy.cn/news/38.html

相关文章

http和dubbo接口调用主动设置超时时间

http接口超时方案 方案1&#xff1a;多个resttemplate&#xff0c;不同超时时间的使用不同的template&#xff0c;优点&#xff1a;简单&#xff0c;扩展起来复制粘贴&#xff0c;缺点&#xff1a;代码冗余&#xff0c;多个template占用内存不够优雅 方案2&#xff1a;单个res…

深度学习day01

Marchine leariing 机器学习就是自动找函式 告诉机器要找的函式用 Supervised Learning 函式的Loss ——评价函式的好坏 Reinforcement就是让机器自己下象棋&#xff0c;输赢自己尝试&#xff0c;没像监督学习那样有人为规定 给函式寻找范围&#xff1a; 函式寻找方法——…

基于单RGB相机的全新三维表示方法|NeurIPS 2022

随着深度学习的发展&#xff0c;基于单张RGB图像的人体三维重建取得了持续进展。 但基于现有的表示方法&#xff0c;如参数化模型、体素栅格、三角网格和隐式神经表示&#xff0c;难以构筑兼顾高质量结果和实时速度的系统。 针对上述问题&#xff0c;天津大学团队联合清华大学…

Spire.XLS for Java 12.11.8 Excel to PDF bug Fix

谷歌找破解版Spire.XLS for Java is a professional Java Excel API that enables developers to create, manage, manipulate, convert and print Excel worksheets without using Microsoft Office or Microsoft Excel. Spire.XLS for Java supports both for the old Excel …

《WEB前端框架开发技术》HTML5响应式旅游景区网站——榆林子州HTML+CSS+JavaScript

&#x1f468;‍&#x1f393;学生HTML静态网页基础水平制作&#x1f469;‍&#x1f393;&#xff0c;页面排版干净简洁。使用HTMLCSS页面布局设计,web大学生网页设计作业源码&#xff0c;这是一个不错的旅游网页制作&#xff0c;画面精明&#xff0c;排版整洁&#xff0c;内容…

C++基础知识

目录 C的基本使用 C数据的输入与输出 C使用命令行 具体案例 C生成随机数 关键字 标识符命名规则 数据类型 整形 实型&#xff08;浮点型&#xff09; 浮点型变量分为2种 表示小数的两种方式 案例演示 字符型 案例演示 字符串类型 两种风格 两种风格字符串之间…

机器学习:一文从入门到读懂PCA(主成分分析)

深度学习&#xff1a;PCA白化前置知识内积的几何意义基基变换不同基下的向量变换逆矩阵不同基下的空间变换方差协方差协方差矩阵协方差矩阵对角化特征值分解、空间变换主成分分析&#xff08;PCA&#xff09;两个原则公式推导求解流程代码实现PCA的优缺点优点缺点前置知识 维度…

深入浅出Spring Boot接口

如何优雅的写 Controller 层代码&#xff1f; 后端思想篇&#xff1a;设计好接口的36个锦囊&#xff01; 瞧瞧&#xff0c;人家这后端API接口写得&#xff0c;那叫一个巴适~&#xff0c;再看看我的&#xff0c;像坨屎&#xff01; 实战总结&#xff01;18种接口优化方案的总结 …