Spark RPC 学习总结

ops/2025/1/16 9:48:38/

前言

本文从API层面学习总结Spark RPC,暂不涉及源码分析。

Spark 通信历史

最开始: Akka
Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题
Spark 1.6:支持配置使用 Akka 或者 Netty。
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

废弃Akka的原因

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-5293
主要原因是解决用户的Spark Application中akka版本和Spark内置的akka版本冲突的问题。比如,用户开发的Spark Application中用到了Spray框架,Spray依赖的akka版本跟Spark的不一致就会导致冲突:

  1. 很多Spark用户也使用Akka,但是由于Akka不同版本之间无法互相通信,这就要求用户必须使用跟Spark完全一样的Akka版本,导致用户无法升级Akka。
  2. Spark的Akka配置是针对Spark自身来调优的,可能跟用户自己代码中的Akka配置冲突。
  3. Spark用的Akka特性很少,这部分特性很容易自己实现。同时,这部分代码量相比Akka来说少很多,debug比较容易。如果遇到什么bug,也可以自己马上fix,不需要等Akka上游发布新版本。而且,Spark升级Akka本身又因为第一点会强制要求用户升级他们使用的Akka,对于某些用户来说是不现实的。

参考:https://www.zhihu.com/question/61638635

RpcEnv

Rpc环境,RpcEndpoint 需要在 RpcEnv 中注册一个名称来接收消息。

先看源码中是如何创建 RpcEnv

  val securityMgr = new SecurityManager(conf)val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)def create(name: String,host: String,port: Int,conf: SparkConf,securityManager: SecurityManager,clientMode: Boolean = false): RpcEnv = {create(name, host, host, port, conf, securityManager, 0, clientMode)}def create(name: String,bindAddress: String,advertiseAddress: String,port: Int,conf: SparkConf,securityManager: SecurityManager,numUsableCores: Int,clientMode: Boolean): RpcEnv = {val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,numUsableCores, clientMode)new NettyRpcEnvFactory().create(config)}

本次测试用的代码

  def createRpcEnv(conf: SparkConf,name: String,port: Int,clientMode: Boolean = false): RpcEnv = {val config = RpcEnvConfig(conf, name, "localhost", "localhost", port,new SecurityManager(conf), 0, clientMode)new NettyRpcEnvFactory().create(config)}

clientMode: 是否客户端模式,默认false,默认会启动一个 NettyServer,具体在 TransportServer.init 中实现,可参考上篇文章。如果设置为true,则不启动服务,只作为一个客户端。
port: 为0时,会随机绑定一个端口号,这一点是Netty本身实现的,如果非0,则按照指定的端口绑定,但是要求端口号范围为[1024,65536),如果端口已占用,则尝试端口号+1,默认重试16次,可以通过配置 spark.port.maxRetries 修改最大重试次数 。

RpcEndpoint

很多都是RpcEndpoint的子类,如:MasterWorkerClientEndpointDriverEndpointCoarseGrainedExecutorBackendYarnCoarseGrainedExecutorBackendYarnDriverEndpointYarnSchedulerEndpoint 等。

RpcEndpoint 的生命周期:constructor -> onStart -> receive* -> onStop 。也就是首先会调用 onStart 方法。

RpcEndpoint 首先必须通过调用 rpcEnv.setupEndpoint 才能使用
setupEndpoint 使用名称注册 RpcEndpoint 并返回其 RpcEndpointRef

def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

RpcEndpointRef

RpcEndpointRef:远程 RpcEndpoint 的引用。RpcEndpointRef 是线程安全的

有两种方法可以返回RpcEndpointRef 一个是上面提到的setupEndpoint,另外一个则是 setupEndpointRef

  /*** Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.*/def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]/*** Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.*/def setupEndpointRefByURI(uri: String): RpcEndpointRef = {defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))}/*** Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`.* This is a blocking action.*/def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)}

setupEndpoint 返回的是本地 RpcEndpoint 的引用,主要作用是使用名称注册
setupEndpointRef 根据远程地址和名称返回 RpcEndpoint 的引用。例如:

// Worer 中返回 Master 的引用
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// CoarseGrainedExecutorBackend 中获取 Driver 的引用
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl)

方法调用

  • rpcEnv.setupEndpoint : 调用 rpcEndpoint.onStart
  • rpcEndpointRef.send(没有返回值) : 调用 rpcEndpoint.receive
  • rpcEndpointRef.ask*(有返回值): 调用 rpcEndpoint.receiveAndReply (rpcEndpointRef.ask* 最终都是在 NettyRpcEnv.askAbortable中实现)
  • rpcEnv.stop(rpcEndpointRef) : 调用 rpcEndpoint.onStop

测试代码

完整代码:https://gitee.com/dongkelun/java-learning/tree/master/scala-learning/src/main/scala/org/apache/spark/rpc

本地测试

package org.apache.spark.rpcimport org.apache.spark.{SparkConf, SparkEnv}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}import scala.concurrent.duration._object RpcLocalTest extends RpcParent {def main(args: Array[String]): Unit = {val conf = new SparkConf()val env = createRpcEnv(conf, "local", 8000)@volatile var message: String = nullval rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint {override val rpcEnv = envoverride def onStart(): Unit = {println("start hello endpoint")}override def receive = {case msg: String =>println(msg)message = msg}})rpcEndpointRef.send("hello")eventually(timeout(5.seconds), interval(10.milliseconds)) {assert("hello" == message)}if (env != null) {env.shutdown()}SparkEnv.set(null)}
}

远程测试

RpcRemoteServer

package org.apache.spark.rpcimport org.apache.spark.SparkConfimport java.util.concurrent.CountDownLatchobject RpcRemoteServer extends RpcParent {def main(args: Array[String]): Unit = {val shutdownLatch = new CountDownLatch(1)val env = createRpcEnv(new SparkConf(), "local", 8000)println(s"地址:${env.address}")env.setupEndpoint("ask-remotely", new RpcEndpoint {override val rpcEnv = envoverride def onStart(): Unit = {println("onStart 被调用")}override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case msg: String =>context.reply(msg)}})shutdownLatch.await()}
}

RpcRemoteTest

package org.apache.spark.rpcimport org.apache.spark.SparkConfimport java.util.concurrent.CountDownLatchobject RpcRemoteTest extends RpcParent {def main(args: Array[String]): Unit = {val shutdownLatch = new CountDownLatch(1)val anotherEnv = createRpcEnv(new SparkConf(), "remote-client", 0, clientMode = true)println(s"地址:${anotherEnv.address}")val rpcEndpointRef = anotherEnv.setupEndpointRef(new RpcAddress("localhost", 8000), "ask-remotely")val reply = rpcEndpointRef.askSync[String]("hello Remote")println(reply)shutdownLatch.await()}
}

这里需要注意 RpcRemoteServerRpcEndpoint 的名称为 ask-remotely ,我们在 RpcRemoteTest 中不仅需要对应的IP、端口号,而且名称也一定要对应准确。


更多测试

完整代码:https://gitee.com/dongkelun/java-learning/blob/master/scala-learning/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

如测试 onStart 和 onStop

  test("onStart and onStop") {val stopLatch = new CountDownLatch(1)val calledMethods = mutable.ArrayBuffer[String]()val endpoint = new RpcEndpoint {override val rpcEnv = envoverride def onStart(): Unit = {println("onStart 被调用")calledMethods += "start"}override def receive: PartialFunction[Any, Unit] = {case msg: String =>}override def onStop(): Unit = {println("onStop 被调用")calledMethods += "stop"
//        stopLatch.countDown()}}println("调用setupEndpoint前")val rpcEndpointRef = env.setupEndpoint("start-stop-test", endpoint)println("调用setupEndpoint后")println("调用stop前")env.stop(rpcEndpointRef)println("调用stop后")stopLatch.await(10, TimeUnit.SECONDS)assert(List("start", "stop") === calledMethods)}

总结

  • 首先用 RpcEnv.create 创建 RpcEnv,这里底层会通过 Netty 创建一个 Server, 绑定对应的端口,这里也可以只使用客户端模式不创建 Server
  • 然后具体通信的实体类是在 RpcEndpoint 中实现,比如 MasterWorker 等都是 RpcEndpoint, RpcEndpoint 首先必须通过调用 rpcEnv.setupEndpoint 才能使用。
  • RpcEndpoint 的方法调用都是通过它对应引用 RpcEndpointRef 实现, rpcEnv.setupEndpoint 会返回本地引用,setupEndpointRef 根据远程地址和名称返回远程 RpcEndpoint 的引用,注意这里名称一定要对应准确。
  • RpcEndpoint 的方法调用顺序 onStart -> receive* -> onStop ,其中 onStart 做一些初始化的准备,setupEndpoint 会触发 onStart 方法;receive 方法没有返回值,receiveAndReply 方法有返回值,分别通过 rpcEndpointRef.sendrpcEndpointRef.ask* 触发,ask方法分同步调用和异步调用;而 onStop 则处理服务停止后的操作,通过 rpcEnv.stop 触发。

http://www.ppmy.cn/ops/150525.html

相关文章

Go语言编译的exe文件占用内存过大解决办法

Go 语言编译好的exe文件, 没有界面,执行后直接打开cmd运行,发现内存占用100M左右,为了保证程序代码正常吗,随便的一个最简单的代码如下,占用内存还是很大: package mainimport ("fmt" )func mai…

MCU中实时时钟(RTC)和普通定时器有什么区别

所谓实时时钟,只是一种称谓而已,也算是一种普通的时钟吧,不过加有晶振等电路,比较稳定,功能较多如日历,普通的时钟有一些是用元器件做振荡电路或用50HZ市电做频率源,不够稳定。 何为实时时钟RTC…

深入浅出 Android AES 加密解密:从理论到实战

深入浅出 Android AES 加密解密:从理论到实战 在现代移动应用中,数据安全是不可忽视的一环。无论是用户隐私保护,还是敏感信息的存储与传输,加密技术都扮演着重要角色。本文将以 AES(Advanced Encryption Standard&am…

使用PWM生成模式驱动BLDC三相无刷直流电机

引言 在 TI 的无刷直流 (BLDC) DRV8x 产品系列使用的栅极驱动器应用中,通常使用一些控制模式来切换MOSFET 开关的输出栅极。这些控制模式包括:1x、3x、6x 和独立脉宽调制 (PWM) 模式。   不过,DRV8x 产品系列(例如 DRV8311&…

使用 Python 操作 MySQL 数据库

💖 欢迎来到我的博客! 非常高兴能在这里与您相遇。在这里,您不仅能获得有趣的技术分享,还能感受到轻松愉快的氛围。无论您是编程新手,还是资深开发者,都能在这里找到属于您的知识宝藏,学习和成长…

在 WSL Ubuntu 上安装 ProxySQL 并配置 主从同步,读写分离,延迟检测

✅ 在 WSL Ubuntu 上安装 ProxySQL 并配置 MySQL 主从同步、读写分离、延迟检测 这是一个详细的教程,涵盖以下内容: 安装 MySQL(主库和从库)配置 MySQL 主从同步安装 ProxySQL配置 ProxySQL 实现读写分离配置 ProxySQL 延迟检测…

Gitlab Runner安装与配置

由于格式和图片解析问题,为了更好阅读体验可前往 阅读原文 本篇使用Docker安装Gitlab runner进行runner的安装和注册,其他方式请参考官方文档非Docker安装Gitlab runner 请确保runner版本和gitlab版本兼容以及docker相关版本兼容问题 下载镜像 docker p…

IOS工程师

IOS工程师 职位描述 负责程序的开发测试调试工作。 负责后期维护 任职要求 有扎实的Objective-C/Swift语言基础 熟悉IOS开发技术,包括UI、网络等方面 熟悉IOS开发工具和相关开发测试工具的使用 熟悉多线程和网络编程,对高性能程序设计、架构有较多…