概述
本次源码解读基于 v2.3.4 版本
v2.3.4 官方文档:About SeaTunnel Engine | Apache SeaTunnel
SeaTunnel 支持 Flink、Spark 引擎启动,也支持自身的 Zeta 引擎启动。
自身的 Zeta 引擎基于 hazelcast 作为分布式集群控制,支持单机、集群运行,支持自治集群(去中心化),省去了用户为 SeaTunnel Engine 集群指定 Master 节点的麻烦,因为它在运行过程中可以自行选择一个 Master 节点,当 Master 节点发生故障时,会自动选择新的 Master 节点。作为 SeaTunnel 的默认引擎,它支持高吞吐、低延迟、强一致性的同步作业操作,速度更快、更稳定、更节省资源、使用方便。
这篇文章我们先大致了解下 SeaTunnel 的启动流程中都干了些什么,后面的文章中我们在慢慢一步一步深入了解
题外话
hazelcast 是一个开源的分布式内存级别的缓存数据库,当然,也可以使用它的各种 API 来实现自己的分布式集群效果,它不需要依赖 Zookeeper 这类额外的中间件来协助 Master 节点选举,使用起来也简单,SeaTunnel 基于它封装了自己的 Zeta 引擎,轻量而且宕机恢复能力也很强,启动速度也极快。(有兴趣的同学可以认真研究下 SeaTunnel 的 Zeta 引擎,参考 Zeta 引擎集成 hazelcast 的方法,做一些自己的分布式集群的小项目,hazelcast 是真的小而强悍!)
废话不多说,开始吧!
启动流程分析
启动脚本找启动类
首先,SeaTunnel 为我们提供了集群启动时使用的启动脚本 bin/seatunnel-cluster.sh,打开这个脚本,大概在 40 行左右,可以看到 APP_MAIN 这个变量,定义了 jvm 启动的入口类是 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer
APP_JAR=${APP_DIR}/starter/seatunnel-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer"
OUT="${APP_DIR}/logs/seatunnel-server.out"
从启动类出发
打开 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer
,里面代码很简单,也就解析命令行传入的参数,构建 Server 实例并启动
java">// org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServerpublic class SeaTunnelServer {public static void main(String[] args) throws CommandException {ServerCommandArgs serverCommandArgs =CommandLineUtils.parse(args,new ServerCommandArgs(),EngineType.SEATUNNEL.getStarterShellName(),true);SeaTunnel.run(serverCommandArgs.buildCommand());}
}
serverCommandArgs.buildCommand()
返回的是 ServerExecuteCommand
类的实例,SeaTunnel.run
执行的是 ServerExecuteCommand
类实例的 execute
方法
所有我们接下来进入 ServerExecuteCommand
的 execute
方法
java">// org.apache.seatunnel.core.starter.seatunnel.command.ServerExecuteCommand@Overridepublic void execute() {// 读取配置文件SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());}// 创建Hazelcast实例并启动HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),Thread.currentThread().getName(),new SeaTunnelNodeContext(seaTunnelConfig));}
看着也不复杂,读取配置文件,创建 Hazelcast 实例并启动
其实关键就是在给 HazelcastInstanceFactory.newHazelcastInstance
传入的最后一个参数 new SeaTunnelNodeContext(seaTunnelConfig)
HazelcastInstanceFactory.newHazelcastInstance
接收的最后一个参数是 NodeContext,是 Hazelcast 节点整个生命周期都存在的节点上下文,SeaTunnel 就是在 SeaTunnelNodeContext
这个类上启动了 SeaTunnel 的服务端
接着,我看看 SeaTunnelNodeContext
java">// org.apache.seatunnel.engine.server.SeaTunnelNodeContextpublic class SeaTunnelNodeContext extends DefaultNodeContext {private final SeaTunnelConfig seaTunnelConfig;public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {this.seaTunnelConfig = seaTunnelConfig;}// 这个方法会被Hazelcast内部逻辑调用@Overridepublic NodeExtension createNodeExtension(@NonNull Node node) {return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);}
}
在 createNodeExtension
方法中,创建了 NodeExtension
实例,而在 NodeExtension
的构造方法中,创建了 SeaTunnelServer
,这个类就是 SeaTunnel 的真正服务端类
然后在 NodeExtension
的 createExtensionServices
方法中,将 SeaTunnelServer
注册到了 hazelcast
关键的 SeaTunnelServer
org.apache.seatunnel.engine.server.SeaTunnelServer
实现了 ManagedService, MembershipAwareService, LiveOperationsTracker
,这三个接口是Hazelcast节点生命周期相关接口,SeaTunnel 也是利用 Hazelcast生命周期相关的钩子来管理自己的核心功能
咱们来看看 SeaTunnelServer
的字段
java">// org.apache.seatunnel.engine.server.SeaTunnelServerpublic class SeaTunnelServerimplements ManagedService, MembershipAwareService, LiveOperationsTracker {private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);public static final String SERVICE_NAME = "st:impl:seaTunnelServer";private NodeEngineImpl nodeEngine;private final LiveOperationRegistry liveOperationRegistry;private volatile SlotService slotService;private TaskExecutionService taskExecutionService;private CoordinatorService coordinatorService;private ScheduledExecutorService monitorService;@Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;private final SeaTunnelConfig seaTunnelConfig;private volatile boolean isRunning = true;
}
其中最重要的 4 个组件
- slotService:槽管理服务,控制任务运行数量,任务 cpu、内存资源分配
- taskExecutionService:处理客户端提交的任务
- coordinatorService:集群状态监听
- seaTunnelHealthMonitor:集群健康状态监测
接着看 SeaTunnelServer
的 init
方法,上面提到的 4 个组件都在这里初始化了
java">// org.apache.seatunnel.engine.server.SeaTunnelServer@Overridepublic void init(NodeEngine engine, Properties hzProperties) {this.nodeEngine = (NodeEngineImpl) engine;// TODO Determine whether to execute there method on the master node according to the deploy// typetaskExecutionService = new TaskExecutionService(nodeEngine, nodeEngine.getProperties());nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);taskExecutionService.start();// 初始化slotService,这个方法里面用了单例模式的DCL双重锁检查,因为getSlotService方法很多地方都有调用getSlotService();// 初始化集群信息监控coordinatorService =new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());// 启动定时任务,定时触发coordinatorService轮询集群的信息并打印出来monitorService = Executors.newSingleThreadScheduledExecutor();monitorService.scheduleAtFixedRate(this::printExecutionInfo,0,seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),TimeUnit.SECONDS);// 初始化集群健康状态监测seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());// a trick way to fix StatisticsDataReferenceCleaner thread class loader leak.// see https://issues.apache.org/jira/browse/HADOOP-19049FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");}
到这里,SeaTunnel 节点就完成了初始化了,里面涉及到的这几个组件,就不在这里展开跟踪了,后面单独写文章来详细了解
监听 rest-api 请求
官方文档有提到 SeaTunnel 支持 rest-api 方式提交任务,那么是在哪定义的 api 呢,api 的处理逻辑在哪呢?
咱们回到 org.apache.seatunnel.engine.server.NodeExtension
,这里面定义了 hazelcast节点的生命周期中会触发到的钩子方法
其中 createTextCommandService
方法中,就注册了 http 请求的处理器
java">// org.apache.seatunnel.engine.server.NodeExtension@Overridepublic TextCommandService createTextCommandService() {return new TextCommandServiceImpl(node) {{// 获取日志配置信息register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));// 设置日志配置信息register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));// rest-api的get方法处理register(HTTP_GET, new RestHttpGetCommandProcessor(this));// rest-api的post方法处理register(HTTP_POST, new RestHttpPostCommandProcessor(this));}};}
总体流程
最后放一个简单的总体流程图