JobGraph组成
JobGraph主要是StreamGraph经过优化后生成的,主要优化的就是对符合条件节点进行chain,这样可以减少数据流动的序列化和传输。
JobGraph主要由三部分组成。
- JobVertex:图的顶点。输入是一个JobEdge,输出是IntermediateDataSet。它可以对应多个StreamNode,将多个operator合并到一起。
- IntermediateDataSet:中间结果集。是JobVertex处理后生成的结果集,为了方便下游复用,producer 是 JobVertex ,consumer 是 JobEdge。
- JobEdge:边。JobGraph的传输管道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex 。
JobVertex
- operatorIDs:该 job 节点包含的所有 operator ids,以深度优先方式存储 ids
- results:job 节点计算出的中间结果
- inputs:输入数据的边列表
IntermediateDataSet
- producer:生产者,JobVertex
- consumers:消费边,可以对应多个,但是必须具有相同的分区器和并行性
- resultType:运行时使用的分区类型
- BLOCKING 阻塞,批处理模式
- PIPELINED 管道非阻塞,流处理模式
JobEdge
- target:edge的输出,JobVertex
- source:edge的源,IntermediateDataSet
- distributionPattern:决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式
- ALL_TO_ALL:每个生产子任务都连接到消费任务的每个子任务
- POINTWISE:每个生产子任务都连接到使用任务的一个或多个子任务
JobGraph生成
入口是在StreamingJobGraphGenerator的createJobGraph方法
createJobGraph过程比较多,重点是三步:
- 为各个StreamNode生成hash值,这样在故障恢复的时候可以识别
- 生成JobVertex
- 生成JobEdge、IntermediateDataSet
生成JobVertex(setChaining)
从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例。
buildChainedInputsAndGetHeadInputs会得到chain的起点集合,然后遍历进行createChain
buildChainedInputsAndGetHeadInputs
private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {// 可以chain的source,单独处理这种节点final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();// chain的起点(不能chain的souce节点、可以chain的souce节点的下一个节点)final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();// 遍历streamGraph的所有source nodefor (Integer sourceNodeId : streamGraph.getSourceIDs()) {final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory&& sourceNode.getOutEdges().size() == 1) {// 要求source node的outEdge只有一个。有多个出边的source不能chain// as long as only NAry ops support this chaining, we need to skip the other partsfinal StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());final ChainingStrategy targetChainingStrategy =target.getOperatorFactory().getChainingStrategy();if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES&& isChainableInput(sourceOutEdge, streamGraph)) {final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));final StreamConfig.SourceInputConfig inputConfig =new StreamConfig.SourceInputConfig(sourceOutEdge);final StreamConfig operatorConfig = new StreamConfig(new Configuration());setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap());setOperatorChainedOutputsConfig(operatorConfig, Collections.emptyList());// we cache the non-chainable outputs here, and set the non-chained config lateropNonChainableOutputsCache.put(sourceNodeId, Collections.emptyList());// sources的index都是0operatorConfig.setChainIndex(0); // sources are always firstoperatorConfig.setOperatorID(opId);operatorConfig.setOperatorName(sourceNode.getOperatorName());chainedSources.put(sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));final SourceOperatorFactory<?> sourceOpFact =(SourceOperatorFactory<?>) sourceNode.getOperatorFactory();final OperatorCoordinator.Provider coord =sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);// chainEntryPoints中添加(targetNodeId, chainInfo)final OperatorChainInfo chainInfo =chainEntryPoints.computeIfAbsent(sourceOutEdge.getTargetId(),(k) ->new OperatorChainInfo(sourceOutEdge.getTargetId(),hashes,legacyHashes,chainedSources,streamGraph));chainInfo.addCoordinatorProvider(coord);chainInfo.recordChainedNode(sourceNodeId);continue;}}chainEntryPoints.put(sourceNodeId,new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));}return chainEntryPoints;
}
createChain
在创建chain的过程中,一个chain完成后,在头结点创建一个JobVertex。
private List<StreamEdge> createChain(final Integer currentNodeId,final int chainIndex,final OperatorChainInfo chainInfo,final Map<Integer, OperatorChainInfo> chainEntryPoints) {Integer startNodeId = chainInfo.getStartNodeId();if (!builtVertices.contains(startNodeId)) {// transitiveOutEdges 过渡的出边集合,就是两个StreamNode不能再进行chain的那条边,用于生成JobEdgeList<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();// chainableOutputs 两个StreamNode可以进行chain的出边集合// nonChainableOutputs 两个StreamNode不能进行chain的出边List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);for (StreamEdge outEdge : currentNode.getOutEdges()) {if (isChainable(outEdge, streamGraph)) {chainableOutputs.add(outEdge);} else {nonChainableOutputs.add(outEdge);}}for (StreamEdge chainable : chainableOutputs) {// 如果存在可以chain的边,那么就继续往这条边的target operator进行chain。// transitiveOutEdges最终返回给首次调用栈的是不能再继续chain的那条边transitiveOutEdges.addAll(createChain(chainable.getTargetId(),chainIndex + 1,chainInfo,chainEntryPoints));}for (StreamEdge nonChainable : nonChainableOutputs) {//如果存在了不可chain的边,说明该边就是StreamNode chain之间的过渡边,添加到transitiveOutEdges中,//继续对该边的target StreamNode进行新的createChain操作,意味着一个新的chaintransitiveOutEdges.add(nonChainable);createChain(nonChainable.getTargetId(),1, // operators start at position 1 because 0 is for chained source inputschainEntryPoints.computeIfAbsent(nonChainable.getTargetId(),(k) -> chainInfo.newChain(nonChainable.getTargetId())),chainEntryPoints);}chainedNames.put(currentNodeId,createChainedName(currentNodeId,chainableOutputs,Optional.ofNullable(chainEntryPoints.get(currentNodeId))));chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));chainedPreferredResources.put(currentNodeId,createChainedPreferredResources(currentNodeId, chainableOutputs));// 添加当前的StreamNode到chain中OperatorID currentOperatorId =chainInfo.addNodeToChain(currentNodeId,streamGraph.getStreamNode(currentNodeId).getOperatorName());if (currentNode.getInputFormat() != null) {getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());}if (currentNode.getOutputFormat() != null) {getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());}// chain的头结点创建JobVertexStreamConfig config =currentNodeId.equals(startNodeId)? createJobVertex(startNodeId, chainInfo): new StreamConfig(new Configuration());tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());setOperatorChainedOutputsConfig(config, chainableOutputs);// we cache the non-chainable outputs here, and set the non-chained config later// 缓存不能chain的出边集合opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);if (currentNodeId.equals(startNodeId)) {// 头结点chainInfo.setTransitiveOutEdges(transitiveOutEdges);chainInfos.put(startNodeId, chainInfo);config.setChainStart();config.setChainIndex(chainIndex);config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));} else {chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());config.setChainIndex(chainIndex);StreamNode node = streamGraph.getStreamNode(currentNodeId);config.setOperatorName(node.getOperatorName());chainedConfigs.get(startNodeId).put(currentNodeId, config);}config.setOperatorID(currentOperatorId);if (chainableOutputs.isEmpty()) {// chain尾节点config.setChainEnd();}return transitiveOutEdges;} else {return new ArrayList<>();}
}
判断是否chainable
- 公用一个slotGroup
- 上下游operator可以chain
- partitioner和exchangeMode可以chain(forward)
- 并行度一样
- 允许chain
- 不能是联合操作
createJobVertex
- 创建对应的operator集合
- 创建JobVertex(InputOutputFormatVertex是一种特殊的 JobVertex,它用于处理输入输出格式相关的任务,例如读取和写入文件、数据库等)
- 添加对应的上游数据集
- 缓存JobVertex相关信息
生成JobEdge、IntermediateDataSet(setAllVertexNonChainedOutputsConfigs)
遍历jobVertices,调用connect连接起来。
connect
将两个JobVertex(headVertex、downStreamVertex)连接起来。关键方法是downStreamVertex.connectNewDataSetAsInput
connectNewDataSetAsInput
创建IntermediateDataSet和JobEdge,形成JobVertex->IntermediateDataSet->JobEdge->JobVertex