一、上下文
《Spark-Streaming初识》博客中我们用NetworkWordCount例子大致了解了Spark-Streaming receiver模式的运行。下面我们就通过该代码进行源码分析,深入了解其原理。
二、构建StreamingContext
它是Spark Streaming功能的主要入口点。并提供了从各种输入源创建[org.apache.spark.streaming.dstream.DStream]的方法,且此时就要指明最小的微批时间。
此外StreamingContext里面包含了SparkContext,SparkContext又是Spark功能的入口点,SparkContext中有SparkConf、SparkEnv、TaskScheduler、DAGScheduler、SparkStatusTracker。可以用它与Spark集群的连接,并在该集群上创建RDD、累加器和广播变量,和任务的调度。
从StreamingContext就可以看出,Spark Streaming是每隔指定的微批时间进行job的调度去处理该批次数据,这也是Spark Streaming处理数据的最细粒度:一个批次的数据。
三、创建ReceiverInputDStream
NetworkWordCount中是从TCP源hostname:port创建输入流。使用TCP socket 接收数据,接收字节被解释为UTF8编码的“\n”分隔行。因此返回的是SocketInputDStream。如果数据源是来自HDFS,那么返回的将是FileInputDStream,他们都属于InputDStream(再往上是DStream)。
这里要传一个StorageLevel,如果没有传,默认值为:StorageLevel.MEMORY_AND_DISK_SER_2,这也意为这获取到的微批数据是存在了BlockManager中,且存储级别为内存或磁盘序列化,份数为2。
从源码中可以看出,SocketInputDStream会创建一个名为Socket Receiver的线程去创建一个socket去读取数据,并将其转化为迭代器,存储到Spark的BlockManager中。
四、调用算子进行计算
val words = lines.flatMap(_.split(" "))
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))}
是不是和RDD的操作非常像,RDD经过转化算子还是返回RDD,DStream经过转化算子同样返回DStream
val resultRdd = sourceRdd.flatMap(_.split(","))
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))}
且两者都有compute方法,RDD的compute()是返回一个迭代器,DStream的compute()是返回一个Option[RDD[T]]。但最终还是会调用RDD的转化算子,因此最终还是利用了迭代器模式。
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](parent: DStream[T],flatMapFunc: T => TraversableOnce[U]) extends DStream[U](parent.ssc) {override def dependencies: List[DStream[_]] = List(parent)override def slideDuration: Duration = parent.slideDurationoverride def compute(validTime: Time): Option[RDD[U]] = {parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))}
}
parent.getOrCompute(validTime)的返回结果就是一个Option[RDD[T]],getOrCompute中会先判断缓存中是否有RDD,如果有就返回,否则创建并将其缓存并checkpoint。这里面有一个知识点:checkpoint之前先进行persist,因为checkpoint会重新启动一个Job,如果不进行persist,前面RDD的转化会重复走一遍。先将其进行persist不仅可以加速后续的计算,还可以加速checkpoint的过程。这里将RDD进行缓存时为了加速其他窗口的计算和当下job的失败重试。
_.flatMap(flatMapFunc):可以看出最终还是调用了RDD的flatMap算子。
五、调用foreachRDD触发Job生成
wordCounts.print()
abstract class DStream[T: ClassTag] (@transient private[streaming] var ssc: StreamingContext) extends Serializable with Logging {//打印此DStream中生成的每个RDD的前十个元素。这是一个输出运算符,因此此DStream将被注册为输出流并在那里执行前面依赖的所有算子def print(): Unit = ssc.withScope {print(10)}def print(num: Int): Unit = ssc.withScope {//将要传递给foreachRDD的函数def foreachFunc: (RDD[T], Time) => Unit = {(rdd: RDD[T], time: Time) => {val firstNum = rdd.take(num + 1)// 格式化输出操作println("-------------------------------------------")println(s"Time: $time")println("-------------------------------------------")firstNum.take(num).foreach(println)if (firstNum.length > num) println("...")println()}}foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)}private def foreachRDD(foreachFunc: (RDD[T], Time) => Unit,displayInnerRDDOps: Boolean): Unit = {new ForEachDStream(this,context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}}
private[streaming]
class ForEachDStream[T: ClassTag] (parent: DStream[T],foreachFunc: (RDD[T], Time) => Unit,displayInnerRDDOps: Boolean) extends DStream[Unit](parent.ssc) {override def dependencies: List[DStream[_]] = List(parent)override def slideDuration: Duration = parent.slideDurationoverride def compute(validTime: Time): Option[RDD[Unit]] = None//在给定时间内生成SparkStreaming作业。同样的也生成相应的RDD的作业,override def generateJob(time: Time): Option[Job] = {parent.getOrCompute(time) match {case Some(rdd) =>val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {foreachFunc(rdd, time)}Some(new Job(time, jobFunc))case None => None}}
}
可以看到print()最终生成了ForEachDStream,并将输出流添加到outputStreams中。并最终生成Job去运行,因此一个微批中可以式1个job,也可以式多个job。
六、启动流运行
ssc.start() // 开始计算ssc.awaitTermination() // 等待计算终止
最终会通过StreamingContext来启动该流式计算。
1、StreamingContext
class StreamingContext private[streaming] (_sc: SparkContext,_cp: Checkpoint,_batchDur: Duration) extends Logging {def start(): Unit = synchronized {state match {case INITIALIZED =>startSite.set(DStream.getCreationSite())StreamingContext.ACTIVATION_LOCK.synchronized {StreamingContext.assertNoOtherContextIsActive()try {//这里会校验以下几点://1、outputStreams是否为空//2、如果设置了checkpoint 那么 时间间隔也需要设置,一般是微批的间隔//3、如果设置了checkpoint 那么 需要支持序列化//4、如果启用了动态分配,需要对不可重放的源启用预写日志validate()//注册监听程序registerProgressListener()//启动一个新的线程:streaming-start//在新线程中启动流式调度程序,以便可以重置调用站点和作业组等线程本地属性,而不会影响当前线程的属性。ThreadUtils.runInNewThread("streaming-start") {sparkContext.setCallSite(startSite.get)sparkContext.clearJobGroup()sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))//启动JobSchedulerscheduler.start()}state = StreamingContextState.ACTIVEscheduler.listenerBus.post(StreamingListenerStreamingStarted(System.currentTimeMillis()))} catch {//......}StreamingContext.setActiveContext(this)}//......}}}
2、JobScheduler
JobScheduler使用JobGenerator生成作业并在Spark上进行调度。JobScheduler使用线程池运行Job。
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {def start(): Unit = synchronized {if (eventLoop != null) return // scheduler 已经启动logDebug("Starting JobScheduler")//一个事件循环,用于从调用者接收事件并处理事件线程中的所有事件。它将启动一个独占事件线程来处理所有事件。eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)}//启动事件处理eventLoop.start()//连接输入流的速率控制器以接收批处理完成更新for {//获取源的输入流inputDStream <- ssc.graph.getInputStreamsrateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)//流式监听器总线,用于将事件转发到流式监听器//启动listenerBus.start()//此类管理ReceiverInputDStreams接收器的执行。必须在添加所有输入流并调用StreamingContext.start()后创建此类的实例,因为它在实例化时需要最后一组输入流。receiverTracker = new ReceiverTracker(ssc)//此类管理所有输入流及其输入数据统计信息。这些信息将通过StreamingListener进行监控。inputInfoTracker = new InputInfoTracker(ssc)//与集群管理器通信以请求或杀死executor的客户端。目前仅在YARN模式下支持此功能。val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]case _ => null}//管理分配给StreamingContext的executor的类,并根据流计算的统计数据动态请求或杀死executors//这与核心动态分配政策不同;核心策略依赖于executor在一段时间内处于空闲状态,//但流式处理的微批处理模型可以防止任何特定的executor长时间处于空闲状态。相反,“闲置”的衡量标准需要基于处理每一批所花费的时间。//从高层来看,该类实施的政策如下://1、使用StreamingListener接口获取已完成批次的批处理时间//2、定期计算平均批处理完成时间,并与批处理间隔进行比较//3、如果(平均过程时间/批处理间隔)>=按比例放大,则请求更多executors。请求的executors数量基于比率=(平均proc.time/批处理间隔)。//4、如果(avg.proc.time/batch interval)<=按比例缩小,则尝试杀死一个没有运行接收器的executor。executorAllocationManager = ExecutorAllocationManager.createIfEnabled(executorAllocClient,receiverTracker,ssc.conf,ssc.graph.batchDuration.milliseconds,clock)//依次将ExecutorAllocationManager添加到StreamingListenerexecutorAllocationManager.foreach(ssc.addStreamingListener)//启动endpoint和接收器执行线程//且这里会循环 将ReceiverInputDStream 获取的数据 做成一个 rdd//内部会创建一个BlockGenerator对象,用于将接收到的数据流划分为数据块//这些数据块存储在BlockManager中,//BlockManager是负责Spark上所有的数据的存储与管理的一个极其重要的组件。//每个Executor都有一个BlockManager,而Executor上的BlockManager实例是由Driver端上的BlockManagerMaster统一管理,为spark运行job提供数据支持receiverTracker.start()//开始创建job//将接收到的块分配给批处理jobGenerator.start()executorAllocationManager.foreach(_.start())logInfo("Started JobScheduler")}}