SpringBoot操作spark处理hdfs文件

news/2025/1/12 21:05:56/

sparkhdfs_0">SpringBoot操作spark处理hdfs文件

  • 在这里插入图片描述

1、导入依赖

  • <!--        spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>3.2.2</version></dependency>
    

spark_24">2、配置spark信息

  • 建立一个配置文件,配置spark信息
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//将文件交于spring管理
@Configuration
public class SparkConfig {//使用yml中的配置@Value("${spark.master}")private String sparkMaster;@Value("${spark.appName}")private String sparkAppName;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Beanpublic SparkConf sparkConf() {SparkConf conf = new SparkConf();conf.setMaster(sparkMaster);conf.setAppName(sparkAppName);// 添加HDFS配置conf.set("fs.defaultFS", hdfsPath);conf.set("spark.hadoop.hdfs.user",hdfsUser);return conf;}@Beanpublic SparkSession sparkSession() {return SparkSession.builder().config(sparkConf()).getOrCreate();}
}

3、controller和service

  • controller类

    • import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      import xyz.zzj.traffic_main_code.service.SparkService;@RestController
      @RequestMapping("/spark")
      public class SparkController {@Autowiredprivate SparkService sparkService;@GetMapping("/run")public String runSparkJob() {//读取Hadoop HDFS文件String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";sparkService.executeHadoopSparkJob(filePath);return "Spark job executed successfully!";}
      }
      
  • 处理地铁数据的service

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {private final SparkSession spark;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Autowiredpublic SparkReadHdfsImpl(SparkSession spark) {this.spark = spark;}/*** 读取HDFS上的CSV文件并上传到HDFS* @param filePath*/@Overridepublic void sparkSubway(String filePath) {try {// 设置Hadoop配置JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());Configuration hadoopConf = jsc.hadoopConfiguration();hadoopConf.set("fs.defaultFS", hdfsPath);hadoopConf.set("hadoop.user.name", hdfsUser);// 读取HDFS上的文件Dataset<Row> df = spark.read().option("header", "true") // 指定第一行是列名.option("inferSchema", "true") // 自动推断列的数据类型.csv(filePath);// 显示DataFrame的所有数据
//            df.show(Integer.MAX_VALUE, false);// 对DataFrame进行清洗和转换操作// 检查缺失值df.select("number", "people", "dateTime").na().drop().show();// 对数据进行类型转换Dataset<Row> df2 = df.select(col("number").cast(DataTypes.IntegerType),col("people").cast(DataTypes.IntegerType),to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime"));// 去重Dataset<Row> df3 = df2.dropDuplicates();// 数据过滤,确保people列没有负数Dataset<Row> df4 = df3.filter(col("people").geq(0));
//            df4.show();// 数据聚合,按dateTime分组,统计每天的总客流量Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
//            df6.show();sparkForSubway(df6,"/time_subwayData.csv");//数据聚合,获取每天人数最多的地铁numberDataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));sparkForSubway(df7,"/everyday_max_subwayData.csv");//数据聚合,计算每天的客流强度:每天总people除以632840Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));sparkForSubway(df8,"/everyday_strength_subwayData.csv");} catch (Exception e) {e.printStackTrace();}}private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {// 保存处理后的数据到HDFSdf6.coalesce(1).write().mode("overwrite").option("header", "true").csv("hdfs://192.168.44.128:9000/time_subwayData");// 创建Hadoop配置Configuration conf = new Configuration();// 获取FileSystem实例FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);// 定义临时目录和目标文件路径Path tempDir = new Path("/time_subwayData");FileStatus[] files = fs.listStatus(tempDir);// 检查目标文件是否存在,如果存在则删除Path targetFile1 = new Path(hdfsPath);if (fs.exists(targetFile1)) {fs.delete(targetFile1, true); // true 表示递归删除}for (FileStatus file : files) {if (file.isFile() && file.getPath().getName().startsWith("part-")) {Path targetFile = new Path(hdfsPath);fs.rename(file.getPath(), targetFile);}}// 删除临时目录fs.delete(tempDir, true);}}

4、运行

  • 项目运行完后,打开浏览器
    • spark处理地铁数据
      • http://localhost:8686/spark/dispose
  • 观察sparkhdfs
    • http://192.168.44.128:8099/
    • http://192.168.44.128:9870/explorer.html#/
      • image-20250109095551610

http://www.ppmy.cn/news/1562601.html

相关文章

《PC 上的开源神经网络多模态模型:开启智能交互新时代》

《PC 上的开源神经网络多模态模型&#xff1a;开启智能交互新时代》 一、引言二、多模态模型基础剖析&#xff08;一&#xff09;核心概念解读&#xff08;二&#xff09;技术架构探秘 三、开源多模态模型的独特魅力&#xff08;一&#xff09;开源优势尽显&#xff08;二&…

【面试题】简单聊一下什么是云原生、什么是k8s、容器,容器与虚机相比优势

云原生&#xff08;Cloud Native&#xff09; 定义&#xff1a;云原生是一种构建和运行应用程序的方法&#xff0c;旨在充分利用云计算的优势。它涵盖了一系列技术和理念&#xff0c;包括容器化、微服务架构、自动化部署与管理等。特点&#xff1a;云原生应用程序被设计为可弹性…

C++:字符数组

一、字符数组介绍 数组的元素如果是字符类型&#xff0c;这种数组就是字符数组&#xff0c;字符数组可以是一维数组&#xff0c;可以是二维数组 &#xff08;多维数组&#xff09;。我们接下来主要讨论的是一维的字符数组。 char arr1[5]; //⼀维字符数组 char arr2[3][5];//⼆…

(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验六----流域综合处理(超超超详细!!!)

流域综合处理 流域综合治理是根据流域自然和社会经济状况及区域国民经济发展的要求,以流域水流失治理为中心,以提高生态经济效益和社会经济持续发展为目标,以基本农田优化结构和高效利用及植被建设为重点,建立具有水土保持兼高效生态经济功能的半山区流域综合治理模式。数字高程…

Python AI教程之十五:监督学习之决策树(6)高级算法C5.0决策树算法介绍

C5.0决策树算法 C5 算法由 J. Ross Quinlan 创建,是 ID3 决策树方法的扩展。它通过根据信息增益(衡量通过按特定属性进行划分而实现的熵减少量)递归地划分数据来构建决策树。 对于分类问题,C5.0 方法是一种决策树算法。它构建规则集或决策树,这是对 C4.5 方法的改进。根…

【Linux】Linux开发:GDB调试器与Git版本控制工具指南

Linux相关知识点可以通过点击以下链接进行学习一起加油&#xff01;初识指令指令进阶权限管理yum包管理与vim编辑器GCC/G编译器make与Makefile自动化构建 在 Linux 开发中&#xff0c;GDB 调试器和 Git 版本控制工具是开发者必备的利器。GDB 帮助快速定位代码问题&#xff0c;G…

linux下shell中使用上下键翻出历史命名时出现^[[A^[[A^[[A^[[B^[[B的问题解决

前言 今天在使用linux的时候&#xff0c;使用上下键想翻出历史命令时&#xff0c;却出现[[A[[A[[A[[B^[[B这种东东&#xff0c;而tab键补全命令的功能也无法使用。最终发现是由于当前用户使用的shell是/bin/sh的原因。 解决方法 运行以下命令&#xff0c;将默认 shell 设置为…

计算机网络之---ICMP协议与Ping命令

ICMP 协议 ICMP (Internet Control Message Protocol) 是一种网络层协议&#xff0c;主要用于在 IP 网络中传递控制消息。ICMP 主要用于网络设备之间的故障报告和诊断&#xff0c;帮助设备检测网络连接问题。它是 IP 协议的核心部分之一&#xff0c;用于发送错误消息和操作信息…