背景:
xxl-job版本:2.0.2
xxl-rpc-core版本:1.4.0
springboot版本:1.5.20.RELEASE
Kingbase版本:V8R6
针对业务上的需求,做一些个性化扩展:
1、启用accessToken,并且使用SM2加密;
2、(可选)xxl-job-admin应用兼容人大金仓Kingbase数据库
3、(可选)springboot版本升级到2.7.18;
4、(可选)删掉xxl-job-admin应用中的一些非核心业务代码;
1 xxl-job-admin应用中的springboot版本升级
1.1 quartz.properties
# ### 修改1
# org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
1.2 application.properties
# ### 修改1
# server.context-path=/xxl-job-admin
server.servlet.context-path=/xxl-job-admin
# ### 修改2
# spring.resources.static-locations=classpath:/static/
spring.web.resources.static-locations=classpath:/static/
# ### 修改3
#spring.datasource.type=org.apache.tomcat.jdbc.pool.DataSource
#spring.datasource.tomcat.max-wait=10000
#spring.datasource.tomcat.max-active=30
#spring.datasource.tomcat.test-on-borrow=true
#spring.datasource.tomcat.validation-interval=30000
spring.datasource.hikari.pool-name=xxl-pool
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.max-lifetime=60000
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.connection-test-query=SELECT 1
# ### 修改4,引入jasypt
jasypt.encryptor.algorithm=PBEWithMD5AndDES
jasypt.encryptor.password=G0CvDz7oJn6
# jasypt.encryptor.property.prefix=ENC(
# jasypt.encryptor.property.suffix=)
# jasypt.encryptor.keyObtentionIterations=1000
# jasypt.encryptor.poolSize=1
# jasypt.encryptor.saltGeneratorClassname=org.jasypt.salt.RandomSaltGenerator
# jasypt.encryptor.ivGeneratorClassname=org.jasypt.iv.RandomIvGenerator
# jasypt.encryptor.stringOutputType=base64
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
1.3 XxlJobRegistryMapper.xml
注意:
#{timeout}原来是传参,由于适配金仓数据库SQL,目前没有找到合适的实现方式,硬编码为90
<!--
  Deletes registry rows whose update_time is older than the dead-timeout.
  The timeout is hard-coded to 90 seconds for the Kingbase adaptation; the
  #{timeout} parameter is still declared but intentionally left unused
  (kept inside an XML comment so the original intent stays visible).
-->
<delete id="removeDead" parameterType="java.lang.Integer" >
    DELETE FROM XXL_JOB_QRTZ_TRIGGER_REGISTRY
    <!-- superseded clause: WHERE update_time < (sysdate - #{timeout}*1/(24*60*60)) -->
    WHERE update_time <![CDATA[ < ]]> (sysdate - interval <!-- #{timeout} -->'90' SECOND)
</delete>

<!--
  Selects registry rows whose update_time is newer than the dead-timeout.
  Uses the same hard-coded 90 second window as removeDead.
-->
<select id="findAll" parameterType="java.lang.Integer" resultMap="XxlJobRegistry">
    SELECT <include refid="Base_Column_List" />
    FROM XXL_JOB_QRTZ_TRIGGER_REGISTRY t
    <!-- superseded clause: WHERE t.update_time > (sysdate - #{timeout}*1/(24*60*60)) -->
    WHERE t.update_time <![CDATA[ > ]]> (sysdate - interval <!-- #{timeout} -->'90' SECOND)
</select>
2 使用SM2加密动态accessToken
2.1 xxl-rpc-core
2.1.1 新增依赖
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency><dependency><groupId>org.bouncycastle</groupId><artifactId>bcprov-jdk15on</artifactId><version>1.70</version></dependency>
2.1.2 新增一个配置类XxlJobAdminConfigCopy
package com.xxl.rpc.config;

import cn.hutool.core.codec.Base64;
import cn.hutool.crypto.SmUtil;
import cn.hutool.crypto.asymmetric.KeyType;
import cn.hutool.crypto.asymmetric.SM2;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.time.LocalDate;

/**
 * xxl-job config.
 *
 * <p>Holds the configured {@code xxl.job.accessToken} and exposes it as a
 * date-bound, SM2-encrypted value. The bean publishes itself through a static
 * reference once Spring has populated its properties, so non-Spring code can
 * reach it via {@link #getAdminConfig()}.
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobAdminConfigCopy implements InitializingBean {

    /** Set in {@link #afterPropertiesSet()}; {@code null} until then. */
    private static XxlJobAdminConfigCopy adminConfig = null;

    /**
     * @return the bean instance registered by Spring, or {@code null} if
     *         {@link #afterPropertiesSet()} has not run yet
     */
    public static XxlJobAdminConfigCopy getAdminConfig() {
        return adminConfig;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
    }

    // The empty default keeps applications that do not define the property
    // from failing at startup with an unresolved-placeholder error.
    @Value("${xxl.job.accessToken:}")
    private String accessToken;

    /**
     * Returns the access token in its dynamic form: the configured token with
     * the current date appended, SM2-encrypted with the public key and
     * BCD-encoded.
     *
     * <p>When no token is configured (null or blank) the raw value is returned
     * unchanged, so callers that test for an empty token to decide whether
     * validation is enabled keep working.
     *
     * @return the encrypted token, or the blank/null configured value
     */
    public String getAccessToken() {
        if (accessToken == null || accessToken.trim().isEmpty()) {
            return accessToken;
        }

        // NOTE(review): the two literals below are placeholders carried over
        // from the original snippet; presumably they are replaced by the real
        // Base64-encoded SM2 key pair before deployment.
        String privateKey = "私钥";
        String publicKey = "公钥";
        SM2 sm2 = SmUtil.sm2(Base64.decode(privateKey), Base64.decode(publicKey));

        // Appending today's date makes the plaintext change daily.
        return sm2.encryptBcd(accessToken + LocalDate.now(), KeyType.PublicKey);
    }
}
xxl-rpc-core\src\main\resources\META-INF\spring.factories
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xxl.rpc.config.XxlJobAdminConfigCopy
2.1.3 修改XxlRpcProviderFactory.java
com.xxl.rpc.remoting.provider.XxlRpcProviderFactory.java
java">/*** invoke service** @param xxlRpcRequest* @return*/public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {// make responseXxlRpcResponse xxlRpcResponse = new XxlRpcResponse();xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());// match service beanString serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());Object serviceBean = serviceData.get(serviceKey);// validif (serviceBean == null) {xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");return xxlRpcResponse;}if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");return xxlRpcResponse;}String accessTokenCopy = XxlJobAdminConfigCopy.getAdminConfig().getAccessToken();if (accessTokenCopy!=null && !accessTokenCopy.trim().isEmpty()) {String privateKey = "私钥";String publicKey = "公钥";SM2 sm2 = SmUtil.sm2(Base64.decode(privateKey), Base64.decode(publicKey));String clientAccessToken = StrUtil.utf8Str(sm2.decryptFromBcd(xxlRpcRequest.getAccessToken(), KeyType.PrivateKey));String accessTokenDecrypt = StrUtil.utf8Str(sm2.decryptFromBcd(accessTokenCopy.trim(), KeyType.PrivateKey));if (!accessTokenDecrypt.equals(clientAccessToken)) {xxlRpcResponse.setErrorMsg("The access token is wrong.");// logger.error("The access token[{}] is wrong.accessToken【{}】clientAccessToken【{}】", xxlRpcRequest.getAccessToken(), accessTokenDecrypt, clientAccessToken);return xxlRpcResponse;}}try {// invokeClass<?> serviceClass = serviceBean.getClass();String methodName = xxlRpcRequest.getMethodName();Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();Object[] parameters = xxlRpcRequest.getParameters();Method method = serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);Object result = method.invoke(serviceBean, parameters);/*FastClass serviceFastClass = FastClass.create(serviceClass);FastMethod serviceFastMethod 
= serviceFastClass.getMethod(methodName, parameterTypes);Object result = serviceFastMethod.invoke(serviceBean, parameters);*/xxlRpcResponse.setResult(result);} catch (Throwable t) {// catch errorlogger.error("xxl-rpc provider invokeService error.", t);xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));}return xxlRpcResponse;}
2.1.4 修改XxlRpcReferenceBean
com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean
java"> public Object getObject() {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { iface },new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// method paramString className = method.getDeclaringClass().getName(); // iface.getName()String varsion_ = version;String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();Object[] parameters = args;// filter for genericif (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {Class<?>[] paramTypes = null;if (args[3]!=null) {String[] paramTypes_str = (String[]) args[3];if (paramTypes_str.length > 0) {paramTypes = new Class[paramTypes_str.length];for (int i = 0; i < paramTypes_str.length; i++) {paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);}}}className = (String) args[0];varsion_ = (String) args[1];methodName = (String) args[2];parameterTypes = paramTypes;parameters = (Object[]) args[4];}// filter method like "Object.toString()"if (className.equals(Object.class.getName())) {logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);throw new XxlRpcException("xxl-rpc proxy class-method not support");}// addressString finalAddress = address;if (finalAddress==null || finalAddress.trim().length()==0) {if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {// discoveryString serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);// load balanceif (addressSet==null || addressSet.size()==0) {// pass} else if (addressSet.size()==1) {finalAddress = addressSet.first();} else {finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);}}}if (finalAddress==null || finalAddress.trim().length()==0) {throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address 
empty");}// requestXxlRpcRequest xxlRpcRequest = new XxlRpcRequest();xxlRpcRequest.setRequestId(UUID.randomUUID().toString());xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());// 将AccessToken搞成动态,使用SM2加密// 加上当前日期String privateKey = "私钥";String publicKey = "公钥";SM2 sm2 = SmUtil.sm2(Base64.decode(privateKey), Base64.decode(publicKey));xxlRpcRequest.setAccessToken(sm2.encryptBcd(accessToken + LocalDate.now(), KeyType.PublicKey));xxlRpcRequest.setClassName(className);xxlRpcRequest.setMethodName(methodName);xxlRpcRequest.setParameterTypes(parameterTypes);xxlRpcRequest.setParameters(parameters);// sendif (CallType.SYNC == callType) {// future-response setXxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);try {// do invokeclient.asyncSend(finalAddress, xxlRpcRequest);// future getXxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);if (xxlRpcResponse.getErrorMsg() != null) {throw new XxlRpcException(xxlRpcResponse.getErrorMsg());}return xxlRpcResponse.getResult();} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);} finally{// future-response removefutureResponse.removeInvokerFuture();}} else if (CallType.FUTURE == callType) {// future-response setXxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);try {// invoke future setXxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);XxlRpcInvokeFuture.setFuture(invokeFuture);// do invokeclient.asyncSend(finalAddress, xxlRpcRequest);return null;} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);// future-response removefutureResponse.removeInvokerFuture();throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);}} else if (CallType.CALLBACK == callType) 
{// get callbackXxlRpcInvokeCallback finalInvokeCallback = invokeCallback;XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();if (threadInvokeCallback != null) {finalInvokeCallback = threadInvokeCallback;}if (finalInvokeCallback == null) {throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");}// future-response setXxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);try {client.asyncSend(finalAddress, xxlRpcRequest);} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);// future-response removefutureResponse.removeInvokerFuture();throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);}return null;} else if (CallType.ONEWAY == callType) {client.asyncSend(finalAddress, xxlRpcRequest);return null;} else {throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");}}});}
跟accessToken相关的代码:
com.xxl.job.admin.core.conf.XxlJobAdminConfig.getAccessToken()