Flink源码之StreamTask启动流程

news/2025/2/12 8:19:45/

每个ExecutionVertex分配Slot后,JobMaster就会向Slot所在的TaskExecutor提交RPC请求执行Task,接口为TaskExecutorGateway::submitTask

CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout); 

TaskDeploymentDescriptor 中包含当前Task的执行逻辑、Job信息、输入输出信息
在这里插入图片描述

submitTask 方法核心就是构造org.apache.flink.runtime.taskmanager.Task实例,该实例继承自Runnable接口,有个Thread成员变量,构造完成后就启动线程执行Task逻辑。

TaskExecutor::submitTask
Task.startTaskThread
Task.run
Task.doRun
Task::setupPartitionsAndGates //初始化Task的输入输出
RuntimeEnvironment::new //封装task执行上下文信息
Task::loadAndInstantiateInvokable //TaskInvokables实例化
StreamTask::newStreamTask::createRecordWriterDelegate //创建Writer,为每个StreamEdge创建一个WriterStreamTask::createStateBackend //创建StateBackend,一个task一个StateBackend实例StreamTask::createCheckpointStorageSubtaskCheckpointCoordinatorImpl::new 
Task::restoreAndInvoke
TaskInvokable::restore 
TaskInvokable::invoke //处理输入元素
TaskInvokable::cleanUp

Task的Invokable Class是在StreamGraph中添加Operator形成StreamNode时确定的,对不同的算子有不同的InvokableClass:

  • SourceStreamTask.class //LegacySource算子
  • SourceOperatorStreamTask //Source算子
  • OneInputStreamTask.class //输入是一个算子
  • TwoInputStreamTask:class //输入是两个算子
  • MultipleInputStreamTask.class //输入有多个算子

以上这些类都继承自org.apache.flink.streaming.runtime.tasks.StreamTask

在这里插入图片描述

在调用TaskInvokable::restore时会执行:

StreamTask::restore
StreamTask::restoreInternal //创建OperatorChain
RegularOperatorChain::new
OperatorChain::new
OperatorChain::createOutputCollector
OperatorChain::createOperatorChain
OperatorChain::createOperator
StreamOperatorFactoryUtil.createOperator  //创建Operator,在每个算子的StreamConfig中定义了每个Operator具体类型,比如StreamMap, StreamFlatMap
SimpleOperatorFactory::createStreamOperator //创建StreamOperator包装了用户函数,, StreamOperator包装了代码中用户函数,会调用用户函数中的open/close等生命周期函数AbstractUdfStreamOperator::setupAbstractStreamOperator::setup //设置用用自定义函数中的RuntimeContext成员变量StreamingRuntimeContext::new  //StreamTask::init //子类做初始化,创建InputGate、StreamTaskInput、DataOutput及InputProcessor
StreamTask::restoreGatesStreamTask::createStreamTaskStateInitializerStreamTaskStateInitializerImpl::new //OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeState StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackendStreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理StreamOperatorStateHandler::initializeOperatorStateStateInitializationContextImpl::newAbstractUdfStreamOperator::initializeState//调用用户定义函数中的initializeState方法,可获取Operator StateStreamingFunctionUtils::restoreFunctionStateStreamingRuntimeContext::setKeyedStateStoreStreamOperator::open //调用getRuntimeContext().getState可获取keySate
StreamTask::invoke
StreamTask::runMailboxLoop
MailboxProcessor::runMailboxLoop
StreamTask::processInput

整个过程在StreamTask.java的注释中有说明:

 * -- invoke()*       |*       +----> Create basic utils (config, etc) and load the chain of operators*       +----> operators.setup()*       +----> task specific init()*       +----> initialize-operator-states()*       +----> open-operators()*       +----> run()*       +----> finish-operators()*       +----> close-operators()*       +----> common cleanup*       +----> task specific cleanup()
  1. 首先创建OperatorChain,依次创建出每个StreamOperator
  2. 调用Operator的setup方法,初始化StreamingRuntimeContext
  3. 调用子类init方法初始化
  4. 调用initializeState初始化每个算子的状态,此时会为每个StreamOperator创建keyedStatedBackend和operatorStateBackend,然后会调用用户定义函数中的initializeState方法,用于创建Operator State
  5. 调用算子的open方法,便于用户在自定义函数open中进行初始化,比如初始化keyState
  6. 调用processInput处理流中数据

SourceStreamTask重载了StreamTask::processInput,该方法中直接起一个线程调用SourceFunction::run方法。

OneInputStreamTask则不同,它重载了StreamTask的init方法,在init方法中创建了StreamOneInputProcessor

OneInputStreamTask::init
OneInputStreamTask::createCheckpointedInputGate
OneInputStreamTask::createDataOutput //创建StreamTaskNetworkOutput
OneInputStreamTask::createTaskInput //创建StreamTaskNetworkInput
StreamOneInputProcessor::new

在StreamTask::processInput则是调用InputProcessor::processInput不断读取数据进行处理

StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理
AbstractStreamTaskNetworkInput::processElementStreamTaskNetworkOutput::emitRecord //调用operator的setKeyContextElement和processElementOneInputStreamOperator::setKeyContextElementAbstractStreamOperator::setKeyContextElement1AbstractStreamOperator::setCurrentKey //StreamOperatorStateHandler::setCurrentKey //设置状态当前keyInput::processElement  //调用StreamOperator的processElement方法

以上Task从提交到起线程执行起来的整个过程,在初始化过程中为每个StreamOperator进行状态后端的初始化相当重要,后续处理流的过程中会使用这些状态后端存储管理状态。


http://www.ppmy.cn/news/1032922.html

相关文章

如何使用CSS实现一个下拉菜单?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 使用CSS实现下拉菜单⭐ HTML 结构⭐ CSS 样式⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些…

如何基于 ACK Serverless 快速部署 AI 推理服务

作者&#xff1a;元毅 随着 AI 浪潮的到来&#xff0c;各种 AI 应用层出不穷&#xff0c;众所周知 AI 应用对 GPU 资源强烈依赖&#xff0c;但 GPU 很昂贵&#xff0c;如何降低 GPU 资源使用成本成为用户首要问题。而 AI 与 Serverless 技术结合&#xff0c;完全可以达到按需使…

pytorch基础实践2

文章目录 tensor操作Reshape 操作Flatten 操作Concatenating 操作&#xff08;级联&#xff09;高阶张量的flattenelement-wise operationsArithmetic Operations(算术操作&#xff09;Broadcasting Tensors&#xff08;广播机制&#xff09;Comparison OperationsElement-wise…

AI 实力:利用 Docker 简化机器学习应用程序的部署和可扩展性

利用 Docker 的强大功能&#xff1a;简化部署解决方案、确保可扩展性并简化机器学习模型的 CI/CD 流程。 近年来&#xff0c;机器学习 (ML) 出现了爆炸性增长&#xff0c;导致对健壮、可扩展且高效的部署方法的需求不断增加。由于训练和服务环境之间的差异或扩展的困难等因素&a…

【Vue-Router】别名

后台返回来的路径名不合理&#xff0c;但多个项目在使用中了&#xff0c;不方便改时可以使用别名。可以有多个或一个。 First.vue <template><h1>First Seciton</h1> </template>Second.vue&#xff0c;Third.vue代码同理 UserSettings.vue <tem…

【佳佳怪文献分享】安全人机交互的学习责任分配与自动驾驶应用

标题&#xff1a;Learning Responsibility Allocations for Safe Human-Robot Interaction with Applications to Autonomous Driving 作者&#xff1a;Ryan K. Cosner, Yuxiao Chen, Karen Leung, and Marco Pavone 来源&#xff1a;2023 IEEE International Conference on …

中国生产了5.07亿台,库存高达近4亿台?国产手机彻底卖不动了?

统计数据显示今年上半年中国的手机产量达到5.07亿台&#xff0c;国内市场手机出货量仅有1.24亿台&#xff0c;都出现了下滑&#xff0c;那么中国手机的产量比销量多出了3.83亿台&#xff0c;这些手机都成为了库存&#xff1f; 中国手机市场确实不如早年那么辉煌&#xff0c;201…

【JS 线性代数算法之向量与矩阵】

线性代数算法 一、向量的加减乘除1. 向量加法2. 向量减法3. 向量数乘4. 向量点积5. 向量叉积 二、矩阵的加减乘除1. 矩阵加法2. 矩阵减法3. 矩阵数乘4. 矩阵乘法 常用数学库 线性代数是数学的一个分支&#xff0c;用于研究线性方程组及其解的性质、向量空间及其变换的性质等。在…