Spark的一些高级用法

news/2024/9/17 8:50:24/ 标签: spark, 大数据, 分布式

Java 中实现 Spark 的一些高级用法。

1. 使用 DataFrame 和 Spark SQL

在 Spark 中,使用 DataFrame 来处理结构化数据并执行 SQL 查询是非常常见的。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class SparkSQLExample {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("Spark SQL Example").master("local[*]").getOrCreate();// 读取 JSON 文件为 DataFrameDataset<Row> df = spark.read().json("hdfs://path/to/json");// 注册 DataFrame 为临时视图df.createOrReplaceTempView("people");// 使用 Spark SQL 查询Dataset<Row> result = spark.sql("SELECT name, age FROM people WHERE age > 25");// 显示结果result.show();// 显示物理和逻辑执行计划result.explain(true);spark.stop();}
}

2. 缓存和持久化 (Caching and Persistence)

在频繁使用某个 DataFrame 时,可以通过缓存提高性能。这里是 Java 代码示例:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class SparkCachingExample {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("Caching Example").master("local[*]").getOrCreate();// 读取 Parquet 文件为 DataFrameDataset<Row> df = spark.read().parquet("hdfs://path/to/parquet");// 缓存 DataFramedf.cache();// 对缓存数据进行操作long count = df.filter(df.col("age").gt(30)).count();System.out.println("Count: " + count);// 清除缓存df.unpersist();spark.stop();}
}

3. UDF (用户自定义函数)

你可以通过 UDF 在 Spark SQL 中使用自定义的 Java 函数。以下是如何在 Java 中注册并使用 UDF 的示例:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;public class SparkUDFExample {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("UDF Example").master("local[*]").getOrCreate();// 读取 JSON 文件为 DataFrameDataset<Row> df = spark.read().json("hdfs://path/to/json");// 注册 UDF 将字符串转换为大写spark.udf().register("toUpperCase", new UDF1<String, String>() {@Overridepublic String call(String s) {return s.toUpperCase();}}, DataTypes.StringType);// 使用 UDF 在 SQL 查询中调用df.createOrReplaceTempView("people");Dataset<Row> result = spark.sql("SELECT name, toUpperCase(name) AS name_upper FROM people");// 显示结果result.show();spark.stop();}
}

4. 持久化 (Persistence) 和自定义存储级别

在 Spark 中,可以选择不同的持久化级别,以适应不同的场景。以下是使用 Java 代码实现自定义存储级别的示例:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.sql.SparkSession;public class SparkPersistExample {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("Persistence Example").master("local[*]").getOrCreate();JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());// 创建 RDDJavaRDD<String> rdd = sc.textFile("hdfs://path/to/text");// 使用自定义持久化级别,将数据存储在内存和磁盘中rdd.persist(StorageLevel.MEMORY_AND_DISK());// 对持久化的 RDD 进行操作long count = rdd.count();System.out.println("Count: " + count);// 清除持久化rdd.unpersist();spark.stop();}
}

5. Structured Streaming 实时流处理

Spark Structured Streaming 是用于实时流处理的高级 API。你可以从 Kafka 等数据源读取数据,并对其进行实时处理。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;public class SparkStreamingExample {public static void main(String[] args) throws StreamingQueryException {SparkSession spark = SparkSession.builder().appName("Structured Streaming Example").master("local[*]").getOrCreate();// 从 Kafka 中读取实时数据Dataset<Row> kafkaStream = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test_topic").load();// 将 Kafka 的消息转为字符串Dataset<Row> values = kafkaStream.selectExpr("CAST(value AS STRING)");// 开始流式查询,将结果输出到控制台StreamingQuery query = values.writeStream().outputMode("append").format("console").start();// 等待流式查询结束query.awaitTermination();}
}

总结

通过 DataFrame、SQL、持久化、UDF 和流处理,你可以更高效地处理不同场景下的大数据任务。在实际应用中,结合合适的存储和优化策略,可以显著提升 Spark 作业的性能。


http://www.ppmy.cn/news/1522822.html

相关文章

2024高教社杯数学建模国赛ABCDE题选题建议+初步分析

提示&#xff1a;DS C君认为的难度&#xff1a;C<B<A&#xff0c;开放度&#xff1a;A<C<B 。 D、E题推荐选E题&#xff0c;后续会直接更新E论文和思路&#xff0c;不在这里进行选题分析&#xff0c;以下为A、B、C题选题建议及初步分析 A题&#xff1a;“板凳龙”…

RPC使用的关键技术

RPC&#xff08;Remote Procedure Call&#xff0c;远程过程调用&#xff09;是分布式系统中常用的一种通信方式&#xff0c;它允许程序调用位于不同计算机上的方法或函数&#xff0c;就像调用本地方法一样。为了实现这种透明且高效的远程调用&#xff0c;RPC 框架依赖于多种关…

【软件设计】常用设计模式--策略模式

软件设计模式&#xff08;三&#xff09; 策略模式&#xff08;Strategy Pattern&#xff09;1. 概念2. 模式结构3. UML 类图4. 实现方式C# 示例步骤1&#xff1a;定义策略接口步骤2&#xff1a;实现具体策略类步骤3&#xff1a;实现上下文类步骤4&#xff1a;使用策略模式 Jav…

驱动(RK3588S)第八课时:平台设备总线

目录 目标一、平台设备总线的概念1、什么是平台设备总线2、平台设备总线 platform 的匹配3、设备树和平台设备总线的关系&#xff0c;以及匹配 二、平台设备总线的函数接口1、注册设备端的资源信息2、设备端提供的资源的信息3、注销申请的设备端的资源4、驱动端的函数&#xff…

逻辑表达式,最小项

目录 得到此图的逻辑电路 1.画出它的真值表 2.根据真值表写出逻辑式 3.画逻辑图 逻辑函数的表示 逻辑表达式 最小项 定义 基本性质 最小项编号 最小项表达式 得到此图的逻辑电路 1.画出它的真值表 这是同或的逻辑式。 2.根据真值表写出逻辑式 3.画逻辑图 有两种画法…

Android Fragment 学习备忘

1.fragment的动态添加与管理&#xff0c;fragment生命周期在后面小节&#xff1a;https://www.bilibili.com/video/BV1Ng411K7YP/?p37&share_sourcecopy_web&vd_source982a7a7c05972157e8972c41b546f9e4https://www.bilibili.com/video/BV1Ng411K7YP/?p37&share_…

Python 读取 Excel 数据|数据处理|Pandas|Excel操作

目录 1. 为什么选择 Python 读取 Excel 数据 2. Python 读取 Excel 数据的基本工具 2.1 Pandas 库 2.2 Openpyxl 库 2.3 xlrd 库 3. 读取 Excel 文件的高级操作 3.1 读取特定的工作表 3.2 读取特定的列和行 3.3 处理缺失数据 4. 实践应用示例 4.1 数据分析和可视化 …

ngrok | 内网穿透,支持 HTTPS、国内访问、静态域名

前言 当我们需要把本地开发的应用展示给外部用户时&#xff0c;常常会因为无法直接访问而陷入困境。 就为了展示一下&#xff0c;买服务、域名&#xff0c;搭环境&#xff0c;费钱又费事。 那有没有办法&#xff0c;让客户直接访问自己本机开发的应用呢&#xff1f; 这种需…

表格多列情况下,loading不显示问题

问题描述&#xff1a; 用element plus 做得表格&#xff0c;如下图&#xff0c;列数较多&#xff0c;且部分表格内容显示比较复杂&#xff0c;数据量中等的情况下&#xff0c;有一个switch 按钮&#xff0c;切换部分列的显示和隐藏&#xff0c;会发现&#xff0c;切换为显示的时…

逻辑运算基础知识

关系运算符 <:小于 <:小于等于 >:大于 >:大于等于 以上优先级相同&#xff1a;高 &#xff1a;等于 !&#xff1a;不等于 以上优先级相同&#xff1a;低 说明&#xff1a; 关系运算符的 优先级 低于 算数运算符 关系运算符的 优先级 大于 赋值运算符 逻辑运算&a…

前向渲染路径

1、前向渲染路径处理光照的方式 前向渲染路径中会将光源分为以下3种处理方式&#xff1a; 逐像素处理&#xff08;需要高等质量处理的光&#xff09;逐顶点处理&#xff08;需要中等质量处理的光&#xff09;球谐函数&#xff08;SH&#xff09;处理&#xff08;需要低等质量…

如何使用 PHP 函数与其他 Web 服务交互?

在 PHP 中&#xff0c;我们可以使用 cURL 或者 file_get_contents 函数与其他 Web 服务进行交互。 使用 cURL 函数 cURL 是一个库&#xff0c;它允许你使用各种类型的协议来发送数据&#xff0c;并从服务器获取数据。 $curl curl_init(‘http://example.com/api’); curl_s…

SprinBoot+Vue漫画天堂网的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质…

前端框架有哪些?以及每种框架的详细介绍

目录 前言1. React2. Vue.js3. Angular4. Bootstrap5. Foundation总结 前言 前端框架是Web开发中不可或缺的工具&#xff0c;它们为开发者提供了丰富的工具和抽象&#xff0c;使得构建复杂的Web应用变得更加容易。当前&#xff0c;前端框架种类繁多&#xff0c;其中一些最受欢…

【全网最全】2024年数学建模国赛A题30页完整建模文档+17页成品论文+保奖matla代码+可视化图表等(后续会更新)

您的点赞收藏是我继续更新的最大动力&#xff01; 一定要点击如下的卡片那是获取资料的入口&#xff01; 【全网最全】2024年数学建模国赛A题30页完整建模文档17页成品论文保奖matla代码可视化图表等&#xff08;后续会更新&#xff09;「首先来看看目前已有的资料&#xff0…

应用开发“取经路”,华为应用市场送出全周期服务“助攻”

最近大量国内外玩家被西游神话圈粉&#xff0c;化身游戏人物角色&#xff0c;踏上了充满冒险的取经路。如果让莘莘学子或创业者们&#xff0c;在自己的职业生涯中&#xff0c;也选一个机遇跟挑战并存的角色&#xff0c;“开发者”一定榜上有名。 智能手机和移动互联网的普及&am…

30天pandas挑战

大的国家 挑选出符合要求的行 def big_countries(world: pd.DataFrame) -> pd.DataFrame:df world[(world[area] > 3000000) | (world[population] > 25000000)]return df[[name,population,area]] 在Pandas中&#xff0c;当你使用条件过滤时&#xff0c;应该使用 …

记一次升级 Viper、ETCD V3操作Toml

前一阵子碰到Go写的一项目&#xff0c;使用viper和ETCD进行Toml文件的存储与写入。在当我安装新版本的ETCD和升级Go依赖包之后出现了不兼容的问题。旧版viper为1.10版本&#xff0c;使用github.com/coreos/go-etcd v2.0.0incompatible 作为请求包。看了源码之后发现新的版本中废…

HashMap 底层原理解析

HashMap 是 Java 中非常常用的一个数据结构&#xff0c;它基于哈希表实现&#xff0c;提供了快速的键值对存储和检索。本文将深入探讨 HashMap 的底层实现原理&#xff0c;包括其数据结构、哈希函数、冲突解决机制以及扩容机制。 1. 哈希表基础 哈希表是一种通过哈希函数将键…

【重学 MySQL】二十、运算符的优先级

【重学 MySQL】二十、运算符的优先级 MySQL 运算符的优先级&#xff08;由高到低&#xff09;注意事项示例 在 MySQL 中&#xff0c;运算符的优先级决定了在表达式中各个运算符被计算的先后顺序。了解运算符的优先级对于编写正确且高效的 SQL 语句至关重要。以下是根据高权威性…