大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流

server/2024/10/18 16:54:07/

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Spark Streaming 基础概述
  • Spark Streaming 架构概念
  • 编程模型
  • 优点缺点概括
  • 与 Kafka 集成

在这里插入图片描述

基础概念

基础数据源包括:

  • 文件系统(File System):Spark Streaming 支持监控 HDFS、S3、本地文件系统等目录中的新文件,并将这些文件作为数据流的一部分进行处理。这个数据源适用于处理批量生成的文件。
  • Socket 数据流(Socket Stream):这是最简单的数据源之一,Spark Streaming 可以通过 TCP 套接字连接接收文本数据流。例如,你可以使用 nc(Netcat)工具向指定端口发送数据,Spark Streaming 可以实时读取这些数据。
  • Kafka:Kafka 是一个分布式消息系统,常用于构建实时流处理应用。Spark Streaming 提供了直接和高级两种 Kafka 数据源集成方式,支持从 Kafka 主题中读取数据流。
  • Flume:Apache Flume 是一个分布式、可靠且高可用的系统,用于高效收集、聚合和传输大量日志数据。Spark Streaming 可以通过 Flume 接收数据并处理,常用于日志收集和分析。
  • Kinesis:Amazon Kinesis 是一个实时流处理服务,Spark Streaming 提供了 Kinesis 数据源的支持,能够从 Kinesis 流中读取数据,并进行实时分析。
  • 自定义数据源:Spark Streaming 允许用户实现自定义的输入源。用户可以通过实现 Receiver 类或使用 Direct DStream API 来创建新的数据源。

在这里插入图片描述

引入依赖

我们使用的话,需要引入依赖:

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version>
</dependency>

文件数据流

基础概念

通过 textFileStreama 方法进行读取 HDFS 兼容的文件系统文件
Spark Streaming 将会监控 directory 目录,并不断处理移动进来的文件

  • 不支持嵌套目录
  • 文件需要有相同的数据格式
  • 文件进入 Directory 的方式需要通过移动或者重命名来实现
  • 一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据
  • 文件流不需要接收器(Receiver),不需要单独分配CPU核

编写代码

package icu.wzkimport org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object FileDStream {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")// 时间间隔val ssc = new StreamingContext(conf, Seconds(5))// 本地文件,也可以使用 HDFS 文件val lines = ssc.textFileStream("goodtbl.java")val words = lines.flatMap(_.split("\\s+"))val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)// 打印信息wordCounts.print()ssc.start()ssc.awaitTermination()}
}

代码解析

  • object FileDStream: 定义了一个名为 FileDStream 的单例对象,包含 main 方法,这是 Scala 中的入口点,相当于 Java 的 public static void main 方法。
  • Logger.getLogger(“org”).setLevel(Level.ERROR): 这行代码将日志级别设置为 ERROR,以减少控制台输出的日志信息,只显示错误级别的信息。这通常是为了避免不必要的日志干扰核心的输出。
  • val conf = new SparkConf(): 创建一个 SparkConf 对象,包含了应用程序的配置信息。
  • setAppName(“FileDStream”): 设置应用程序的名称为 “FileDStream”。这个名称会在 Spark Web UI 中显示,用于识别应用。
  • setMaster("local[]"): 设置 Spark 的运行模式为本地模式(local[]),这意味着应用程序将在本地运行,并使用所有可用的 CPU 核心。
  • val ssc = new StreamingContext(conf, Seconds(5)): 创建一个 StreamingContext 对象,负责管理 Spark Streaming 应用程序的上下文。Seconds(5) 指定了微批处理的时间间隔为 5 秒,也就是每 5 秒钟会处理一次数据。
  • val words = lines.flatMap(_.split(“\s+”)): 对每一行文本内容进行处理,使用空格或其他空白字符(\s+)进行分割,将每行文本拆分成单词。flatMap 操作会将结果展开为一个包含所有单词的 DStream。
  • val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _): 通过 map 操作将每个单词映射为 (word, 1) 形式的键值对,然后使用 reduceByKey 按键(即单词)进行聚合,计算每个单词的出现次数。
  • wordCounts.print(): 将计算结果打印到控制台,每 5 秒钟输出一次当前批次中每个单词的计数结果。
  • ssc.start(): 启动 Spark Streaming 的计算,这会使得 Spark 开始监听数据源并开始处理数据流。
  • ssc.awaitTermination(): 阻塞当前线程,等待流计算结束,通常是等待手动停止应用程序。这个方法会让程序保持运行,直到手动终止或遇到异常。

运行结果

【备注:使用 local[],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,使用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理接受到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

在这里插入图片描述

Socket数据流

编写代码

Spark Streaming 可以通过Socket端口监听并接受数据,然后进行相应处理:
打开一个新的命令窗口,启动 nc 程序。(在Flink中也这么用过)

# 如果没有的话 你需要安装一下
nc -lk 9999

编写运行的代码:

package icu.wzkimport org.apache.log4j.{Level, Logger}
import org.apachea.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object SocketDStream {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val conf = new SparkConf().setAppName("SocketStream").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(1));val lines = ssc.socketTextStream("0.0.0.0", 9999)val words = lines.flatMap(_.split("\\s+"))val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)wordCount.print()ssc.start()ssc.awaitTermination()}
}

随后可以在nc窗口中随意输入一些单词,监听窗口会自动获取单词数据流信息,在监听窗口每X秒就会打印出词频的统计信息,可以在屏幕是上出现结果。

运行结果

【备注:使用 local[],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,使用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理接受到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

在这里插入图片描述
此时,从控制台启动后,输入内容
在这里插入图片描述

RDD队列流

基础概念

调用 Spark Streaming应用程序的时候,可使用 streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream
备注:

  • oneAtTime:缺省为true,一次处理一个RDD,设为False,一次处理全部RDD
  • RDD队列流可以使用 local[1]
  • 涉及到同时出队和入队操作,所以要做同步

每秒创建一个RDD(RDD存放1-100的整数),Streaming每隔1秒就对数据进行处理,计算RDD中数据除10取余的个数。

队列流优点

  • 适用于测试和开发:RDD 队列流主要用于开发和调试阶段,它允许你在没有真实数据源的情况下测试 Spark Streaming 应用程序。
  • RDD 队列:你可以创建一个包含 RDD 的队列(Queue),Spark Streaming 会从这个队列中逐一获取 RDD,并将其作为数据流的一部分进行处理。
  • 灵活性:由于是手动创建的 RDD 队列,因此你可以完全控制数据的内容、数量以及生成的速度,从而测试各种场景下的应用表现。

编写代码

编写代码如下:

package icu.wzkimport org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutable.Queueobject RDDQueueDStream {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.WARN)val sparkConf = new SparkConf().setAppName("RDDQueueStream").setMaster("local[*]")val ssc = new StreamingContext(sparkConf, Seconds(1))val rddQueue = new Queue[RDD[Int]]()val queueStream = ssc.queueStream(rddQueue)val mappedStream = queueStream.map(r => (r % 10, 1))val reducedStream = mappedStream.reduceByKey(_ + _)reducedStream.print()ssc.start()for (i <- 1 to 5) {rddQueue.synchronized {val range = (1 to 100).map(_ * i)rddQueue += ssc.sparkContext.makeRDD(range, 2)}Thread.sleep(2000)}ssc.stop()}
}

运行结果

运行结果如图所示:
在这里插入图片描述


http://www.ppmy.cn/server/105464.html

相关文章

Ps:首选项 - 常规

Ps菜单&#xff1a;编辑/首选项 Edit/Preferences 快捷键&#xff1a;Ctrl K Photoshop 首选项中的“常规” General选项卡主要用于调整 Photoshop 的整体工作行为和用户体验。这些设置让用户可以根据个人习惯和工作流程定制软件的响应方式和界面布局&#xff0c;从而提高工作…

Wireshark

下面是Wireshark 4.2.1版本的命令行参数整理成表格的形式&#xff1a; 参数/选项说明-i <interface>指定捕获接口的名称或索引&#xff08;默认&#xff1a;第一个非回环接口&#xff09;-f <capture filter>libpcap过滤语法的包过滤-s <snaplen>包快照长度…

C++基础面试题 | C和C++的区别?

人生如逆旅&#xff0c;我亦是行人。 - 《临江仙送钱穆父》(苏轼) 2024.8.23 回答重点&#xff1a;C可以认为是C语言的超集&#xff0c;绝大部分C语言代码可以使用C运行 在设计思想上&#xff1a;面向过程 vs 面向对象 C是一种面向过程的编程语言&#xff0c;它侧重于函数和过程…

鸿蒙笔记--WorkerTaskPool

这一节了解一下鸿蒙中的Worker和TaskPool,Worker和TaskPool的作用是为应用程序提供一个多线程的运行环境&#xff0c;用于处理耗时的计算任务或其他密集型任务。可以有效地避免这些任务阻塞主线程&#xff0c;从而最大化系统的利用率&#xff0c;降低整体资源消耗&#xff0c;并…

C++前向声明简介

前向声明 class a; class b; class c:public d { ..... }类a和b已经实现了具体功能&#xff0c;类c在定义&#xff0c;在类c上面声明类a和b有什么作用 在类 c 的定义上面声明类 a 和 b 的作用主要是为了确保在编译时能够识别这两个类的存在&#xff0c;特别是在类 c 中可能会使…

数据、信息与知识

目录 什么是数据&#xff1f; 什么是信息&#xff0c;信息的特征是什么&#xff1f; 什么是知识&#xff1f; 他们之间存在的关系&#xff1f; 什么是数据&#xff1f; 数据&#xff0c;它是一种描述事物的符号记录&#xff0c;是信息的载体。 应用在社会的方方面面&#…

SSRF漏洞——pikachu

环境搭建 pikachu文件如下&#xff1a; 通过百度网盘分享的文件&#xff1a;pikachu-master.zip 链接&#xff1a;https://pan.baidu.com/s/1HuV2llJzx1c7Ii6u-r4s3Q?pwdqwer 提取码&#xff1a;qwer 解压至小皮WWW文件夹下&#xff0c;进入config.inc.php中修改MySQL名字…

如何保证支付服务和交易服务订单状态一致?

消息传给消费者&#xff0c;消费者自己弄丢 业务幂等 所有的业务都应该保证幂等性&#xff0c; 如何保障业务幂等性 非幂等业务表单重复提交&#xff0c;在进入表单之前生成唯一标识&#xff0c;未token&#xff0c;携带token进行请求&#xff0c;执行表单提交&#xff0c;把…