SparkSQL函数综合实践

embedded/2025/1/22 7:04:00/

文章目录

  • 1. 实战概述
  • 2. 实战步骤
    • 2.1 创建项目
    • 2.2 添加依赖
    • 2.3 设置源目录
    • 2.4 创建日志属性文件
    • 2.5 创建hive配置文件
    • 2.6 创建数据分析对象
      • 2.6.1 导入相关类
      • 2.6.2 创建获取Spark会话方法
      • 2.6.3 创建表方法
      • 2.6.4 准备数据文件
      • 2.6.5 创建加载数据方法
      • 2.6.6 创建薪水排行榜方法
      • 2.6.7 创建主方法
      • 2.6.8 查看完整代码
    • 2.7 启动metastore服务
    • 2.8 运行程序,查看结果
    • 2.8 在Spark Shell里运行程序
  • 3. 实战小结

1. 实战概述

  • 通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。通过创建 Hive 表、加载 JSON 数据并使用 Spark SQL 查询每个城市工资最高的前 N 名员工,实现了数据的高效处理与分析。实战涵盖了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数的使用,适用于大数据处理场景。

2. 实战步骤

2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述
  • java目录改成scala目录
    在这里插入图片描述

2.2 添加依赖

  • pom.xml文件里添加相关依赖
    在这里插入图片描述
  • 刷新项目依赖
    在这里插入图片描述

2.3 设置源目录

  • pom.xml里设置源目录
    在这里插入图片描述

2.4 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

2.5 创建hive配置文件

  • resources里创建hive-site.xml文件
    在这里插入图片描述
  • bigdata1云主机上执行命令:$HIVE_HOME/conf/hive-site.xml,拷贝其内容到resources里的hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://bigdata1:3306/metastore?useSSL=false</value></property><property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value></property><property><name>javax.jdo.option.ConnectionUserName</name><value>root</value></property><property><name>javax.jdo.option.ConnectionPassword</name><value>123456</value></property><property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value></property><property><name>hive.server2.thrift.port</name><value>10000</value></property><property><name>hive.server2.thrift.bind.host</name><value>bigdata1</value></property><property><name>hive.metastore.uris</name><value>thrift://bigdata1:9083</value></property><property><name>hive.metastore.event.db.notification.api.auth</name><value>false</value></property><property><name>hive.metastore.schema.verification</name><value>false</value></property><property><name>hive.server2.active.passive.ha.enable</name><value>true</value></property>
</configuration>

2.6 创建数据分析对象

  • 添加scala-sdk到项目
    在这里插入图片描述

  • 单击【Add to Modules…】菜单项
    在这里插入图片描述

  • 单击【OK】按钮即可

  • 创建net.huawei.sql
    在这里插入图片描述

  • net.huawei.sql包里创建DataAnalysis对象
    在这里插入图片描述

2.6.1 导入相关类

  • 导入三个类:SparkConfSparkSessionDataFrame
    在这里插入图片描述

2.6.2 创建获取Spark会话方法

  • 创建getSparkSession()方法
    在这里插入图片描述
// 获取SparkSession对象                                      
def getSparkSession(): SparkSession = {                  // 创建SparkConf对象                                       val conf = new SparkConf()                             conf.setMaster("local[*]")                             conf.setAppName("DataAnalysis")                        conf.set("dfs.client.use.datanode.hostname", "true")   // 创建SparkSession对象                                    SparkSession.builder()                                 .config(conf)                                        .enableHiveSupport()                                 .getOrCreate()                                       
}                                                                           

2.6.3 创建表方法

  • 创建createTable()方法
    在这里插入图片描述
// 创建表                                                   
def createTable(spark: SparkSession): Unit = {           spark.sql(                                             s"""                                                 |CREATE TABLE IF NOT EXISTS salary_info           |  (city string, name string, salary double)      |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  |""".stripMargin                                  )                                                      
}                                                        

2.6.4 准备数据文件

  • 在项目根目录创建data目录,在里面创建salary.json文件
    在这里插入图片描述
{"city": "北京", "name": "陈燕文", "salary": 5000.0}
{"city": "上海", "name": "李伟强", "salary": 8000.0}
{"city": "广州", "name": "王丽娜", "salary": 5500.0}
{"city": "北京", "name": "赵建国", "salary": 5200.0}
{"city": "上海", "name": "孙志强", "salary": 5300.0}
{"city": "广州", "name": "方云龙", "salary": 6800.0}
{"city": "北京", "name": "周晓峰", "salary": 6400.0}
{"city": "上海", "name": "吴雅婷", "salary": 5100.0}
{"city": "广州", "name": "郑文杰", "salary": 5600.0}
{"city": "上海", "name": "王海涛", "salary": 7500.0}
{"city": "北京", "name": "李雪梅", "salary": 5800.0}
{"city": "广州", "name": "童玉明", "salary": 7800.0}

2.6.5 创建加载数据方法

  • 创建loadData()方法
    在这里插入图片描述
// 加载数据                                                                          
def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {val fileDF: DataFrame = spark.read.format("json").load(inputPath)              fileDF.write.insertInto(tableName)                                             
}                                                                                

2.6.6 创建薪水排行榜方法

  • 创建salaryTopN()方法
    在这里插入图片描述
// 查询工资topN                                                                           
def salaryTopN(spark: SparkSession, topN: Int): Unit = {                              spark.sql(                                                                          s"""                                                                              |SELECT                                                                        |  city, name, salary                                                          |FROM                                                                          |  (                                                                           |    SELECT                                                                    |      city, name, salary,                                                     |      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num   |    FROM                                                                      |      salary_info                                                             |  ) salary_rank                                                               |WHERE row_num <= $topN                                                        |""".stripMargin                                                               ).show()                                                                            
}                                                                                     
  • 代码说明salaryTopN 方法用于查询每个城市工资最高的前 topN 名员工。通过 row_number() 窗口函数按城市分组并按工资降序排序,生成行号 row_num,然后筛选出行号小于等于 topN 的记录。最终结果展示每个城市工资最高的前 topN 名员工的姓名和工资。

2.6.7 创建主方法

  • 通过 getSparkSession() 获取 SparkSession 实例,使用 createTable()Hive 中创建表,调用 loadData() 加载数据并写入 Hive 表,通过 salaryTopN() 查询每个城市工资最高的前 N 名员工信息,最后释放资源。
    在这里插入图片描述
// 主方法                                                   
def main(args: Array[String]): Unit = {                  // 获取SparkSession对象                                    val spark = getSparkSession()                          // 创建表                                                 createTable(spark)                                     // 加载数据                                                loadData(spark, "data/salary.json", "salary_info")     // 查询工资top3                                            salaryTopN(spark, 3)                                   
}                                                        

2.6.8 查看完整代码

package net.huawei.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 功能:数据分析对象* 作者:华卫* 日期:2025年01月21日*/
object DataAnalysis {// 获取SparkSession对象def getSparkSession(): SparkSession = {// 创建SparkConf对象val conf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("DataAnalysis")conf.set("dfs.client.use.datanode.hostname", "true")// 创建SparkSession对象SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()}// 创建表def createTable(spark: SparkSession): Unit = {spark.sql(s"""|CREATE TABLE IF NOT EXISTS salary_info|  (city string, name string, salary double)|  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','|""".stripMargin)}// 加载数据def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {val fileDF: DataFrame = spark.read.format("json").load(inputPath)fileDF.write.insertInto(tableName)}// 查询工资topNdef salaryTopN(spark: SparkSession, topN: Int): Unit = {spark.sql(s"""|SELECT|  city, name, salary|FROM|  (|    SELECT|      city, name, salary,|     row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num|    FROM|      salary_info|  ) salary_rank|WHERE row_num <= $topN|""".stripMargin).show()}// 主方法def main(args: Array[String]): Unit = {// 获取SparkSession对象val spark = getSparkSession()// 创建表createTable(spark)// 加载数据loadData(spark, "data/salary.json", "salary_info")// 查询工资top3salaryTopN(spark, 3)}
}

2.7 启动metastore服务

  • 执行命令:hive --service metastore &
    在这里插入图片描述

2.8 运行程序,查看结果

  • 运行DataAnalysis对象
    在这里插入图片描述
  • hive客户端,查看创建的c
    在这里插入图片描述
  • 查看salary_info表的内容
    在这里插入图片描述
  • 在HDFS上查看salary_info表对应的目录
    在这里插入图片描述
  • 下载文件,查看内容
    在这里插入图片描述

2.8 在Spark Shell里运行程序

  • salary.json上传到HDFS的/data目录
    在这里插入图片描述

  • 在spark shell里执行命令::paste,粘贴代码
    在这里插入图片描述

  • Ctrl + D,查看结果
    在这里插入图片描述

3. 实战小结

  • 本次实战通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。首先,我们创建了 Hive 表并加载了 JSON 数据,随后通过 Spark SQL 查询每个城市工资最高的前 N 名员工。实战中,我们使用了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数等技术,实现了数据的高效处理与分析。通过本次实战,我们掌握了 Spark 和 Hive 的基本操作,并学会了如何在大数据场景下进行数据分析和处理。

http://www.ppmy.cn/embedded/155994.html

相关文章

Android系统定制APP开发_如何对应用进行系统签名

前言 当项目开发需要使用系统级别权限或frame层某些api时&#xff0c;普通应用是无法使用的&#xff0c;需要在AndroidManifest中配置sharedUserId&#xff1a; AndroidManifest.xml中的android:sharedUserId“android.uid.system”&#xff0c;代表的意思是和系统相同的uid&a…

WebSocket知识点笔记(一)

WebSocket ​ WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务端之间的消息传递更加高效&#xff0c;允许服务器主动向客户端推送数据。 一.WebSocket全双工通信 WebSocket提供了真正的双向通信&#xff0c;客户端和服务端可以同时发送和接收消息 …

计算机毕业设计PySpark+Hadoop+Hive机票预测 飞机票航班数据分析可视化大屏 航班预测系统 机票爬虫 飞机票推荐系统 大数据毕业设计

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

被动扫描和主动扫描的区别

在网络安全和漏洞检测中&#xff0c;被动扫描和主动扫描是两种常见的技术&#xff0c;它们在工作方式和应用场景上有显著的区别。 被动扫描 被动扫描是一种在目标无法察觉的情况下进行的信息收集方法。它通过监听网络流量、代理等方式获取数据&#xff0c;而不主动与目标系统…

Python保留字与标识符及常变量

1、保留字 保留字&#xff1b;严格区分大小写 不可以把保留字作为变量、函数、类、模块和其他对象的名称来使用 import keyword print(keyword.kwlist) # 输出所有的保留字 print(len(keyword.kwlist)) # 获取保留字的个数 true 真 # True 真 #属于保留字&#xff0c;会…

微服务知识——4大主流微服务架构方案

文章目录 1、微服务聚合模式2、微服务共享模式3、微服务代理模式4、微服务异步消息模式 微服务是大型架构的必经之路&#xff0c;也是大厂重点考察对象&#xff0c;下面我就重点详解4大主流微服务架构方案。 1、微服务聚合模式 微服务聚合设计模式&#xff0c;解决了如何从多个…

谈谈对分布式事务Seata的理解

最近一个微服务项目中有用到Seata,联调的时候发现了一些数据不回滚的问题&#xff0c;特此记录一下&#xff1a; 1.Seata介绍&#xff1a; Seata 是一个开源的分布式事务解决方案&#xff0c;致力于在微服务架构下提供高性能和易用的分布式事务服务。它由阿里巴巴集团发起并贡…

Unity环境搭建

在Unity中开发环境搭建的步骤如下&#xff1a; 1. 安装Unity 访问 Unity官网&#xff0c;并下载并安装Unity Hub。Unity Hub是一个用于管理Unity安装版本、项目和组件的工具。安装Unity Hub后&#xff0c;打开Unity Hub&#xff0c;登录您的Unity账号或创建一个新账号。在Uni…