本人阅读了 Skywalking 的大部分核心代码,也了解了相关的文献,对此深有感悟,特此借助巨人的思想自己手动用JAVA语言实现了一个 “调用链监控APM” 系统。本书采用边讲解实现原理边编写代码的方式,看本书时一定要跟着敲代码。
作者已经将过程写成一部书籍,奈何没有钱发表,如果您知道渠道可以联系本人。一定重谢。
本书涉及到的核心技术与思想
JavaAgent , ByteBuddy,SPI服务,类加载器的命名空间,增强JDK类,kafka,插件思想,切面,链路栈等等。实际上远不止这么多,差不多贯通了整个java体系。
适用人群
自己公司要实现自己的调用链的;写架构的;深入java编程的;阅读Skywalking源码的;
版权
本书是作者呕心沥血亲自编写的代码,不经同意切勿拿出去商用,否则会追究其责任。
原版PDF+源码请见:
本章涉及到的工具类也在这里面:
PDF书籍《手写调用链监控APM系统-Java版》第1章 开篇介绍-CSDN博客
第11章 插件与链路的结合:HttpClient插件实现跨进程传输TraceSegment
之所以要讲这个插件就是因为http调用涉及到跨进程传输,会有新的TraceSegment生成,需要把当前TraceSegment信息设置到Http请求头里面进行跨进程传输到后一个TraceSegment上面。
要想对httpclient的接口请求进行拦截,需要拦截的信息如下:
类名:org.apache.commons.httpclient.HttpClient
方法:executeMethod
非JDK类库
下面来开发这个插件,要记住前面说的插件开发四部曲。在插件模块下新增http-client-plugin项目,hadluo-apm-plugin.def内容如下:
http-client=com.hadluo.apm.httpclient.HttpClientInstrumentation
HttpClientInstrumentation代码如下:
public class HttpClientInstrumentation extends AbstractClassEnhancePluginDefine {@Overridepublic String enhanceClass() {// 拦截类return "org.apache.commons.httpclient.HttpClient";}@Overridepublic MethodsInterceptPoint[] configMethodsInterceptPoint() {return new MethodsInterceptPoint[]{new MethodsInterceptPoint() {@Overridepublic ElementMatcher<MethodDescription> getMethodsMatcher() {// 拦截方法return ElementMatchers.named("executeMethod").and(ElementMatchers.takesArguments(HttpMethod.class));}@Overridepublic String getMethodsInterceptor() {// 拦截处理逻辑return "com.hadluo.apm.httpclient.HttpClientInterceptor";}@Overridepublic boolean isOverrideArgs() {return false;}}};}
}
HttpClientInterceptor代码如下:
public class HttpClientInterceptor implements InstanceMethodsAroundInterceptor {@Overridepublic void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes) throws Throwable {HttpMethod httpMethod = (HttpMethod) allArguments[0];final String remotePeer = httpMethod.getURI().getHost() + ":" + httpMethod.getURI().getPort();ContextCarrier contextCarrier = new ContextCarrier();TraceContextManager service = ServiceManager.INSTANCE.getService(TraceContextManager.class);// 创建span , 并且把当前segment的信息 设置到 contextCarrier里面AbstractSpan exitSpan = service.createExitSpan(httpMethod.getURI().getPath(), remotePeer, contextCarrier);// 设置标签等信息exitSpan.setComponent("HttpClient");exitSpan.setLayer(SpanLayer.HTTP) ;exitSpan.setTag("url" , httpMethod.getURI().toString()) ;exitSpan.setTag("method" , httpMethod.getName()) ;// 将contextCarrier 信息设置到 http请求头里面, 传递到下一个 跨进程的segment上Map<String, String> serialize = contextCarrier.serialize();for(String key : serialize.keySet()){httpMethod.setRequestHeader(key, serialize.get(key));}}@Overridepublic Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {TraceContextManager service = ServiceManager.INSTANCE.getService(TraceContextManager.class);service.stopSpan();return ret;}@Overridepublic void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {TraceContextManager service = ServiceManager.INSTANCE.getService(TraceContextManager.class);service.activeSpan().log(t) ;}
}
我们首先构造了一个空的ContextCarrier , 然后传递到createExitSpan方法里面,这个方法我们之前只实现了两个参数的,我们需要增加这个重载方法的实现。在TraceContextManager增加方法:
public AbstractSpan createExitSpan(String operationName, String remotePeer , ContextCarrier contextCarrier){AbstraceTraceContext context = getOrCreate(true) ;AbstractSpan exitSpan = context.createExitSpan(operationName, remotePeer);context.inject(contextCarrier);return exitSpan ;
}
通过inject方法,我们将context上的segment信息设置到了空的ContextCarrier 上面, inject代码如下:
@Override
public void inject(ContextCarrier carrier) {carrier.setTraceId(traceSegment.getTraceId());carrier.setTraceSegmentId(traceSegment.getTraceSegmentId());carrier.setSpanId(acviveSpan().getSpanId());carrier.setParentServiceInstance(Config.Agent.serviceInstance);carrier.setParentServiceName(Config.Agent.serviceName);
}
经过这个设置,我们的ContextCarrier 里面就有当前的segment信息了,然后回到插件的beforeMethod, 下面将ContextCarrier 的信息序列化到map里面,serialize也是新增的,代码如下:
public Map<String, String> serialize(){Map<String, String> param = new HashMap<String, String>();for (Field f : this.getClass().getDeclaredFields()) {if(Modifier.isStatic(f.getModifiers())){continue;}f.setAccessible(true);try {param.put(SW_FLAG + f.getName() , f.get(this).toString()) ;} catch (IllegalAccessException e) {Logs.err(getClass(), "ContextCarrier serialize错误, field: " + f.getName(), e);}}return param ;
}
通过这个serialize序列化后,就把信息转存到了map里面,最后通过遍历Map,httpMethod.setRequestHeader将每个值设置到了请求头里面传递到下一个进程。
由于代码过于简单,作者就不在测试此插件,读者也应该能自己测试了,有问题的可以向作者提出,万分感谢!