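This article covers how the executor initializes, how it registers with the scheduler, and how the scheduler side handles the incoming registration.
Contents:
1. Set up the xxl-job database and start the scheduler and the executor demo.
2. Walk through the executor startup flow.
3. Walk through how the scheduler receives heartbeats.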
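1. Create the MySQL database, set the MySQL parameters in the scheduler service, and start the scheduler service
1.1 Copy the contents of tables_xxl_job.sql from /doc/db/ under the project root and run it against MySQL; the script creates the xxl_job database itself.
1.2 Set the MySQL connection parameters in the scheduler's configuration.
1.3 Start the service.
2. Start the executor, located at xxl-job-executor-samples/xxl-job-executor-sample-springboot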
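3. Starting the executor comes down to registering one bean with Spring: XxlJobSpringExecutor. To integrate the xxl-job middleware from scratch, the executor-side service therefore only needs to add the xxl-job-core module to its pom, set the relevant parameters, and register an XxlJobSpringExecutor bean.
The executor startup code lives in xxl-job-core, under the package com.xxl.job.core.executor.impl.
3.1 XxlJobSpringExecutor is the startup class, and it does two things. First, it uses a Spring hook that fires once the singleton beans have been instantiated: it collects the methods annotated with @XxlJob into a Map<Method, XxlJob> annotatedMethods and finally stores them in a ConcurrentHashMap held by XxlJobExecutor, which serves as the in-memory repository. Second, it uses a single thread to loop over the scheduler list and send heartbeats; the first heartbeat doubles as the registration request. The interval is 30s.
XxlJobSpringExecutor code: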
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {

    // other code omitted

    // SmartInitializingSingleton callback: invoked once, after all non-lazy singleton
    // beans have been instantiated. InitializingBean, by contrast, is called back for
    // each bean right after that bean has been initialized.
    @Override
    public void afterSingletonsInstantiated() {
        // xxl-job startup logic

        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            // The shared logic was extracted into XxlJobExecutor.
            // Open question: why is it not called AbstractXxlJobExecutor?
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
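The handler-collection step from 3.1 can be sketched in plain Java without Spring. This is only a minimal sketch, not xxl-job's implementation: `@DemoJob`, `SampleJobs`, and `HandlerRegistrySketch` are hypothetical stand-ins for `@XxlJob`, a user bean, and the repository kept inside `XxlJobExecutor`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for @XxlJob: marks a method as a job handler.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoJob {
    String value();
}

// Hypothetical user bean: one annotated method, one plain method.
class SampleJobs {
    @DemoJob("demoJobHandler")
    public String demo() {
        return "demo executed";
    }

    public String notAJob() {
        return "ignored";
    }
}

// Hypothetical in-memory repository: handler name -> (bean, method).
class HandlerRegistrySketch {

    static final class Handler {
        private final Object target;
        private final Method method;

        Handler(Object target, Method method) {
            this.target = target;
            this.method = method;
            this.method.setAccessible(true);
        }

        Object invoke() {
            try {
                return method.invoke(target);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    private final ConcurrentMap<String, Handler> repository = new ConcurrentHashMap<>();

    // Scan one bean and register every method that carries @DemoJob.
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue; // not a job handler
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException("handler name is empty: " + method);
            }
            if (repository.putIfAbsent(name, new Handler(bean, method)) != null) {
                throw new IllegalStateException("handler name conflicts: " + name);
            }
        }
    }

    Handler load(String name) {
        return repository.get(name);
    }

    int size() {
        return repository.size();
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.scan(new SampleJobs());
        System.out.println(registry.load("demoJobHandler").invoke()); // prints "demo executed"
    }
}
```

Keying the map by handler name is what lets a trigger request, which only carries the name, find the method to invoke.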
3.2 Step into XxlJobExecutor.start():
public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken, timeout);

    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}
3.3 initEmbedServer. Some of the validation in this code would read better if it were extracted into helper methods.
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port > 0 ? port : NetUtil.findAvailablePort(9999);
    ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();

    // generate address
    if (address == null || address.trim().length() == 0) {
        // registry-address: default use address to registry, otherwise use ip:port if address is null
        String ip_port_address = IpUtil.getIpPort(ip, port);
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken == null || accessToken.trim().length() == 0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}
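To illustrate the remark in 3.3, here is one way the repeated blank checks and the address fallback could be pulled into small helpers. It is a sketch under assumptions: `ExecutorAddressSketch`, `isBlank`, `resolvePort`, and `resolveAddress` are hypothetical names, and the fallback port is passed in rather than discovered with `NetUtil.findAvailablePort`.

```java
class ExecutorAddressSketch {

    // Hypothetical helper: true when the string is null or only whitespace.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Use the configured port when it is positive, otherwise the fallback.
    static int resolvePort(int configuredPort, int fallbackPort) {
        return configuredPort > 0 ? configuredPort : fallbackPort;
    }

    // Use the configured address when present, otherwise build http://ip:port/.
    static String resolveAddress(String address, String ip, int port) {
        if (!isBlank(address)) {
            return address;
        }
        return "http://" + ip + ":" + port + "/";
    }

    public static void main(String[] args) {
        int port = resolvePort(0, 9999);
        System.out.println(resolveAddress(null, "10.0.0.5", port)); // prints "http://10.0.0.5:9999/"
    }
}
```

With helpers like these, the body of `initEmbedServer` would shrink to a few assignments, and the blank check would exist in one place only.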
3.4 Step into the core start method of EmbedServer. It does two things:
Starts the Netty server on the executor (client) side.
Starts the logic that sends heartbeats to the scheduler.
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait until stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Throwable e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Throwable e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
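The bizThreadPool above combines a bounded queue with a rejection handler that throws. The sketch below reproduces that shape with tiny sizes (1 thread, queue of 1) so the rejection is easy to trigger; `BoundedPoolSketch` and its method names are hypothetical, and the sizes are chosen for the demo, not taken from xxl-job.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    // Builds a pool shaped like the one in EmbedServer, with sizes passed in.
    static ThreadPoolExecutor newPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                r -> new Thread(r, "sketch-bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RuntimeException("sketch bizThreadPool is EXHAUSTED!");
                });
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Task 1 occupies the only thread, task 2 fills the queue, task 3 is rejected.
    static boolean thirdTaskIsRejected() {
        ThreadPoolExecutor pool = newPool(1, 1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(started);      // task 1 is running, the queue is empty again
            pool.execute(() -> { });    // task 2 waits in the queue
            try {
                pool.execute(() -> { }); // task 3: queue full and pool at max size
                return false;
            } catch (RuntimeException e) {
                return true;            // thrown by the rejection handler
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
    }
}
```

One detail of `ThreadPoolExecutor` worth keeping in mind when reading the real configuration: with a core size of 0 and a bounded queue, the pool starts a single worker and only adds more threads once the queue is full.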
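3.5 startRegistry logic:
3.5.1 ExecutorRegistryThread illustrates a design point: the singleton pattern.
Singletons come in lazy and eager flavors; the variants used most often are the static inner class and eager initialization.
This class uses eager initialization: the instance is created up front when the class is loaded, so there is no thread-safety problem.
private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
public static ExecutorRegistryThread getInstance() {
    return instance;
}
EmbedServer.startRegistry simply delegates to that singleton:
public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}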
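For comparison, the two singleton variants mentioned in 3.5.1 look like this side by side. The class names are hypothetical; only the eager form corresponds to what ExecutorRegistryThread does.

```java
// Eager initialization: the instance is built when the class is initialized.
class EagerSingletonSketch {
    private static final EagerSingletonSketch INSTANCE = new EagerSingletonSketch();

    private EagerSingletonSketch() {
    }

    static EagerSingletonSketch getInstance() {
        return INSTANCE;
    }
}

// Static inner class: the instance is built on the first getInstance() call,
// because Holder is only initialized when it is first referenced.
class HolderSingletonSketch {
    private HolderSingletonSketch() {
    }

    private static final class Holder {
        static final HolderSingletonSketch INSTANCE = new HolderSingletonSketch();
    }

    static HolderSingletonSketch getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(EagerSingletonSketch.getInstance() == EagerSingletonSketch.getInstance());   // prints "true"
        System.out.println(HolderSingletonSketch.getInstance() == HolderSingletonSketch.getInstance()); // prints "true"
    }
}
```

Both rely on JVM class initialization for thread safety, so neither needs explicit locking; the holder form just postpones construction until first use.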
3.6 Thread implementation:
3.6.1 A flag field (toStop) tells the thread when to leave the loop and run the destroy logic.
3.6.2 A heartbeat is sent every 30s in a loop: TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
3.6.3 When several schedulers are configured, the thread walks the list in order and stops at the first one that accepts the heartbeat (note the break); later addresses are only tried when the earlier ones fail.
3.6.4 Note that in the official code a successful registration is only logged at debug level. I changed it to logger.info() here to make debugging easier and to see the outgoing HTTP request directly.
public void start(final String appname, final String address) {

    // valid
    if (appname == null || appname.trim().length() == 0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Throwable e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }
                    }
                } catch (Throwable e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                try {
                    if (!toStop) {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (Throwable e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }

            // registry remove
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Throwable e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }
                    }
                }
            } catch (Throwable e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}
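Stripped of logging, the thread above is a flag-controlled loop: try each scheduler until one succeeds, sleep, repeat, and deregister once the flag flips. The following is a minimal sketch of that control flow only. `AdminSketch`, `RecordingAdmin`, and `HeartbeatSketch` are hypothetical, the interval is a parameter instead of the fixed 30s, and the `stop()` method (set the flag, interrupt, join) is my assumption about how such a thread would typically be shut down.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AdminBiz; true means the call succeeded.
interface AdminSketch {
    boolean registry(String appname, String address);
    boolean registryRemove(String appname, String address);
}

// Hypothetical fake scheduler that only counts calls.
class RecordingAdmin implements AdminSketch {
    final AtomicInteger registryCalls = new AtomicInteger();
    final AtomicInteger removeCalls = new AtomicInteger();
    private final CountDownLatch firstRegistry = new CountDownLatch(1);
    private final boolean succeed;

    RecordingAdmin(boolean succeed) {
        this.succeed = succeed;
    }

    public boolean registry(String appname, String address) {
        registryCalls.incrementAndGet();
        firstRegistry.countDown();
        return succeed;
    }

    public boolean registryRemove(String appname, String address) {
        removeCalls.incrementAndGet();
        return succeed;
    }

    boolean awaitFirstRegistry() {
        try {
            return firstRegistry.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class HeartbeatSketch {
    private volatile boolean toStop = false;
    private Thread thread;

    void start(String appname, String address, List<? extends AdminSketch> admins, long intervalMillis) {
        thread = new Thread(() -> {
            while (!toStop) {
                for (AdminSketch admin : admins) {
                    if (admin.registry(appname, address)) {
                        break; // the first success ends this round
                    }
                }
                try {
                    if (!toStop) {
                        TimeUnit.MILLISECONDS.sleep(intervalMillis);
                    }
                } catch (InterruptedException e) {
                    // woken up by stop(); the loop condition decides what happens next
                }
            }
            // the loop has ended: deregister, again stopping at the first success
            for (AdminSketch admin : admins) {
                if (admin.registryRemove(appname, address)) {
                    break;
                }
            }
        }, "heartbeat-sketch");
        thread.setDaemon(true);
        thread.start();
    }

    void stop() {
        toStop = true;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        RecordingAdmin admin = new RecordingAdmin(true);
        HeartbeatSketch heartbeat = new HeartbeatSketch();
        heartbeat.start("demo-app", "http://127.0.0.1:9999/", Collections.singletonList(admin), 1000L);
        admin.awaitFirstRegistry();
        heartbeat.stop();
        System.out.println("registry=" + admin.registryCalls + ", remove=" + admin.removeCalls);
    }
}
```

The `volatile` flag is what makes the stop request visible to the heartbeat thread, and the interrupt cuts the sleep short so shutdown does not have to wait for a full interval.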
3.7 adminBiz.registry()
This sends an HTTP request to the scheduler. With that, the client-side heartbeat logic is essentially complete.
public class AdminBizClient implements AdminBiz {

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }
}
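To make the shape of that request concrete, the sketch below builds (but does not send) an equivalent POST with the JDK's `java.net.http` API, which requires Java 11 or later. Several things here are assumptions rather than facts from the code above: the header name `XXL-JOB-ACCESS-TOKEN` is my reading of what `XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN` contains, the JSON field names are inferred from the `RegistryParam` getters, the content type is a guess, and the real utility may use a different HTTP client.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class RegistryRequestSketch {

    // Builds a POST to <addressUrl>api/registry carrying a JSON body.
    static HttpRequest build(String addressUrl, String accessToken, int timeoutSeconds, String jsonBody) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(URI.create(addressUrl + "api/registry"))
                .timeout(Duration.ofSeconds(timeoutSeconds))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            // Assumed header name; the scheduler compares it with its own token.
            builder.header("XXL-JOB-ACCESS-TOKEN", accessToken);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        // Field names assumed from RegistryParam's getters.
        String json = "{\"registryGroup\":\"EXECUTOR\","
                + "\"registryKey\":\"demo-app\","
                + "\"registryValue\":\"http://127.0.0.1:9999/\"}";
        HttpRequest request = build("http://127.0.0.1:8080/xxl-job-admin/", "default_token", 3, json);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The trailing slash on the scheduler address matters: the path is appended by plain string concatenation, exactly as in `addressUrl + "api/registry"`.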
4. On the scheduler side, the request arrives at:
xxl-job-admin, com.xxl.job.admin.controller.JobApiController
4.1 Core code:
a. This code would be better written as a switch, or with the strategy pattern.
Better still, split the three uri values into three separate endpoints; as it stands, I don't think it follows the principle of high cohesion and low coupling.
/**
 * api
 *
 * @param uri
 * @param data
 * @return
 */
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {

    // valid
    if (!"POST".equalsIgnoreCase(request.getMethod())) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri == null || uri.trim().length() == 0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (XxlJobAdminConfig.getAdminConfig().getAccessToken() != null
            && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length() > 0
            && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    if ("callback".equals(uri)) {
        List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
        return adminBiz.callback(callbackParamList);
    } else if ("registry".equals(uri)) {
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        return adminBiz.registry(registryParam);
    } else if ("registryRemove".equals(uri)) {
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        return adminBiz.registryRemove(registryParam);
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
    }
}
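Suggestion (a) could look roughly like the following: the if/else chain becomes a lookup in a map of handlers, one per uri. This is a sketch of the idea, not a patch for JobApiController; `ApiDispatchSketch` is hypothetical, and plain strings stand in for `ReturnT` and the parsed parameters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ApiDispatchSketch {

    // One strategy per uri; each takes the raw request body and returns a result.
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    ApiDispatchSketch() {
        handlers.put("callback", data -> "callback handled: " + data);
        handlers.put("registry", data -> "registry handled: " + data);
        handlers.put("registryRemove", data -> "registryRemove handled: " + data);
    }

    String dispatch(String uri, String data) {
        if (uri == null || uri.trim().isEmpty()) {
            return "invalid request, uri-mapping empty.";
        }
        Function<String, String> handler = handlers.get(uri);
        if (handler == null) {
            return "invalid request, uri-mapping(" + uri + ") not found.";
        }
        return handler.apply(data);
    }

    public static void main(String[] args) {
        ApiDispatchSketch api = new ApiDispatchSketch();
        System.out.println(api.dispatch("registry", "{}"));  // prints "registry handled: {}"
        System.out.println(api.dispatch("unknown", "{}"));   // prints "invalid request, uri-mapping(unknown) not found."
    }
}
```

Adding a fourth operation then means adding one map entry, and the validation at the top of the method stays untouched.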
4.2 Step into adminBiz.registry():
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return JobRegistryHelper.getInstance().registry(registryParam);
}
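4.2.2 JobRegistryHelper above is an interesting class. It is not managed by Spring, yet its registry method needs the DAO layer to run the SQL update. So how does it get hold of the DAO?
It calls: XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySaveOrUpdate
As for XxlJobAdminConfig.getAdminConfig, I think it should be called getInstance().
XxlJobAdminConfig is managed by Spring and holds the relevant configuration parameters as well as the DAO objects.
It implements the Spring extension point InitializingBean (a hook) and, once the bean has been instantiated, assigns the bean to its own static singleton field, as shown here: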
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;

    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
}
The same approach can be reused whenever business code has a similar need.
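Without Spring on the classpath, the trick reduces to the sketch below. `AdminConfigSketch` and `RegistryHelperSketch` are hypothetical; `afterPropertiesSet()` is called by hand here, whereas in the real application the container would call it after populating the bean.

```java
// Plays the role of the container-managed config bean.
class AdminConfigSketch {

    private static AdminConfigSketch adminConfig = null;

    private final String accessToken;

    AdminConfigSketch(String accessToken) {
        this.accessToken = accessToken;
    }

    // Stand-in for InitializingBean.afterPropertiesSet(): publish this instance statically.
    void afterPropertiesSet() {
        adminConfig = this;
    }

    static AdminConfigSketch getAdminConfig() {
        return adminConfig;
    }

    String getAccessToken() {
        return accessToken;
    }
}

// Plays the role of a helper that the container does not manage.
class RegistryHelperSketch {

    static String currentToken() {
        AdminConfigSketch config = AdminConfigSketch.getAdminConfig();
        if (config == null) {
            // Reached only if the helper runs before the bean has been initialized.
            throw new IllegalStateException("admin config is not initialized yet");
        }
        return config.getAccessToken();
    }

    public static void main(String[] args) {
        new AdminConfigSketch("default_token").afterPropertiesSet();
        System.out.println(currentToken()); // prints "default_token"
    }
}
```

The price of this pattern is an ordering constraint: anything that reads the static field must run after the bean has been initialized, which is why the sketch guards against null.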
4.3 The registry method:
public ReturnT<String> registry(RegistryParam registryParam) {

    // valid
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // 0-fail; 1-save suc; 2-update suc;
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySaveOrUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret == 1) {
                // fresh (add)
                freshGroupRegistryInfo(registryParam);
            }
            /*int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh
                freshGroupRegistryInfo(registryParam);
            }*/
        }
    });

    return ReturnT.SUCCESS;
}
4.4 Core code:
Register the executor in MySQL, or refresh its heartbeat time if it is already registered.
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
        .registrySaveOrUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
SQL:
<insert id="registrySaveOrUpdate" >
    INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)
    VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, #{updateTime})
    ON DUPLICATE KEY UPDATE
        `update_time` = #{updateTime}
</insert>
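The return values 1 and 2 in the code comment match how MySQL reports affected rows for INSERT ... ON DUPLICATE KEY UPDATE: 1 for a fresh insert, 2 when an existing row is updated. The in-memory sketch below mimics that contract. It assumes the table has a unique key over (registry_group, registry_key, registry_value), which the statement needs in order to detect duplicates; `RegistryTableSketch` and its methods are hypothetical.

```java
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistryTableSketch {

    // The map key plays the role of the assumed unique index (group, key, value).
    private final Map<String, Date> rows = new ConcurrentHashMap<>();

    private static String uniqueKey(String group, String key, String value) {
        return group + "|" + key + "|" + value;
    }

    // Returns 1 when a new row is inserted and 2 when an existing row is refreshed.
    // (MySQL additionally reports 0 when the update leaves the row unchanged.)
    int registrySaveOrUpdate(String group, String key, String value, Date updateTime) {
        Date previous = rows.put(uniqueKey(group, key, value), updateTime);
        return previous == null ? 1 : 2;
    }

    Date updateTimeOf(String group, String key, String value) {
        return rows.get(uniqueKey(group, key, value));
    }

    public static void main(String[] args) {
        RegistryTableSketch table = new RegistryTableSketch();
        System.out.println(table.registrySaveOrUpdate("EXECUTOR", "demo-app", "http://127.0.0.1:9999/", new Date())); // prints "1"
        System.out.println(table.registrySaveOrUpdate("EXECUTOR", "demo-app", "http://127.0.0.1:9999/", new Date())); // prints "2"
    }
}
```

This is why the registry method only refreshes the group information when ret == 1: a return of 2 is just a heartbeat from an executor the scheduler already knows about.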
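5. Summary:
The above covers the executor's sending logic and the scheduler's receiving endpoints. I think some of these methods could be polished further, for example:
Wrap the null/empty checks on collections and strings in an xxUtil class, or use Apache commons-lang3 for them.
Rename some of the classes.
Abstract the code logic a bit more.
Split some objects at a finer granularity.
6. The author's knowledge is limited; if you spot any mistakes, corrections are welcome!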