xxl-job 源码解析,分布式任务调度xxl-job 客户端启动 执行器原理(二)

devtools/2025/2/26 18:31:32/

当前主要介绍了:执行器如何进行初始化,如何向调度器注册,调度器侧接收相关方法分析

目录:

       1. 配置xxl-job数据库环境,将调度器,执行器demo进行启动。

       2. 分析执行器启动流程

       3. 分析调度器接收心跳流程

1. 创建mysql数据库,调度器服务配置mysql参数,启动调度器服务

   1.1 由父目录/doc/db/  复制table_xxl_job.sql   内容,在数据库执行,便会自动创建xxl_job库

   1.2 调度器配置mysql参数

   1.3 启动服务即可

 2. 启动执行器, 位置在xxl-job-executor-samples/xxl-job-executor-springboot

3. 执行器启动为:向spring 注册一个bean, 名称为:XxlJobSpringExecutor, 故如果从0开始接入xxl-job中间件,执行器端服务 pom引入xxl-job-core 模块,再配置相关参数,如下注册XxlJobSpringExecutor Bean即可!

3. 执行器启动代码为xxl-job-core. 包名为com.xxl.job.core/executor/impl/

   3.1 XxlJobSpringExecutor启动类,使用spring钩子函数,在spring bean注册完毕后,获取XxlJob注解的方法,先放入Map<Method, XxlJob> annotatedMethods中,最终放入XxlJobExecutor类的ConcurrentHashMap容器中,即放入内存repository。第二件事就是使用单线程向调度器list循环发送心跳包。第一次就算 是注册包。interval time =30s

3.1 XxlSpringExecutor代码        

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {// 其他代码xxx# SmartInitializingSingleton 接口方法,SmartInitializingSingleton接口是在所有非惰性单
# 实例初始化完成之后进行激活回调,InitializingBean接口是在每一个Bean实例初始化完成之后进
# 行激活回调。
@Override
public void afterSingletonsInstantiated() {# xxl job 启动逻辑 // init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry { // 抽出放入了 XxlJobExecutor ,疑惑点,为什么不叫AbstractXxlJobExecutor呢super.start();} catch (Exception e) {throw new RuntimeException(e);}}}

    3.2 进入XxlJobExecutor start方法:

public void start() throws Exception {// init logpathXxlJobFileAppender.initLogPath(logPath);// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken, timeout);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken);
}

   3.3 代码有些校验方法封装下更佳。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}

   3.4 进入核心方法start逻辑:

         启动客户端的netty server. 

         启动向server发送心跳包逻辑

    public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = new Thread(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} catch (Throwable e) {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Throwable e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}

3.5 startRegistry逻辑:

      3.5.1 ExecutorRegistryThread 这个地方有个知识点为: 单例模式

               单例模式分为:懒汉式,饿汉式。经常使用的有静态内部类,饿汉式

                此处使用的是饿汉式,直接创建好了,无线程安全问题

private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
public static ExecutorRegistryThread getInstance(){return instance;
}
    public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}

3.6. 线程实现:

       3.6.1 采用字段标识区分destry逻辑

       3.6.2 每隔30s 循环发送心跳消息,TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

3.6.3 如果有多个调度器,那么发送多条心跳消息,循环发送

3.6.4 注意官方代码,注册成功只有debug模式才会打印日志,此处我改为了logger.info()方便调试,更直接看到,发送的http请求

    public void start(final String appname, final String address){// validif (appname==null || appname.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");return;}if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");return;}registryThread = new Thread(new Runnable() {@Overridepublic void run() {// registrywhile (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Throwable e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Throwable e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (Throwable e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry removetry {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Throwable e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Throwable e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");}});registryThread.setDaemon(true);registryThread.setName("xxl-job, executor ExecutorRegistryThread");registryThread.start();}

3.7 adminBiz.registry()

      向服务端发送http请求,心跳包发送在客户端逻辑基本就结束了

public class AdminBizClient implements AdminBiz {   @Overridepublic ReturnT<String> registry(RegistryParam registryParam) {return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);}}

4. 调度器服务接收位置为:

    xxl-job-admi/com.xxl.job.admin/controller/JobApiController

   4.1 核心代码:

         a. 此处代码改为switch, 或使用策略模式更好点。

             甚至将三个参数进行拆分为三个接口更佳,目前认为不符合高内聚,低耦合思想

/*** api** @param uri* @param data* @return*/@RequestMapping("/{uri}")@ResponseBody@PermissionLimit(limit=false)public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {// validif (!"POST".equalsIgnoreCase(request.getMethod())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingif ("callback".equals(uri)) {List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);} else if ("registry".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);} else if ("registryRemove".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}}

  4.2 进入adminBiz.registry()方法:

    @Overridepublic ReturnT<String> registry(RegistryParam registryParam) {return JobRegistryHelper.getInstance().registry(registryParam);}

     4.2.2 上方JobRegistryHelper是一个有意思的类,这个JobRegistryHelper不是spring管理的,但是在registry方法,需要使用dao层,进行sql的跟新,那如何处理的呢

                 使用方法为:                      XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySaveOrUpdate

                 XxlJobAdminConfig.getAdminConfig,我觉得这应该叫getInstance()

                 XxlJobAdminConfig 是spring管理的,里面有相关yml的配置参数,和dao方法

通过实现spring的扩展点InitializingBean(钩子函数), 将实例化好的bean,赋值给单例模式的自己。如下所示:

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private static XxlJobAdminConfig adminConfig = null;
public static XxlJobAdminConfig getAdminConfig() {return adminConfig;
}
@Override
public void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();
}
}

         后续如果业务中有相同需求亦可以采用上述方法进行处理

  4.3 registry方法:

	public ReturnT<String> registry(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async executeregistryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 0-fail; 1-save suc; 2-update suc;int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySaveOrUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret == 1) {// fresh (add)freshGroupRegistryInfo(registryParam);}/*int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret < 1) {XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// freshfreshGroupRegistryInfo(registryParam);}*/}});return ReturnT.SUCCESS;}

  4.4 核心代码:

       向mysql 注册执行器,或更新心跳时间

int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
.registrySaveOrUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(),registryParam.getRegistryValue(), new Date());# sql:
<insert id="registrySaveOrUpdate" >INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)VALUES( #{registryGroup}  , #{registryKey} , #{registryValue}, #{updateTime})ON DUPLICATE KEY UPDATE`update_time` = #{updateTime}
</insert>

 5. 总结:

            上述即执行器发送逻辑,与调度器接收相关接口,我认为一些相关方法可以再优化下,比如对容器,字符串判空,封装为xxUtil进行使用, 或使用apache.common.lang3的包进行判断

     rename相关类名

     代码逻辑再抽象一下

     某些对象进行更细粒度的拆分

6.  本文作者水平有限,如有错误,欢迎指正!

   


http://www.ppmy.cn/devtools/162856.html

相关文章

将VsCode变得顺手好用(1

目录 设置中文 配置调试功能 提效和增强相关插件 主题和图标相关插件 设置中文 打开【拓展】 输入【Chinese】 下载完成后重启Vs即可变为中文 配置调试功能 在随便一个位置新建一个文件夹&#xff0c;用于放置调试文件以及你未来写的代码&#xff0c;随便命名但切记不可用中…

4. MySQL 逻辑架构说明

4. MySQL 逻辑架构说明 文章目录 4. MySQL 逻辑架构说明1. 逻辑架构剖析1.1 服务器处理客户端请求1.2 Connectors(连接器)1.3 第1层&#xff1a;连接层1.4 第2层&#xff1a;服务层1.5 第3层&#xff1a;引擎层1.6 存储层 2. SQL执行流程2.1 MySQL 中的 SQL 执行流程 2.2 MySQL…

STM32-智能台灯项目

一、项目需求 1. 红外传感器检测是否有人&#xff0c;有人的话实时检测距离&#xff0c;过近则报警&#xff1b;同时计时&#xff0c;超过固定时间则报警&#xff1b; 2. 按键 1 切换工作模式&#xff1a;智能模式、按键模式、远程模式&#xff1b; 3. 智能模式下&#xff0c;根…

【Linux】Ubuntu服务器的安装和配置管理

ℹ️大家好&#xff0c;我是练小杰&#xff0c;今天周二了&#xff0c;哪吒的票房已经到了138亿了&#xff0c;饺子导演好样的&#xff01;&#xff01;每个人的成功都不是必然的&#xff0c;坚信自己现在做的事是可以的&#xff01;&#xff01;&#x1f606; 本文是有关Ubunt…

MySql常用指令

原文地址&#xff1a;MySql常用指令 – 无敌牛 欢迎参观我的个人博客&#xff1a;无敌牛 – 技术/著作/典籍/分享等 1 show databases ; 列出所有的数据库名字 2 show tables ; 列出当前库下所有的表名字 3 show create table xxx ; 显示表xxx的建表语句 4 create role…

国内访问Github的四种方法(2025版)

声明&#xff1a;以下内容&#xff0c;仅供学习使用&#xff0c;不得他用。如有他用&#xff0c;与本文作者无关。 国内访问GitHub及下载文件的解决方案整理如下&#xff0c;结合最新技术方案和实测有效方法&#xff1a; 一、网络层解决方案 Hosts文件修改法 通过DNS查询工具…

2.1部署logstash:9600

实验环境&#xff1a;关闭防火墙&#xff0c;完成java环境 yum -y install wget wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm yum localinstall jdk-8u341-linux-x64.rpm -y java -version 1.安装logstash tar xf logstash-6.4.1.tar.gz -C /usr/local…

VMware建立linux虚拟机

本文适用于初学者&#xff0c;帮助初学者学习如何创建虚拟机&#xff0c;了解在创建过程中各个选项的含义。 环境如下&#xff1a; CentOS版本&#xff1a; CentOS 7.9&#xff08;2009&#xff09; 软件&#xff1a; VMware Workstation 17 Pro 17.5.0 build-22583795 1.配…