flink JobGraph解析

ops/2025/2/11 16:53:38/

JobGraph组成

JobGraph主要是StreamGraph经过优化后生成的,主要优化的就是对符合条件节点进行chain,这样可以减少数据流动的序列化和传输。

JobGraph主要由三部分组成。

  • JobVertex:图的顶点。输入是一个JobEdge,输出是IntermediateDataSet。它可以对应多个StreamNode,将多个operator合并到一起。
  • IntermediateDataSet:中间结果集。是JobVertex处理后生成的结果集,为了方便下游复用,producer 是 JobVertex ,consumer 是 JobEdge。
  • JobEdge:边。JobGraph的传输管道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex 。

JobVertex

  • operatorIDs:该 job 节点包含的所有 operator ids,以深度优先方式存储 ids
  • results:job 节点计算出的中间结果
  • inputs:输入数据的边列表

IntermediateDataSet

  • producer:生产者,JobVertex
  • consumers:消费边,可以对应多个,但是必须具有相同的分区器和并行性
  • resultType:运行时使用的分区类型
    • BLOCKING 阻塞,批处理模式
    • PIPELINED 管道非阻塞,流处理模式

JobEdge

  • target:edge的输出,JobVertex
  • source:edge的源,IntermediateDataSet
  • distributionPattern:决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式
    • ALL_TO_ALL:每个生产子任务都连接到消费任务的每个子任务
    • POINTWISE:每个生产子任务都连接到使用任务的一个或多个子任务

JobGraph生成

入口是在StreamingJobGraphGenerator的createJobGraph方法

createJobGraph过程比较多,重点是三步:

  1. 为各个StreamNode生成hash值,这样在故障恢复的时候可以识别
  2. 生成JobVertex
  3. 生成JobEdge、IntermediateDataSet

生成JobVertex(setChaining

从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例。

buildChainedInputsAndGetHeadInputs会得到chain的起点集合,然后遍历进行createChain

buildChainedInputsAndGetHeadInputs

private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {// 可以chain的source,单独处理这种节点final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();// chain的起点(不能chain的souce节点、可以chain的souce节点的下一个节点)final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();// 遍历streamGraph的所有source nodefor (Integer sourceNodeId : streamGraph.getSourceIDs()) {final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory&& sourceNode.getOutEdges().size() == 1) {// 要求source node的outEdge只有一个。有多个出边的source不能chain// as long as only NAry ops support this chaining, we need to skip the other partsfinal StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());final ChainingStrategy targetChainingStrategy =target.getOperatorFactory().getChainingStrategy();if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES&& isChainableInput(sourceOutEdge, streamGraph)) {final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));final StreamConfig.SourceInputConfig inputConfig =new StreamConfig.SourceInputConfig(sourceOutEdge);final StreamConfig operatorConfig = new StreamConfig(new Configuration());setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap());setOperatorChainedOutputsConfig(operatorConfig, Collections.emptyList());// we cache the non-chainable outputs here, and set the non-chained config lateropNonChainableOutputsCache.put(sourceNodeId, Collections.emptyList());// sources的index都是0operatorConfig.setChainIndex(0); // sources are always firstoperatorConfig.setOperatorID(opId);operatorConfig.setOperatorName(sourceNode.getOperatorName());chainedSources.put(sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));final SourceOperatorFactory<?> sourceOpFact =(SourceOperatorFactory<?>) sourceNode.getOperatorFactory();final OperatorCoordinator.Provider coord =sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);// chainEntryPoints中添加(targetNodeId, chainInfo)final OperatorChainInfo chainInfo =chainEntryPoints.computeIfAbsent(sourceOutEdge.getTargetId(),(k) ->new OperatorChainInfo(sourceOutEdge.getTargetId(),hashes,legacyHashes,chainedSources,streamGraph));chainInfo.addCoordinatorProvider(coord);chainInfo.recordChainedNode(sourceNodeId);continue;}}chainEntryPoints.put(sourceNodeId,new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));}return chainEntryPoints;
}

createChain

在创建chain的过程中,一个chain完成后,在头结点创建一个JobVertex。

private List<StreamEdge> createChain(final Integer currentNodeId,final int chainIndex,final OperatorChainInfo chainInfo,final Map<Integer, OperatorChainInfo> chainEntryPoints) {Integer startNodeId = chainInfo.getStartNodeId();if (!builtVertices.contains(startNodeId)) {// transitiveOutEdges 过渡的出边集合,就是两个StreamNode不能再进行chain的那条边,用于生成JobEdgeList<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();// chainableOutputs 两个StreamNode可以进行chain的出边集合// nonChainableOutputs 两个StreamNode不能进行chain的出边List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);for (StreamEdge outEdge : currentNode.getOutEdges()) {if (isChainable(outEdge, streamGraph)) {chainableOutputs.add(outEdge);} else {nonChainableOutputs.add(outEdge);}}for (StreamEdge chainable : chainableOutputs) {// 如果存在可以chain的边,那么就继续往这条边的target operator进行chain。// transitiveOutEdges最终返回给首次调用栈的是不能再继续chain的那条边transitiveOutEdges.addAll(createChain(chainable.getTargetId(),chainIndex + 1,chainInfo,chainEntryPoints));}for (StreamEdge nonChainable : nonChainableOutputs) {//如果存在了不可chain的边,说明该边就是StreamNode chain之间的过渡边,添加到transitiveOutEdges中,//继续对该边的target StreamNode进行新的createChain操作,意味着一个新的chaintransitiveOutEdges.add(nonChainable);createChain(nonChainable.getTargetId(),1, // operators start at position 1 because 0 is for chained source inputschainEntryPoints.computeIfAbsent(nonChainable.getTargetId(),(k) -> chainInfo.newChain(nonChainable.getTargetId())),chainEntryPoints);}chainedNames.put(currentNodeId,createChainedName(currentNodeId,chainableOutputs,Optional.ofNullable(chainEntryPoints.get(currentNodeId))));chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));chainedPreferredResources.put(currentNodeId,createChainedPreferredResources(currentNodeId, chainableOutputs));// 添加当前的StreamNode到chain中OperatorID currentOperatorId =chainInfo.addNodeToChain(currentNodeId,streamGraph.getStreamNode(currentNodeId).getOperatorName());if (currentNode.getInputFormat() != null) {getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());}if (currentNode.getOutputFormat() != null) {getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());}// chain的头结点创建JobVertexStreamConfig config =currentNodeId.equals(startNodeId)? createJobVertex(startNodeId, chainInfo): new StreamConfig(new Configuration());tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());setOperatorChainedOutputsConfig(config, chainableOutputs);// we cache the non-chainable outputs here, and set the non-chained config later// 缓存不能chain的出边集合opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);if (currentNodeId.equals(startNodeId)) {// 头结点chainInfo.setTransitiveOutEdges(transitiveOutEdges);chainInfos.put(startNodeId, chainInfo);config.setChainStart();config.setChainIndex(chainIndex);config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));} else {chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());config.setChainIndex(chainIndex);StreamNode node = streamGraph.getStreamNode(currentNodeId);config.setOperatorName(node.getOperatorName());chainedConfigs.get(startNodeId).put(currentNodeId, config);}config.setOperatorID(currentOperatorId);if (chainableOutputs.isEmpty()) {// chain尾节点config.setChainEnd();}return transitiveOutEdges;} else {return new ArrayList<>();}
}

判断是否chainable

  • 公用一个slotGroup
  • 上下游operator可以chain
  • partitioner和exchangeMode可以chain(forward)
  • 并行度一样
  • 允许chain
  • 不能是联合操作

createJobVertex

  1. 创建对应的operator集合
  2. 创建JobVertex(InputOutputFormatVertex是一种特殊的 JobVertex,它用于处理输入输出格式相关的任务,例如读取和写入文件、数据库等)
  3. 添加对应的上游数据集
  4. 缓存JobVertex相关信息

生成JobEdge、IntermediateDataSet(setAllVertexNonChainedOutputsConfigs)

遍历jobVertices,调用connect连接起来。

connect

将两个JobVertex(headVertex、downStreamVertex)连接起来。关键方法是downStreamVertex.connectNewDataSetAsInput

connectNewDataSetAsInput

创建IntermediateDataSet和JobEdge,形成JobVertex->IntermediateDataSet->JobEdge->JobVertex


http://www.ppmy.cn/ops/157563.html

相关文章

STL函数算法笔记

STL函数算法笔记 今天我们来学习的是STL库中的一些函数。首先,STL这个东西大家一定非常熟悉,里面很多的数据结构都帮了大家不少忙,那么今天我们就来说几个重要的数据结构。 向量 向量,也就是数据结构vector,你也可以称之为动态数组,本质跟数组差不多,只不过有一些好处…

DeepSeek R1技术报告关键解析(8/10):DeepSeek-R1 的“aha 时刻”,AI 自主学习的新突破

1. 什么是 AI 的“aha 时刻”&#xff1f; 在强化学习过程中&#xff0c;AI 的推理能力并不是线性增长的&#xff0c;而是会经历一些关键的“顿悟”时刻&#xff0c;研究人员将其称为“aha 时刻”。 这是 AI 在训练过程中突然学会了一种新的推理方式&#xff0c;或者能够主动…

伺服使能的含义解析

前言&#xff1a; 大家好&#xff0c;我是上位机马工&#xff0c;硕士毕业4年年入40万&#xff0c;目前在一家自动化公司担任软件经理&#xff0c;从事C#上位机软件开发8年以上&#xff01;我们在开发C#的运动控制程序的时候&#xff0c;一个必要的步骤就是对伺服上使能&#…

LabVIEW2025中文版软件安装包、工具包、安装教程下载

下载链接&#xff1a;LabVIEW及工具包大全-三易电子工作室http://blog.eeecontrol.com/labview6666 《LabVIEW2025安装图文教程》 1、解压后&#xff0c;双击install.exe安装 2、选中“我接受上述2条许可协议”&#xff0c;点击下一步 3、点击下一步&#xff0c;安装NI Packa…

【多线程-第三天-NSOperation和GCD的区别 Objective-C语言】

一、我们来看NSOperation和GCD的区别 1.我们来对比一下,NSOperation和GCD, 那这个代码,我们都写过了, 我们来看一下它们的特点啊,首先来看GCD, 1)GCD是C语言的框架,是iOS4.0之后推出的,并且它的特点是,针对多核做了优化,可以充分利用CPU的多核,OK,这是GCD, 2…

Groovy语言的物联网

Groovy语言在物联网中的应用 引言 物联网&#xff08;Internet of Things, IoT&#xff09;是指通过各种信息传感设备与互联网结合&#xff0c;实现物与物之间的信息交流和智能化的网络。随着物联网技术的快速发展&#xff0c;越来越多的编程语言和平台被应用到物联网设备的开…

解锁 CSS Grid 高级技巧:提升网页布局灵活性的秘诀

系列文章目录 01-从零开始学CSS选择器&#xff1a;属性选择器与伪类选择器完全指南 02-避免样式冲突&#xff1a;掌握CSS选择器优先级与层叠规则的终极指南 03-如何精确掌控网页布局&#xff1f;深入解析 CSS 样式与盒模型 04-CSS 布局全面解析&#xff1a;从传统浮动到现代 F…

Unity 打造游戏资源加密解密系统详解

在游戏开发中&#xff0c;保护游戏资源不被轻易破解和盗用至关重要。本文将详细介绍如何在 Unity 中打造一个游戏资源加密解密系统&#xff0c;并提供技术详解和代码实现。 一、加密方案选择 1.1 对称加密 优点: 加密解密速度快&#xff0c;适合加密大量数据。 缺点: 密钥管…