Spark SQL DSL

embedded/2024/10/30 19:43:04/

1、 Spark sql   -- 代替hive的(并非完全代替)  

(1) Spark sql 和 hive 区别 :

     两者都是写sql的,区别是计算引擎不一样  

 hive        -- 计算引擎是MapReduce ,是通过MR做计算的

 Spark sql   -- 计算引擎是Saprk Core,是通过Spark Core做计算的

     Spark sql 功能比 hive 强大 :   并非只能写sql

 hive只能在shell行写sql

 spark可以在代码中写sql  

(2) Spark sql结构 :

1、 Data Source API(读数据) :   可以读取 csv(文本文件)、 json、 jdbc 等各种各样的数据做处理

2、 Data Frame API(提供了两个API):

        Dataframe DSL      -- 写代码      (DSL :  类SQL语法,与SQL差不多,但它是代码)

    Spark SQL and HQL  -- 写SQL

(3) DataFrame :   数据框(二维的表结构,类似hive的一张表)

    写SQL的前提 :  有表

DataFrame 是基于 RDD 做了封装, 在上面提供了 列名和列类型 的概念,即表的结构的概念。

          可以基于 DataFrame 去写 SQL 。

2、 写Spark SQL :   

spark sql中, shuffle之后分区数不是由前面的RDD决定的,而是有默认值, 默认200个。 可以指定参数修改。

    (1) 导入Spark SQL依赖         -- 在Spark项目的pom文件中加入

<!--  Spark sql核心依赖  -->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.4.5</version>

        </dependency>

(2) 在Spark项目下创建sql包    -- 新的模块一定要新建新的包  

*项目名称一定要小写,多个单词之间用-分割 : s1-v1_1.2   

*包名也要小写,一般是公司域名倒写 : com.shujia.spark

(3) 创建Spark sql环境 :

    val spark: SparkSession = SparkSession

.builder()    // 构建

.appName("wordCount")

.master("local")

// 设置 sparkSQL 在 shuffle 之后 DF 的分区数,默认是200

            .config("spark.sql.shuffle.partitions", 1)

.getOrCreate()   // 当前环境有SparkSession就获取, 反之则创建

(4) 返回值不再是 RDD,  而是 DataFrame (DF)

    查看数据不再是 foreach(),  而是 show()

(5) 针对于sql语句有多行的情况, 可以使用 """ """ 格式书写

val wordCountDF = spark.sql(

"""

|select word,count(1) as c from (

|select explode(split(line,',')) word from lines

|) as d

|group by word

|""".stripMargin)       // stripMargin :  删除"|"  并合并以上sql语句

(6) 创建 DataFrame 的方式:

 1、  读取 csv 格式的数据创建 DF

    val studentDF: DataFrame = spark

    .read

  .format("csv")

  .option("sep", ",")     //列的分割方式

  .schema("id STRING, name STRING, age INT, gender STRING, clazz STRING")  // 指定字段名和字段类型, 必须按照数据顺序指定

  .load("data/students.txt")     //指定读取的路径

 2、  读取 json 格式的数据构建 DF

      (spark 会自动解析json格式)

val studentJsonDF: DataFrame = spark

  .read

  .format("json")

  .load("data/a.json")

         3、  读取 jdbc 数据构建 DF

              (通过网络远程读取 mysql 中的数据,  需要添加mysql依赖)

  

    val jdbcDF: DataFrame = spark.read

  .format("jdbc")

  .option("url", "jdbc:mysql://master:3306")

  .option("dbtable", "bigdata.students")

  .option("user", "root")

  .option("password", "123456")

  .load()

4、  读取 parquet 格式的数据构建 DF

             (parquet格式的数据中自带 列名 和 列类型,

             parquet会对数据进行压缩, 体积变小, 解压和压缩需要时间)

// 保存一个parquet格式的文件

studentDF

  .write

  .format("parquet")

  .mode(SaveMode.Overwrite)

  .save("data/parquet")

// 读取parquet格式的数据

val parquetDF: DataFrame = spark

  .read

  .format("parquet")

  .load("data/parquet")

3、 DSL语法   -- 类sql语法

    // spark sql 中必须要导入隐式转换, 才可以使用 $方法 获取列对象

import spark.implicits._

//导入 DSL 所有的函数

    import org.apache.spark.sql.functions._       

(1) show   :   查看前面20条数据,  相当于action算子

                   action算子  -- 每一个Action算子都会触发一个job

(2) select :   选择字段,  和 sql 中 select 是一样

(3) $ : 是一个方法,作用是通过列名获取列的对象

studentDF.select($"id", $"age" + 2 as "age").show()

(4) where :  过滤数据

    = : 赋值    == : 判断    === : 等于

(5) group by :   分组

(6) agg :   分组之后进行聚合计算

            只能在分组后使用, 即一般跟在group函数后面

studentDF

  .groupBy($"clazz")

  // 分组之后做聚合计算   -- 可以写多个

  .agg(count($"clazz") as "c", avg($"age") as "avgAge")

  .show()

(7) join :   表关联

(8) 开窗函数     --  统计每个班级总分前2的学生   

    withColumn  :   给 DF 增加新的列

joinDF

  // 按照 id 和 班级 分组

  .groupBy($"id", $"clazz")

  // 对分数求和

  .agg(sum($"sco") as "sumSco")

  // 使用开窗函数                  -- row_number() over (partition by clazz order by sumSco desc)

//    .select($"id", $"clazz", $"sumSco", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc) as "r")

  // 在前面 DF 的基础上增加列 ( 上面的简写, 省去写 $"id", $"clazz", $"sumSco" 步骤, 直接将 "r" 加在 "sumSco" 后面  )

  .withColumn("r", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))

  // 取 班级前2

  .where($"r" <= 2).show()

(9) orderBy :  排序

DSL 语法 与 SQL 的异同 :

1、 DSL 和 SQL 功能相同, 但写法不同, 代码更简洁

    2、 DSL 不需要做 子查询


http://www.ppmy.cn/embedded/133685.html

相关文章

云腾五洲的智联引擎是什么?

智联引擎是成都云腾五洲科技有限公司旗下的数智化转型服务平台&#xff0c;它提供云边协同的分布式物联网平台引擎服务。这一平台以其强大的功能和灵活性&#xff0c;为全行业提供数智化转型的新动力&#xff0c;帮助企业在数智化升级中实现持续增长。 核心能力 智联引擎的核心…

在面试了些外包以后,我有了些自己的思考

大家好&#xff0c;我是洋子&#xff0c;最近公司在降本增效&#xff0c;需要把外包从北京迁移到陕西的某新一线城市&#xff0c;其实就是变相裁员&#xff0c;减少外包的成本&#xff0c;裁掉现有的员工&#xff0c;重新招聘新人 在整个测试行业&#xff0c;外包测试的比重是…

STM32F103C8T6 IO 操作

1.开启相关时钟 在 STM32 微控制器中&#xff0c;开启 GPIO 端口的时钟是确保 IO 口可以正常工作的第一步。 查找 RCC 寄存器使能时钟 在 STM32 中&#xff0c;时钟控制的寄存器通常位于 RCC (Reset and Clock Control) 模块中。不同的 STM32 系列&#xff08;如 STM32F1、STM…

【Windows电脑通过cmd命令查看电脑电池健康度】

Cmd输入 powercfg /batteryreport打印电池使用报告&#xff0c;打开即可 此处查看FULL CHARGE CAPACITY&#xff08;充满电容量&#xff09;

研二了,该想想做啥呢?

写于22年10月24日&#xff0c;之前删了&#xff0c;今天回看&#xff0c;自己当时还挺有意思哈哈哈 一、自我介绍 行秋&#xff0c;男&#xff0c;24岁&#xff0c;电子信息专业&#xff0c;硕士在读。 二、新学期目标 1、好好学习 ① 学好一门语言&#xff0c;学好英语&am…

提升RAG系统的回答质量:PDF解析代码详解-PdfParser核心流程

在上一篇文章中&#xff0c;我们探讨了如何通过计算机视觉大模型提升RAG系统解析PDF文档的能力&#xff0c;并展示了该技术在行业文档识别中的实际应用效果。文章发布后&#xff0c;受到了广泛关注。读者主要关心两个问题&#xff1a;其一&#xff0c;如何在PDF文档识别过程中编…

推荐一款多显示器屏幕亮度调节工具:Twinkle Tray

Twinkle Tray中文版使您可以轻松管理多台显示器的亮度级别。 尽管 Windows 10 能够调节大多数显示器的背光&#xff0c;但它通常不支持外部显示器。 Windows 还缺乏管理多台显示器的亮度的任何功能。 该应用程序将一个新图标插入系统托盘&#xff0c;您可以在其中单击以立即访问…

Linux云计算 |【第五阶段】ARCHITECTURE-DAY5

主要内容&#xff1a; 搭建Zookeeper高可用集群、搭建分布式消息队列kafka、搭建高可用hadoop集群 一、Zookeeper 介绍 Zookeeper是一个开源分布式应用程序协调服务&#xff0c;主要用于解决分布式集群中应用系统的一致性问题。它能提供类似文件系统的目录节点树方式的数据存…