本人阅读了 Skywalking 的大部分核心代码,也了解了相关的文献,对此深有感悟,特此借助巨人的思想自己手动用JAVA语言实现了一个 “调用链监控APM” 系统。本书采用边讲解实现原理边编写代码的方式,看本书时一定要跟着敲代码。
作者已经将过程写成一部书籍,奈何没有钱发表,如果您知道渠道可以联系本人。一定重谢。
本书涉及到的核心技术与思想
JavaAgent , ByteBuddy,SPI服务,类加载器的命名空间,增强JDK类,kafka,插件思想,切面,链路栈等等。实际上远不止这么多,差不多贯通了整个java体系。
适用人群
自己公司要实现自己的调用链的;写架构的;深入java编程的;阅读Skywalking源码的;
版权
本书是作者呕心沥血亲自编写的代码,不经同意切勿拿出去商用,否则会追究其责任。
原版PDF+源码请见:
本章涉及到的工具类也在下面:
PDF书籍《手写调用链监控APM系统-Java版》第1章 开篇介绍-CSDN博客
第3章 SPI服务模块化系统
大名鼎鼎的调用链监控Skywalking将很多功能抽象成一个一个的Java SPI服务(BootService),每个服务都有生命周期方法,prepare,boot,shutdown等。这些服务还有排序值属性,用来指定哪个服务最先执行生命周期prepare,boot,shutdown。
这些服务都交由ServiceManager进行管理,用这种SPI服务模式进行了高度解耦和统一管理,着实是一种好的设计理念。
比如JVM信息收集,链路数据收集,链路数据的kafka发送到后端,都可以抽象成BootService服务,显而易见kafka发送服务是要最先初始化的,所以排序值也起了作用。
Java SPI服务
Java SPI(Service Provider Interface)是Java官方提供的一种服务发现机制,它允许在运行时动态地加载实现特定接口的类,而不需要在代码中显式地指定该类,从而实现解耦和灵活性。用法就是在/META-INF/services建立接口文件,里面内容为具体的实现。
3.1 SPI服务架构的建立
首先我们需要一个顶层SPI接口,这个接口我们在apm-commons项目中新建类:
com.hadluo.apm.commons.trace.BootService
public interface BootService {// 生命周期方法void prepare() throws Throwable;// 生命周期方法void boot() throws Throwable;// 生命周期方法void onComplete() throws Throwable;// 生命周期方法void shutdown() throws Throwable;/*** 值越大,越先执行* @return*/default int priority() {return 0;}
}
生命周期设计是从上到下依次执行,排序值默认都是0,有具体服务组件重写的,值越大越先执行。
然后需要一个服务管理器来管理所有服务,在apm-commons项目下新建类:
com.hadluo.apm.commons.trace.ServiceManager
public enum ServiceManager {// 枚举单例INSTANCE;// 存储 所有的 SPI 服务容器private Map<Class<?>, BootService> services = new HashMap<>();public void boot() {// 加载所有的服务this.services = loadAllServiceList();// 调用生命周期this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.prepare();} catch (Throwable e) {Logs.err(ServiceManager.class , "prepare error", e);}});this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.boot();} catch (Throwable e) {Logs.err(ServiceManager.class , "boot error", e);}});this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.onComplete();} catch (Throwable e) {Logs.err(ServiceManager.class , "onComplete error", e);}});}public void shutdown() {this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.shutdown();} catch (Throwable e) {Logs.err(ServiceManager.class , "shutdown error", e);}});}public <T> T getService(Class<T> clazz) {return (T) this.services.get(clazz);}private Map<Class<?>, BootService> loadAllServiceList() {Map<Class<?>, BootService> services = new HashMap<>();// SPI 加载ServiceLoader<BootService> load = ServiceLoader.load(BootService.class);for (BootService service : load) {services.put(service.getClass(), service);}return services;}
}
ServiceManager 采用枚举单例写法,参考 Effective Java 书中设计。
boot方法为初始化方法,在里面首先通过 loadAllServiceList 加载了所有服务,然后排依次调用其生命周期方法。shutdown为对外提供的方法,是注册jvm退出钩子时主动要调用的。
loadAllServiceList方法就是通过SPI技术从/META-INF/services路径下找到服务接口声明文件,从而得到了配置的服务。
在apm-agent-core项目下的resource下面新建/META-INF/services 目录,然后在新建接口服务文件:
文件名称就是BootService的类全名称,内容我们暂时先定义一个JVMService服务,下面会讲解到很多服务组件。
3.2 JVMService收集内存,CPU,线程信息
这个JVMService服务组件是用来收集当前微服务的jvm内存,CPU等系统资源信息,然后发送到后端监控展示。下面我们来实现这个服务。
在apm-agent-core项目下新建类:
com.hadluo.apm.agentcore.service.jvm.JVMService
public class JVMService implements BootService {// 定时器private ScheduledExecutorService executorService ;// kafka 发送到后端的 kafka发送服务private KafkaProducerManager kafkaProducerManager;@Overridepublic void prepare() throws Throwable {this.kafkaProducerManager = ServiceManager.INSTANCE.getService(KafkaProducerManager.class) ;}@Overridepublic void boot() throws Throwable {}@Overridepublic void onComplete() throws Throwable {}@Overridepublic void shutdown() throws Throwable {// 关闭定时器executorService.shutdown();}
}
由于是服务组件,所以必须实现BootService接口,还有生命周期方法。
调用链系统采集到的CPU,内存等信息都需要是近实时更新的,所以需要一个定时器,每隔N秒去采集系统信息然后上报到后端。
ScheduledExecutorService 为线程池实现的定时服务,在boot中,我们可以进行初始化,boot新增代码:
@Override
public void boot() throws Throwable {executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("JVMService-Collect"));executorService.scheduleAtFixedRate(()->{// 采集jvm信息JVMMetric metric = collectJVM() ;// 发送到kafkakafkaProducerManager.send(metric);} ,10, Integer.parseInt(Config.Agent.jvmCollectIntervalSecond), TimeUnit.SECONDS );
}
为了便于查找问题,有一个好习惯就是开的线程自定定义一个名字,上面的NamedThreadFactory 就是为了给定时器线程取名的存在,在apm-commons项目下新建类:
com.hadluo.apm.commons.trace.NamedThreadFactory
public class NamedThreadFactory implements ThreadFactory {private final String name;private final AtomicInteger threadNumber = new AtomicInteger(1);public NamedThreadFactory(String name) {this.name = name;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, name + "-" + threadNumber.getAndIncrement());t.setDaemon(true);return t;}
}
定时器10秒后启动,以后每隔配置Config.JVM.jvmCollectIntervalSecond 秒去执行 collectJVM 收集系统资源信息返回到JVMMetric实体对象里面,然后通过kafka发送服务发送到kafka broker让后端进行消费。
配置系统之前就讲过,不在说明,自己在Config.JVM中添加jvmCollectIntervalSecond。
collectJVM 方法为真实采集当前机器和jvm信息。代码如下:
private JVMMetric collectJVM() {JVMMetric metric = new JVMMetric();// 设置基础信息metric.setServiceInstance(Config.Agent.serviceInstance);metric.setServiceName(Config.Agent.serviceName);metric.setMsgTypeClass(JVMMetric.class.getName());//设置 cpu信息metric.setCpu(CPUProvider.INSTANCE.getCPUMetrics());// 设置堆内存信息metric.setHeapMemory(MemoryProvider.INSTANCE.getHeapMemoryMetrics());// 设置非堆内存信息metric.setNonHeapMemory(MemoryProvider.INSTANCE.getNonHeapMemoryMetrics());// 线程信息metric.setThreadInfo(ThreadProvider.INSTANCE.getThreadMetrics());return metric ;
}
这里每个系统指标都是通过XxxProvider进行收集,最后存放到一个kafka能发送的实体bean对象里面。这个对象应该是公共的,后端OAP也要用,于是我们要放到apm-commoms模块里面。
在apm-commoms里面新建类:
com.hadluo.apm.commons.kafka.JVMMetric
@Data // lombok注解,自动生成get set
public class JVMMetric extends BaseMsg {// cpu信息private CPU cpu;// 堆内存private Memory HeapMemory;// 非堆内存private Memory nonHeapMemory;// 线程private ThreadInfo threadInfo;
}
每个发送到kafka的bean对象都有一个公共的基类BaseMsg ,在apm-commoms里面新建类:
com.hadluo.apm.commons.kafka.BaseMsg
@Data
public class BaseMsg {// 消息类型的classprivate String msgTypeClass;// 采样时间final long sampleTime = System.currentTimeMillis();// agent的服务名称private String serviceName ;private String serviceInstance ;
}
也就是说,每次上报给后端OAP的数据都包含这4个公共属性。
CPU实体代码,在apm-commoms里面新建类:
com.hadluo.apm.commons.kafka.CPU
@Data
@Builder // lombok注解,提供构建器模式的初始化对象
public class CPU{// cpu核心数long logicalProcessorCount;//cpu系统使用率String sysCpu;//cpu用户使用率String userCpu;//cpu当前等待率String iowaitCpu;//cpu当前空闲率String idleCpu;
}
Memory实体代码,在apm-commoms里面新建类:
com.hadluo.apm.commons.kafka.Memory
@Builder
@Data
public class Memory {
// 初始内存private final long init;
// 已使用内存private final long used;
//提交内存private final long committed;
// 最大private final long max;
}
ThreadInfoy实体代码,在apm-commoms里面新建类:
com.hadluo.apm.commons.kafka.ThreadInfo
@Data
@Builder
public class ThreadInfo {//当前活动线程数,包括守护线程和非守护线程int threadCount;//守护线程数int daemonThreadCount;// 自Java虚拟机启动以来创建和启动的线程总数。long totalStartedThreadCount;// jvm 启动以来 的活动线程数的峰值int peakThreadCount;// runable状态的线程数int runnableStateThreadCount;// block状态的线程数int blockedStateThreadCount ;// waiting状态的线程数int waitingStateThreadCount;// time wait 状态的线程数int timedWaitingStateThreadCount;
}
到此我们实体已经全部建立完成,下面介绍各个Provider的具体实现。
CPUProvider获取cpu信息
这里需要借助github开源的oshi工具,POM文件已经在前面建立框架时依赖进去了,直接在apm-agent-core项目下新建类:
com.hadluo.apm.agentcore.service.jvm.CPUProvider
public enum CPUProvider {INSTANCE ;private final SystemInfo systemInfo = new SystemInfo();public CPU getCPUMetrics() {// 使用 oshi 工具 获取 cpu信息CentralProcessor processor = systemInfo.getHardware().getProcessor();//cpulong[] prevTicks = processor.getSystemCpuLoadTicks();long[] ticks = processor.getSystemCpuLoadTicks();long nice = ticks[CentralProcessor.TickType.NICE.getIndex()]- prevTicks[CentralProcessor.TickType.NICE.getIndex()];long irq = ticks[CentralProcessor.TickType.IRQ.getIndex()]- prevTicks[CentralProcessor.TickType.IRQ.getIndex()];long softirq = ticks[CentralProcessor.TickType.SOFTIRQ.getIndex()]- prevTicks[CentralProcessor.TickType.SOFTIRQ.getIndex()];long steal = ticks[CentralProcessor.TickType.STEAL.getIndex()]- prevTicks[CentralProcessor.TickType.STEAL.getIndex()];long cSys = ticks[CentralProcessor.TickType.SYSTEM.getIndex()]- prevTicks[CentralProcessor.TickType.SYSTEM.getIndex()];long user = ticks[CentralProcessor.TickType.USER.getIndex()]- prevTicks[CentralProcessor.TickType.USER.getIndex()];long iowait = ticks[CentralProcessor.TickType.IOWAIT.getIndex()]- prevTicks[CentralProcessor.TickType.IOWAIT.getIndex()];long idle = ticks[CentralProcessor.TickType.IDLE.getIndex()]- prevTicks[CentralProcessor.TickType.IDLE.getIndex()];long totalCpu = user + nice + cSys + idle + iowait + irq + softirq + steal;// cpu核心数long logicalProcessorCount = processor.getLogicalProcessorCount();//cpu系统使用率String sysCpu = new DecimalFormat("#.##%").format(cSys * 1.0 / totalCpu);//cpu用户使用率String userCpu = new DecimalFormat("#.##%").format(user * 1.0 / totalCpu);//cpu当前等待率String iowaitCpu = new DecimalFormat("#.##%").format(iowait * 1.0 / totalCpu);//cpu当前空闲率String idleCpu = new DecimalFormat("#.##%").format(idle * 1.0 / totalCpu);return CPU.builder().logicalProcessorCount(logicalProcessorCount).idleCpu(idleCpu).sysCpu(sysCpu).userCpu(userCpu).iowaitCpu(iowaitCpu).build();}
}
逻辑我就不讲了,毕竟这就是一个工具类而已。
MemoryProvider获取JVM内存信息
这里需要借助java.lang提供的ManagementFactory工具获取JVM的堆内存和非堆内存。
在apm-agent-core项目下新建类:
com.hadluo.apm.agentcore.service.jvm.MemoryProvider
public enum MemoryProvider {INSTANCE;private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();public Memory getNonHeapMemoryMetrics(){MemoryUsage nonHeapMemoryUsage = memoryMXBean.getNonHeapMemoryUsage();return Memory.builder().init(nonHeapMemoryUsage.getInit()).used(nonHeapMemoryUsage.getUsed()).committed(nonHeapMemoryUsage.getCommitted()).max(nonHeapMemoryUsage.getMax()).build();}public Memory getHeapMemoryMetrics() {MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();return Memory.builder().init(heapMemoryUsage.getInit()).used(heapMemoryUsage.getUsed()).committed(heapMemoryUsage.getCommitted()).max(heapMemoryUsage.getMax()).build();}
}
ThreadProvider获取线程信息
也需要借助java.lang提供的ManagementFactory工具获取JVM线程信息。
在apm-agent-core项目下新建类:
com.hadluo.apm.agentcore.service.jvm.ThreadProvider
public enum ThreadProvider {INSTANCE;private final ThreadMXBean threadMXBean;ThreadProvider() {this.threadMXBean = ManagementFactory.getThreadMXBean();}public ThreadInfo getThreadMetrics() {int runnableStateThreadCount = 0;int blockedStateThreadCount = 0;int waitingStateThreadCount = 0;int timedWaitingStateThreadCount = 0;java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 0);if (threadInfos != null) {for (java.lang.management.ThreadInfo threadInfo : threadInfos) {if (threadInfo == null) {continue;}switch (threadInfo.getThreadState()) {case RUNNABLE:runnableStateThreadCount++;break;case BLOCKED:blockedStateThreadCount++;break;case WAITING:waitingStateThreadCount++;break;case TIMED_WAITING:timedWaitingStateThreadCount++;break;default:break;}}}return ThreadInfo.builder().threadCount(threadMXBean.getThreadCount()).daemonThreadCount(threadMXBean.getDaemonThreadCount()).peakThreadCount(threadMXBean.getPeakThreadCount()).runnableStateThreadCount(runnableStateThreadCount).blockedStateThreadCount(blockedStateThreadCount).waitingStateThreadCount(waitingStateThreadCount).timedWaitingStateThreadCount(timedWaitingStateThreadCount).totalStartedThreadCount(threadMXBean.getTotalStartedThreadCount()).build();}
}
3.3 JVMService 服务的测试
为了保证代码的质量,这里需要进行测试,我们把服务的加载放到premain里面,也就是接着配置信息加载的后面,在premain方法继续添加代码:
//2. 初始化服务
try {ServiceManager.INSTANCE.boot();
} catch (Exception e) {Logs.err(AgentMain.class, "AgentMain启动失败,服务初始化失败", e);return;
}
到这里我们还不能测试,我们还要开发KafkaProducerManager服务,所以我们要注释掉JVMService里面的KafkaProducerManager ,将kafka发送改成打印日志:
//kafkaProducerManager.send(metric);Logs.info(JVMService.class , "kafka发送>>" + metric);
配置文件增加采集频率配置:
# 10s 采集一次 CPU,内存, 线程信息
Agent.jvmCollectIntervalSecond = 10
重新打包apm-agent-core,启动测试:
kafka发送>>JVMMetric(cpu=CPU(logicalProcessorCount=24, sysCpu=0.64%, userCpu=0%, iowaitCpu=0%, idleCpu=99.36%), HeapMemory=Memory(init=264241152, used=39600608, committed=253231104, max=3750756352), nonHeapMemory=Memory(init=2555904, used=43875616, committed=46727168, max=-1), threadInfo=ThreadInfo(threadCount=16, daemonThreadCount=12, totalStartedThreadCount=24, peakThreadCount=17, runnableStateThreadCount=7, blockedStateThreadCount=0, waitingStateThreadCount=3, timedWaitingStateThreadCount=6))
每隔10秒就会打印出系统信息。
3.4 KafkaProducerManager服务发送采集数据到后端OAP
如果一条链路起点入口被采样到了,那么它后续的创建的相关TraceSegment数据(这里不懂没关系,后面会讲到链路部分)都必需要采样到,也就是要保证采集数据一定要发送到后端OAP。
调用链的插桩插件是拦截了微服务所有的接口请求,框架方法的调用,所以还一定要保证高吞吐量。链路的采集一定不能阻塞影响微服务的正常接口和框架方法。
以上问题都可以通过kafka来规避解决。
Kafka的优点
高吞吐量和低延迟
Kafka能够处理每秒数百万条消息,具有极低的延迟,使得它非常适合处理大量实时数据,如日志收集、指标监控和事件流处理等应用场景。
可伸缩性
Kafka的设计理念是通过分布式架构来实现高度的可伸缩性。它可以轻松地扩展到成千上万的生产者和消费者,以应对不断增长的数据流量和工作负载。
持久性和可靠性
Kafka将所有的消息持久化存储在磁盘上,确保数据不会丢失。它采用多副本机制,使得数据可以在集群中的多个节点间进行复制,提供故障容忍和高可用性。
容错性
Kafka具备高度的容错性,即使在节点故障的情况下仍能保持数据的可靠传输。当集群中的某个节点失效时,生产者和消费者可以自动重定向到其他可用节点,确保消息的连续性。
多语言支持:Kafka提供了丰富的客户端API,支持多种编程语言,如Java、Python、Go和Scala等,使得开发者能够轻松地将Kafka集成到他们的应用程序中。
异步处理
Kafka支持异步处理模式,允许生产者和消费者之间以异步方式进行通信。这使得后端的业务流程可以并行执行,提高处理效率。
搭建kafka不在本书的讨论之内,所以默认已经搭建好了kafka集群。
接下来我们就来实现kafka发送服务,在apm-agent-core模块里面新建类:com.hadluo.apm.agentcore.service.KafkaProducerManager
public class KafkaProducerManager implements BootService {// 真正的kafka生产者private volatile KafkaProducer<String, String> producer;@Overridepublic void prepare() throws Throwable {Properties props = new Properties();props.put("bootstrap.servers", Config.Bootstrap.servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);producer = new KafkaProducer<>(props);Logs.debug(getClass() , "hadluo-apm初始化kafka producer成功, bootstrap.servers : " + Config.Bootstrap.servers);}@Overridepublic void boot() throws Throwable {}public void send(BaseMsg msg){ProducerRecord<String, String> record = new ProducerRecord<>(Config.Bootstrap.topic, Json.encodeAsString(msg));producer.send(record) ;Logs.info(getClass() , "kafka发送>>" + Json.encodeAsString(msg));}@Overridepublic void onComplete() throws Throwable {}@Overridepublic void shutdown() throws Throwable {producer.flush();producer.close();}@Overridepublic int priority() {// 最先执行的return Integer.MAX_VALUE;}
}
在prepare中,初始化了KafkaProducer kafka生产者。提供了对外send方法,通过KafkaProducer来向kafka发送消息。服务的排序值拉到最大,也就是最先执行。
代码完成之后,不要忘记加入SPI服务的声明
META-INF/services/com.hadluo.apm.agentcore.service.BootService
3.5 KafkaProducerManager服务测试
启动kafka集群服务,修改配置文件的kafka地址配置,然后将JVMService里面的kafka发送代码打开。重新打包apm-agent-core,启动测试。
Kafka topic里面的数据可以用Offset Explorer工具查看
可以发现,数据都已经到第0个分区里面来了,格式如下:
{"msgTypeClass":"com.hadluo.apm.commons.kafka.JVMMetric","sampleTime":1732616181427,"serviceName":"smartapm-test","serviceInstance":"4b33ad57aef449b18ec09a1f83432dd5@192.168.2.86","cpu":{"logicalProcessorCount":24,"sysCpu":"0%","userCpu":"0%","iowaitCpu":"0%","idleCpu":"100%"},"nonHeapMemory":{"init":2555904,"used":51309296,"committed":54722560,"max":-1},"threadInfo":{"threadCount":18,"daemonThreadCount":14,"totalStartedThreadCount":27,"peakThreadCount":19,"runnableStateThreadCount":9,"blockedStateThreadCount":0,"waitingStateThreadCount":3,"timedWaitingStateThreadCount":6},"heapMemory":{"init":264241152,"used":65098848,"committed":323485696,"max":3750756352}}
3.6 SamplingService采样率控制服务
熟悉链路监控的都知道不是微服务的每一个请求都会被采集监控,而是有一个采样率的配置,比如采样率配置为20 ,就代表3秒内,允许最多20次链路采样,超过的链路就就会被丢弃,这个控制就是SamplingService服务来实现的。这样做的好处就是降低对微服务正常接口的影响。
要实现上述逻辑并不难,首先定义一个全局计数器初始值为0,每一条链路采集时会判断计数器是否超过20,如果超过则抛弃链路,如果不超过则加加计数器。于此同时后台有一个3秒种执行定时器,定时将计数器清零,就实现了上述采样率控制逻辑,值得注意的是一定要保证线程安全。
在apm-commoms项目下新建类:
com.hadluo.apm.commons.trace.SamplingService
public class SamplingService implements BootService {// 清零 定时器private volatile ScheduledExecutorService scheduledFuture;// 全局计数器private volatile AtomicInteger count = new AtomicInteger(0);// 重置计数器private void restCount(){count.set(0);}@Overridepublic void boot() throws Throwable {scheduledFuture = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("SamplingService"));scheduledFuture.scheduleAtFixedRate(this::restCount, 0,3, TimeUnit.SECONDS);}// 尝试采样方法public synchronized boolean trySampling(){// 获取 当前 计数器int c = count.get();if(c < Integer.parseInt(Config.Agent.samplingRate)){// 通过// 计数器 加加count.incrementAndGet();return true;}return false;}@Overridepublic void shutdown() throws Throwable {if (scheduledFuture != null) {scheduledFuture.shutdown();}}
}
在boot方法中 的开启了一个每隔3秒会执行restCount方法的定时器,在restCount中会重置计数器为0。
trySampling对外方法就是后面链路采集时会调用,用来判断该链路是否应该被采集,一定要保证多线程对全局计数器的访问安全。
代码写好后,不要忘记把服务加到SPI的接口配置文件里面。
3.7 SamplingService服务的测试
为了测试方便,我们先把前面的JVMService服务的上报逻辑先注释掉。不然会打印很多干扰数据。
在premain方法里面增加模拟调用链调用的测试代码:
// 测试采样服务
ExecutorService executorService = Executors.newFixedThreadPool(50);
SamplingService service = ServiceManager.INSTANCE.getService(SamplingService.class);
for (int i=0;i<10000;i++) {executorService.execute(()->{boolean ret = service.trySampling();if(ret){String sdf = new SimpleDateFormat("HH:mm:ss").format(new Date());System.out.println(sdf + ">> 通过采样");}try {Thread.sleep(10);} catch (InterruptedException e) {}});
}
修改采样率配置为5
#采样率: 3秒钟之内允许通过的链路数为5, 3秒后会重置
Agent.samplingRate = 5
重新打包agent-core,启动测试结果:
每次通过的采样数是配置的5个。
3.8 服务内部资源的清理
前面介绍的BootService服务有一个shutdown生命周期的方法,用来清除各自服务的内部资源组件,比如定时器,kafka Producer等。
在实现具体服务时,都有对shutdown方法的实现,但是还没有对shutdown方法发起调用。
Java 语言提供一种 ShutdownHook(钩子)进制,当 JVM 接受到系统的关闭通知之后,调用 ShutdownHook 内的方法,用以完成清理操作,从而平滑的退出应用。
注册JVM关闭钩子回调,然后进行服务的shutdown调用。
在premain的结尾处,添加代码:
//注册关闭构子
Runtime.getRuntime().addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "hadluo-apm shutdown thread"));
3.9 本章总结
本章主要讲解了如何通过SPI服务将各种模块功能进行服务化,解耦了开发代码。还介绍了服务的 “生命周期”,“排序”,以及通过“服务管理器(ServiceManager)统一管理各个服务” 等的设计思想。后续开发可能还会新建很多服务,原理都是类似。
本章具体介绍的服务有“JVMService” , “KafkaProducerManager”,“SamplingService”, 它们都是调用链的基石,通过这些服务完成了基本的采集机器资源的功能。
本章还介绍到了JVM关闭钩子回调ShutdownHook ,用来清理各个服务内部的资源组件,这也是各大框架通用的清理资源思想。