Spark-Streaming receiver模式源码解析

ops/2024/12/21 9:16:30/

一、上下文

《Spark-Streaming初识》博客中我们用NetworkWordCount例子大致了解了Spark-Streaming receiver模式的运行。下面我们就通过该代码进行源码分析,深入了解其原理。

二、构建StreamingContext

它是Spark Streaming功能的主要入口点。并提供了从各种输入源创建[org.apache.spark.streaming.dstream.DStream]的方法,且此时就要指明最小的微批时间。

此外StreamingContext里面包含了SparkContext,SparkContext又是Spark功能的入口点,SparkContext中有SparkConf、SparkEnv、TaskScheduler、DAGScheduler、SparkStatusTracker。可以用它与Spark集群的连接,并在该集群上创建RDD、累加器和广播变量,和任务的调度。

从StreamingContext就可以看出,Spark Streaming是每隔指定的微批时间进行job的调度去处理该批次数据,这也是Spark Streaming处理数据的最细粒度:一个批次的数据。

三、创建ReceiverInputDStream

NetworkWordCount中是从TCP源hostname:port创建输入流。使用TCP socket 接收数据,接收字节被解释为UTF8编码的“\n”分隔行。因此返回的是SocketInputDStream。如果数据源是来自HDFS,那么返回的将是FileInputDStream,他们都属于InputDStream(再往上是DStream)。

这里要传一个StorageLevel,如果没有传,默认值为:StorageLevel.MEMORY_AND_DISK_SER_2,这也意为这获取到的微批数据是存在了BlockManager中,且存储级别为内存或磁盘序列化,份数为2。

从源码中可以看出,SocketInputDStream会创建一个名为Socket Receiver的线程去创建一个socket去读取数据,并将其转化为迭代器,存储到Spark的BlockManager中。

四、调用算子进行计算

val words = lines.flatMap(_.split(" "))
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))}

是不是和RDD的操作非常像,RDD经过转化算子还是返回RDD,DStream经过转化算子同样返回DStream

val resultRdd = sourceRdd.flatMap(_.split(","))
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))}

且两者都有compute方法,RDD的compute()是返回一个迭代器,DStream的compute()是返回一个Option[RDD[T]]。但最终还是会调用RDD的转化算子,因此最终还是利用了迭代器模式。

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](parent: DStream[T],flatMapFunc: T => TraversableOnce[U]) extends DStream[U](parent.ssc) {override def dependencies: List[DStream[_]] = List(parent)override def slideDuration: Duration = parent.slideDurationoverride def compute(validTime: Time): Option[RDD[U]] = {parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))}
}

parent.getOrCompute(validTime)的返回结果就是一个Option[RDD[T]],getOrCompute中会先判断缓存中是否有RDD,如果有就返回,否则创建并将其缓存并checkpoint。这里面有一个知识点:checkpoint之前先进行persist,因为checkpoint会重新启动一个Job,如果不进行persist,前面RDD的转化会重复走一遍。先将其进行persist不仅可以加速后续的计算,还可以加速checkpoint的过程。这里将RDD进行缓存时为了加速其他窗口的计算和当下job的失败重试。

_.flatMap(flatMapFunc):可以看出最终还是调用了RDD的flatMap算子。

五、调用foreachRDD触发Job生成

wordCounts.print()
abstract class DStream[T: ClassTag] (@transient private[streaming] var ssc: StreamingContext) extends Serializable with Logging {//打印此DStream中生成的每个RDD的前十个元素。这是一个输出运算符,因此此DStream将被注册为输出流并在那里执行前面依赖的所有算子def print(): Unit = ssc.withScope {print(10)}def print(num: Int): Unit = ssc.withScope {//将要传递给foreachRDD的函数def foreachFunc: (RDD[T], Time) => Unit = {(rdd: RDD[T], time: Time) => {val firstNum = rdd.take(num + 1)// 格式化输出操作println("-------------------------------------------")println(s"Time: $time")println("-------------------------------------------")firstNum.take(num).foreach(println)if (firstNum.length > num) println("...")println()}}foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)}private def foreachRDD(foreachFunc: (RDD[T], Time) => Unit,displayInnerRDDOps: Boolean): Unit = {new ForEachDStream(this,context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}}
private[streaming]
class ForEachDStream[T: ClassTag] (parent: DStream[T],foreachFunc: (RDD[T], Time) => Unit,displayInnerRDDOps: Boolean) extends DStream[Unit](parent.ssc) {override def dependencies: List[DStream[_]] = List(parent)override def slideDuration: Duration = parent.slideDurationoverride def compute(validTime: Time): Option[RDD[Unit]] = None//在给定时间内生成SparkStreaming作业。同样的也生成相应的RDD的作业,override def generateJob(time: Time): Option[Job] = {parent.getOrCompute(time) match {case Some(rdd) =>val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {foreachFunc(rdd, time)}Some(new Job(time, jobFunc))case None => None}}
}

可以看到print()最终生成了ForEachDStream,并将输出流添加到outputStreams中。并最终生成Job去运行,因此一个微批中可以式1个job,也可以式多个job。

六、启动流运行

    ssc.start()             // 开始计算ssc.awaitTermination()  // 等待计算终止

最终会通过StreamingContext来启动该流式计算。

1、StreamingContext 

class StreamingContext private[streaming] (_sc: SparkContext,_cp: Checkpoint,_batchDur: Duration) extends Logging {def start(): Unit = synchronized {state match {case INITIALIZED =>startSite.set(DStream.getCreationSite())StreamingContext.ACTIVATION_LOCK.synchronized {StreamingContext.assertNoOtherContextIsActive()try {//这里会校验以下几点://1、outputStreams是否为空//2、如果设置了checkpoint 那么 时间间隔也需要设置,一般是微批的间隔//3、如果设置了checkpoint 那么 需要支持序列化//4、如果启用了动态分配,需要对不可重放的源启用预写日志validate()//注册监听程序registerProgressListener()//启动一个新的线程:streaming-start//在新线程中启动流式调度程序,以便可以重置调用站点和作业组等线程本地属性,而不会影响当前线程的属性。ThreadUtils.runInNewThread("streaming-start") {sparkContext.setCallSite(startSite.get)sparkContext.clearJobGroup()sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))//启动JobSchedulerscheduler.start()}state = StreamingContextState.ACTIVEscheduler.listenerBus.post(StreamingListenerStreamingStarted(System.currentTimeMillis()))} catch {//......}StreamingContext.setActiveContext(this)}//......}}}

2、JobScheduler

JobScheduler使用JobGenerator生成作业并在Spark上进行调度。JobScheduler使用线程池运行Job。

private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {def start(): Unit = synchronized {if (eventLoop != null) return // scheduler 已经启动logDebug("Starting JobScheduler")//一个事件循环,用于从调用者接收事件并处理事件线程中的所有事件。它将启动一个独占事件线程来处理所有事件。eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)}//启动事件处理eventLoop.start()//连接输入流的速率控制器以接收批处理完成更新for {//获取源的输入流inputDStream <- ssc.graph.getInputStreamsrateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)//流式监听器总线,用于将事件转发到流式监听器//启动listenerBus.start()//此类管理ReceiverInputDStreams接收器的执行。必须在添加所有输入流并调用StreamingContext.start()后创建此类的实例,因为它在实例化时需要最后一组输入流。receiverTracker = new ReceiverTracker(ssc)//此类管理所有输入流及其输入数据统计信息。这些信息将通过StreamingListener进行监控。inputInfoTracker = new InputInfoTracker(ssc)//与集群管理器通信以请求或杀死executor的客户端。目前仅在YARN模式下支持此功能。val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]case _ => null}//管理分配给StreamingContext的executor的类,并根据流计算的统计数据动态请求或杀死executors//这与核心动态分配政策不同;核心策略依赖于executor在一段时间内处于空闲状态,//但流式处理的微批处理模型可以防止任何特定的executor长时间处于空闲状态。相反,“闲置”的衡量标准需要基于处理每一批所花费的时间。//从高层来看,该类实施的政策如下://1、使用StreamingListener接口获取已完成批次的批处理时间//2、定期计算平均批处理完成时间,并与批处理间隔进行比较//3、如果(平均过程时间/批处理间隔)>=按比例放大,则请求更多executors。请求的executors数量基于比率=(平均proc.time/批处理间隔)。//4、如果(avg.proc.time/batch interval)<=按比例缩小,则尝试杀死一个没有运行接收器的executor。executorAllocationManager = ExecutorAllocationManager.createIfEnabled(executorAllocClient,receiverTracker,ssc.conf,ssc.graph.batchDuration.milliseconds,clock)//依次将ExecutorAllocationManager添加到StreamingListenerexecutorAllocationManager.foreach(ssc.addStreamingListener)//启动endpoint和接收器执行线程//且这里会循环 将ReceiverInputDStream 获取的数据 做成一个 rdd//内部会创建一个BlockGenerator对象,用于将接收到的数据流划分为数据块//这些数据块存储在BlockManager中,//BlockManager是负责Spark上所有的数据的存储与管理的一个极其重要的组件。//每个Executor都有一个BlockManager,而Executor上的BlockManager实例是由Driver端上的BlockManagerMaster统一管理,为spark运行job提供数据支持receiverTracker.start()//开始创建job//将接收到的块分配给批处理jobGenerator.start()executorAllocationManager.foreach(_.start())logInfo("Started JobScheduler")}}

http://www.ppmy.cn/ops/143719.html

相关文章

Day28 C++ 命名空间

2024.12.20 C 命名空间 假设这样一种情况&#xff0c;当一个班上有两个名叫 Zara 的学生时&#xff0c;为了明确区分它们&#xff0c;我们在使用名字之外&#xff0c;不得不使用一些额外的信息&#xff0c;比如他们的家庭住址&#xff0c;或者他们父母的名字等等。 同样的情况…

【Spring】Spring的模块架构与生态圈—数据访问与集成(JDBC、ORM、Transactions)

在企业级应用中&#xff0c;数据的存储和访问是核心功能之一。Java开发语言通过Spring框架提供了多种方式来实现数据访问和集成&#xff0c;包括JDBC&#xff08;Java Database Connectivity&#xff09;、ORM&#xff08;对象关系映射&#xff09;以及事务管理。这些技术的有效…

出现 java.io.UncheckedIOException: Cannot delete Local\Temp\tomcat xxx.tmp 文件无法删除

目录 1. 问题所示2. 原理分析3. 解决方法3.1 kill(初审)3.2 代码Bug(严查)3.3 核心Bug(严查)3.4 版本(暂定)1. 问题所示 执行代码的时候,出现如下问题: java.io.UncheckedIOException: Cannot delete C:\Users\lixiaosong\AppData\Local\Temp\tomcat.48080.1595710…

Spring Boot应用关闭分析

优质博文&#xff1a;IT-BLOG-CN 一、使用spring容器的close方法关闭。 可通过在代码中获取SpringContext并调用close方法去关闭容器。 使用SpringApplication的exit方法。 public static int exit(ApplicationContext context,ExitCodeGenerator... exitCodeGenerators) {…

Serverless监控和调试、持续集成和持续部署

接下来,我们将探讨Serverless架构中的监控和调试,以及如何在Serverless环境中实现持续集成和持续部署(CI/CD)。 在Serverless架构中,监控和调试是确保应用健康运行的关键。以下是一些监控和调试的最佳实践: 日志聚合:使用云服务提供商的日志服务(如AWS CloudWatch、Azu…

LCD1602显示模块详解

LCD1602&#xff08; Liquid Crystal Display 1602&#xff09;&#xff0c;一种常见的字符型液晶显示模块。虽然它接线多&#xff0c;但是LCD1602是每个嵌入式工程师的必经之路。它能够显示16列2行&#xff0c;共32个字符字符&#xff0c;每个字符都由5x8像素点阵构成&#xf…

百货中心供应链管理系统【源码+文档】

目录 1引言 1 1.1课题背景 1 1.2目的和意义 2 1.3研究内容和组织结构 2 2开发工具及技术 3 2.1开发工具 3 2.2使用技术 4 3可行性分析 6 3.1法律的可行性 6 3.2经济的可行性 6 3.3技术的可行性 6 4需求分析 7 4.1系统功能分析 7 4.1.1经理用例图 7 4.1.2人事部员工用例图 7 4.1…

JAVA:代理模式(Proxy Pattern)的技术指南

1、简述 代理模式(Proxy Pattern)是一种结构型设计模式,用于为其他对象提供一种代理,以控制对这个对象的访问。通过代理模式,我们可以在不修改目标对象代码的情况下扩展功能,满足特定的需求。 设计模式样例:https://gitee.com/lhdxhl/design-pattern-example.git 2、什…