SeaTunnel Zeta 引擎启动流程分析(一)

news/2024/11/25 17:00:45/

概述

本次源码解读基于 v2.3.4 版本

v2.3.4 官方文档:About SeaTunnel Engine | Apache SeaTunnel

SeaTunnel 支持 Flink、Spark 引擎启动,也支持自身的 Zeta 引擎启动。

自身的 Zeta 引擎基于 hazelcast 作为分布式集群控制,支持单机、集群运行,支持自治集群(去中心化),省去了用户为 SeaTunnel Engine 集群指定 Master 节点的麻烦,因为它在运行过程中可以自行选择一个 Master 节点,当 Master 节点发生故障时,会自动选择新的 Master 节点。作为 SeaTunnel 的默认引擎,它支持高吞吐、低延迟、强一致性的同步作业操作,速度更快、更稳定、更节省资源、使用方便。

这篇文章我们先大致了解下 SeaTunnel 的启动流程中都干了些什么,后面的文章中我们在慢慢一步一步深入了解

题外话

hazelcast 是一个开源的分布式内存级别的缓存数据库,当然,也可以使用它的各种 API 来实现自己的分布式集群效果,它不需要依赖 Zookeeper 这类额外的中间件来协助 Master 节点选举,使用起来也简单,SeaTunnel 基于它封装了自己的 Zeta 引擎,轻量而且宕机恢复能力也很强,启动速度也极快。(有兴趣的同学可以认真研究下 SeaTunnel 的 Zeta 引擎,参考 Zeta 引擎集成 hazelcast 的方法,做一些自己的分布式集群的小项目,hazelcast 是真的小而强悍!)

废话不多说,开始吧!

启动流程分析

启动脚本找启动类

首先,SeaTunnel 为我们提供了集群启动时使用的启动脚本 bin/seatunnel-cluster.sh,打开这个脚本,大概在 40 行左右,可以看到 APP_MAIN 这个变量,定义了 jvm 启动的入口类是 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer

APP_JAR=${APP_DIR}/starter/seatunnel-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer"
OUT="${APP_DIR}/logs/seatunnel-server.out"

从启动类出发

打开 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer,里面代码很简单,也就解析命令行传入的参数,构建 Server 实例并启动

java">// org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServerpublic class SeaTunnelServer {public static void main(String[] args) throws CommandException {ServerCommandArgs serverCommandArgs =CommandLineUtils.parse(args,new ServerCommandArgs(),EngineType.SEATUNNEL.getStarterShellName(),true);SeaTunnel.run(serverCommandArgs.buildCommand());}
}

serverCommandArgs.buildCommand() 返回的是 ServerExecuteCommand 类的实例,SeaTunnel.run 执行的是 ServerExecuteCommand 类实例的 execute 方法

所有我们接下来进入 ServerExecuteCommandexecute 方法

java">// org.apache.seatunnel.core.starter.seatunnel.command.ServerExecuteCommand@Overridepublic void execute() {// 读取配置文件SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());}// 创建Hazelcast实例并启动HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),Thread.currentThread().getName(),new SeaTunnelNodeContext(seaTunnelConfig));}

看着也不复杂,读取配置文件,创建 Hazelcast 实例并启动

其实关键就是在给 HazelcastInstanceFactory.newHazelcastInstance 传入的最后一个参数 new SeaTunnelNodeContext(seaTunnelConfig)

HazelcastInstanceFactory.newHazelcastInstance 接收的最后一个参数是 NodeContext,是 Hazelcast 节点整个生命周期都存在的节点上下文,SeaTunnel 就是在 SeaTunnelNodeContext 这个类上启动了 SeaTunnel 的服务端

接着,我看看 SeaTunnelNodeContext

java">// org.apache.seatunnel.engine.server.SeaTunnelNodeContextpublic class SeaTunnelNodeContext extends DefaultNodeContext {private final SeaTunnelConfig seaTunnelConfig;public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {this.seaTunnelConfig = seaTunnelConfig;}// 这个方法会被Hazelcast内部逻辑调用@Overridepublic NodeExtension createNodeExtension(@NonNull Node node) {return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);}
}

createNodeExtension 方法中,创建了 NodeExtension 实例,而在 NodeExtension 的构造方法中,创建了 SeaTunnelServer,这个类就是 SeaTunnel 的真正服务端类

然后在 NodeExtensioncreateExtensionServices 方法中,将 SeaTunnelServer 注册到了 hazelcast

关键的 SeaTunnelServer

org.apache.seatunnel.engine.server.SeaTunnelServer 实现了 ManagedService, MembershipAwareService, LiveOperationsTracker,这三个接口是Hazelcast节点生命周期相关接口,SeaTunnel 也是利用 Hazelcast生命周期相关的钩子来管理自己的核心功能

咱们来看看 SeaTunnelServer 的字段

java">// org.apache.seatunnel.engine.server.SeaTunnelServerpublic class SeaTunnelServerimplements ManagedService, MembershipAwareService, LiveOperationsTracker {private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);public static final String SERVICE_NAME = "st:impl:seaTunnelServer";private NodeEngineImpl nodeEngine;private final LiveOperationRegistry liveOperationRegistry;private volatile SlotService slotService;private TaskExecutionService taskExecutionService;private CoordinatorService coordinatorService;private ScheduledExecutorService monitorService;@Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;private final SeaTunnelConfig seaTunnelConfig;private volatile boolean isRunning = true;
}

其中最重要的 4 个组件

  • slotService:槽管理服务,控制任务运行数量,任务 cpu、内存资源分配
  • taskExecutionService:处理客户端提交的任务
  • coordinatorService:集群状态监听
  • seaTunnelHealthMonitor:集群健康状态监测

接着看 SeaTunnelServerinit 方法,上面提到的 4 个组件都在这里初始化了

java">// org.apache.seatunnel.engine.server.SeaTunnelServer@Overridepublic void init(NodeEngine engine, Properties hzProperties) {this.nodeEngine = (NodeEngineImpl) engine;// TODO Determine whether to execute there method on the master node according to the deploy// typetaskExecutionService = new TaskExecutionService(nodeEngine, nodeEngine.getProperties());nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);taskExecutionService.start();// 初始化slotService,这个方法里面用了单例模式的DCL双重锁检查,因为getSlotService方法很多地方都有调用getSlotService();// 初始化集群信息监控coordinatorService =new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());// 启动定时任务,定时触发coordinatorService轮询集群的信息并打印出来monitorService = Executors.newSingleThreadScheduledExecutor();monitorService.scheduleAtFixedRate(this::printExecutionInfo,0,seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),TimeUnit.SECONDS);// 初始化集群健康状态监测seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());// a trick way to fix StatisticsDataReferenceCleaner thread class loader leak.// see https://issues.apache.org/jira/browse/HADOOP-19049FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");}

到这里,SeaTunnel 节点就完成了初始化了,里面涉及到的这几个组件,就不在这里展开跟踪了,后面单独写文章来详细了解

监听 rest-api 请求

官方文档有提到 SeaTunnel 支持 rest-api 方式提交任务,那么是在哪定义的 api 呢,api 的处理逻辑在哪呢?

咱们回到 org.apache.seatunnel.engine.server.NodeExtension,这里面定义了 hazelcast节点的生命周期中会触发到的钩子方法

其中 createTextCommandService 方法中,就注册了 http 请求的处理器

java">// org.apache.seatunnel.engine.server.NodeExtension@Overridepublic TextCommandService createTextCommandService() {return new TextCommandServiceImpl(node) {{// 获取日志配置信息register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));// 设置日志配置信息register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));// rest-api的get方法处理register(HTTP_GET, new RestHttpGetCommandProcessor(this));// rest-api的post方法处理register(HTTP_POST, new RestHttpPostCommandProcessor(this));}};}

总体流程

最后放一个简单的总体流程图
在这里插入图片描述


http://www.ppmy.cn/news/1548989.html

相关文章

解决复杂查询难题:如何通过 Self-querying Prompting 提高 RAG 系统效率?

在现代自然语言处理(NLP)领域,检索增强生成(RAG)系统因其能够结合外部知识库和大语言模型的强大生成能力,成为了提升信息检索质量的主流解决方案之一。然而,传统的 RAG 流程存在诸多挑战&#x…

深入探讨 Puppeteer 如何使用 X 和 Y 坐标实现鼠标移动

背景介绍 现代爬虫技术中,模拟人类行为已成为绕过反爬虫系统的关键策略之一。无论是模拟用户点击、滚动,还是鼠标的轨迹移动,都可以为爬虫脚本带来更高的“伪装性”。在众多的自动化工具中,Puppeteer作为一个无头浏览器控制库&am…

Kotlin 编译失败问题及解决方案:从守护进程到 Gradle 配置

Kotlin 编译失败问题及解决方案:从守护进程到 Gradle 配置 在使用 Kotlin 编译项目时,有时可能会遇到类似以下错误: Unable to clear jar cache after compilation, maybe daemon is already down: java.rmi.ConnectException: Connection …

WebGIS地图框架有哪些?

地理信息系统(GIS)已经成为现代应用开发中不可或缺的一部分,尤其在前端开发中。随着Web技术的快速发展,许多强大而灵活的GIS框架涌现出来,为开发人员提供了丰富的工具和功能,使他们能够创建交互式、高性能的…

探索 FFI - Rust 与 C# 互调实战

所谓幸福,就是把灵魂安放在适当的位置。 —— 亚里士多德 Aristotle 一、Rust C# ? 1、C# 的优势 丰富的生态系统:C# 是由微软开发和维护的,拥有强大的 .NET 框架支持,提供了大量的库和工具,可以极大地…

免费的视频混剪综合处理工具介绍与下载

免费的视频混剪综合处理工具 软件截图 功能 支持: 这个软件主要用于视频的批量处理,包括添加水印、裁剪、画中画、去水印、去头尾、变速、文本和背景音乐等功能。以下是界面中一些主要功能的介绍: 视频队列:显示当前待处理的视…

Kubernetes集群Pod内存泄露问题分析和解决

在Kubernetes集群中,有时会遇到Pod无法正常创建或被杀掉的情况,describe Pod时显示"no allocated memory"。这种情况很可能是由于节点内存泄露导致的。本文将分析内存泄露的原因,并给出解决方案。 问题现象 Pod状态异常,describe pod显示原因为:no allocated memory…

代码随想录1016-Day16

目录 530.二叉搜索树的最小绝对差501.二叉搜索树中的众数105.从中序与前序遍历序列构造二叉树总结 收获 530.二叉搜索树的最小绝对差 文章链接:代码随想录 题目链接:题目 思路:用中序遍历遍历一遍 BST 的所有节点得到有序结果,然后在遍历过程…