PDF书籍《手写调用链监控APM系统-Java版》第4章 SPI服务模块化系统

embedded/2024/12/27 13:03:35/

本人阅读了 Skywalking 的大部分核心代码,也了解了相关的文献,对此深有感悟,特此借助巨人的思想自己手动用JAVA语言实现了一个 “调用链监控APM” 系统。本书采用边讲解实现原理边编写代码的方式,看本书时一定要跟着敲代码。

作者已经将过程写成一部书籍,奈何没有钱发表,如果您知道渠道可以联系本人。一定重谢。

本书涉及到的核心技术与思想

JavaAgent , ByteBuddy,SPI服务,类加载器的命名空间,增强JDK类,kafka,插件思想,切面,链路栈等等。实际上远不止这么多,差不多贯通了整个java体系。

适用人群

自己公司要实现自己的调用链的;写架构的;深入java编程的;阅读Skywalking源码的;

版权

本书是作者呕心沥血亲自编写的代码,不经同意切勿拿出去商用,否则会追究其责任。

原版PDF+源码请见:

本章涉及到的工具类也在下面:

PDF书籍《手写调用链监控APM系统-Java版》第1章 开篇介绍-CSDN博客

第3章 SPI服务模块化系统

大名鼎鼎的调用链监控Skywalking将很多功能抽象成一个一个的Java SPI服务(BootService),每个服务都有生命周期方法,prepare,boot,shutdown等。这些服务还有排序值属性,用来指定哪个服务最先执行生命周期prepare,boot,shutdown。

这些服务都交由ServiceManager进行管理,用这种SPI服务模式进行了高度解耦和统一管理,着实是一种好的设计理念。

比如JVM信息收集,链路数据收集,链路数据的kafka发送到后端,都可以抽象成BootService服务,显而易见kafka发送服务是要最先初始化的,所以排序值也起了作用。

Java SPI服务

Java SPI(Service Provider Interface)是Java官方提供的一种服务发现机制,它允许在运行时动态地加载实现特定接口的类,而不需要在代码中显式地指定该类,从而实现解耦和灵活性。用法就是在/META-INF/services建立接口文件,里面内容为具体的实现。

3.1 SPI服务架构的建立

首先我们需要一个顶层SPI接口,这个接口我们在apm-commons项目中新建类:

com.hadluo.apm.commons.trace.BootService

public interface BootService {// 生命周期方法void prepare() throws Throwable;// 生命周期方法void boot() throws Throwable;// 生命周期方法void onComplete() throws Throwable;// 生命周期方法void shutdown() throws Throwable;/*** 值越大,越先执行* @return*/default int priority() {return 0;}
}

生命周期设计是从上到下依次执行,排序值默认都是0,有具体服务组件重写的,值越大越先执行。

然后需要一个服务管理器来管理所有服务,在apm-commons项目下新建类:

com.hadluo.apm.commons.trace.ServiceManager

public enum ServiceManager {// 枚举单例INSTANCE;// 存储 所有的 SPI 服务容器private Map<Class<?>, BootService> services = new HashMap<>();public void boot() {// 加载所有的服务this.services = loadAllServiceList();// 调用生命周期this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.prepare();} catch (Throwable e) {Logs.err(ServiceManager.class , "prepare error", e);}});this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.boot();} catch (Throwable e) {Logs.err(ServiceManager.class , "boot error", e);}});this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.onComplete();} catch (Throwable e) {Logs.err(ServiceManager.class , "onComplete error", e);}});}public void shutdown() {this.services.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {try {service.shutdown();} catch (Throwable e) {Logs.err(ServiceManager.class , "shutdown error", e);}});}public <T> T getService(Class<T> clazz) {return (T) this.services.get(clazz);}private Map<Class<?>, BootService> loadAllServiceList() {Map<Class<?>, BootService> services = new HashMap<>();// SPI 加载ServiceLoader<BootService> load = ServiceLoader.load(BootService.class);for (BootService service : load) {services.put(service.getClass(), service);}return services;}
}

ServiceManager 采用枚举单例写法,参考 Effective Java 书中设计。

boot方法为初始化方法,在里面首先通过 loadAllServiceList 加载了所有服务,然后排依次调用其生命周期方法。shutdown为对外提供的方法,是注册jvm退出钩子时主动要调用的。

loadAllServiceList方法就是通过SPI技术从/META-INF/services路径下找到服务接口声明文件,从而得到了配置的服务。

在apm-agent-core项目下的resource下面新建/META-INF/services 目录,然后在新建接口服务文件:

文件名称就是BootService的类全名称,内容我们暂时先定义一个JVMService服务,下面会讲解到很多服务组件。

3.2 JVMService收集内存,CPU,线程信息

这个JVMService服务组件是用来收集当前微服务的jvm内存,CPU等系统资源信息,然后发送到后端监控展示。下面我们来实现这个服务。

在apm-agent-core项目下新建类:

com.hadluo.apm.agentcore.service.jvm.JVMService

public class JVMService implements BootService {// 定时器private ScheduledExecutorService executorService ;// kafka 发送到后端的 kafka发送服务private KafkaProducerManager kafkaProducerManager;@Overridepublic void prepare() throws Throwable {this.kafkaProducerManager = ServiceManager.INSTANCE.getService(KafkaProducerManager.class) ;}@Overridepublic void boot() throws Throwable {}@Overridepublic void onComplete() throws Throwable {}@Overridepublic void shutdown() throws Throwable {// 关闭定时器executorService.shutdown();}
}

由于是服务组件,所以必须实现BootService接口,还有生命周期方法。

调用链系统采集到的CPU,内存等信息都需要是近实时更新的,所以需要一个定时器,每隔N秒去采集系统信息然后上报到后端。

ScheduledExecutorService 为线程池实现的定时服务,在boot中,我们可以进行初始化,boot新增代码:

@Override
public void boot() throws Throwable {executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("JVMService-Collect"));executorService.scheduleAtFixedRate(()->{// 采集jvm信息JVMMetric metric = collectJVM() ;// 发送到kafkakafkaProducerManager.send(metric);} ,10, Integer.parseInt(Config.Agent.jvmCollectIntervalSecond), TimeUnit.SECONDS );
}

为了便于查找问题,有一个好习惯就是开的线程自定定义一个名字,上面的NamedThreadFactory 就是为了给定时器线程取名的存在,在apm-commons项目下新建类:

com.hadluo.apm.commons.trace.NamedThreadFactory

public class NamedThreadFactory implements ThreadFactory {private final String name;private final AtomicInteger threadNumber = new AtomicInteger(1);public NamedThreadFactory(String name) {this.name = name;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, name + "-" + threadNumber.getAndIncrement());t.setDaemon(true);return t;}
}

定时器10秒后启动,以后每隔配置Config.JVM.jvmCollectIntervalSecond 秒去执行 collectJVM 收集系统资源信息返回到JVMMetric实体对象里面,然后通过kafka发送服务发送到kafka broker让后端进行消费。

配置系统之前就讲过,不在说明,自己在Config.JVM中添加jvmCollectIntervalSecond。

collectJVM 方法为真实采集当前机器和jvm信息。代码如下:

private JVMMetric collectJVM() {JVMMetric metric = new JVMMetric();// 设置基础信息metric.setServiceInstance(Config.Agent.serviceInstance);metric.setServiceName(Config.Agent.serviceName);metric.setMsgTypeClass(JVMMetric.class.getName());//设置 cpu信息metric.setCpu(CPUProvider.INSTANCE.getCPUMetrics());// 设置堆内存信息metric.setHeapMemory(MemoryProvider.INSTANCE.getHeapMemoryMetrics());// 设置非堆内存信息metric.setNonHeapMemory(MemoryProvider.INSTANCE.getNonHeapMemoryMetrics());// 线程信息metric.setThreadInfo(ThreadProvider.INSTANCE.getThreadMetrics());return metric ;
}

这里每个系统指标都是通过XxxProvider进行收集,最后存放到一个kafka能发送的实体bean对象里面。这个对象应该是公共的,后端OAP也要用,于是我们要放到apm-commoms模块里面。

在apm-commoms里面新建类:

com.hadluo.apm.commons.kafka.JVMMetric

@Data  // lombok注解,自动生成get set
public class JVMMetric extends BaseMsg {// cpu信息private CPU cpu;// 堆内存private Memory HeapMemory;// 非堆内存private Memory nonHeapMemory;// 线程private ThreadInfo threadInfo;
}

每个发送到kafka的bean对象都有一个公共的基类BaseMsg ,在apm-commoms里面新建类:

com.hadluo.apm.commons.kafka.BaseMsg

@Data
public class BaseMsg {// 消息类型的classprivate String msgTypeClass;// 采样时间final long sampleTime = System.currentTimeMillis();// agent的服务名称private String serviceName ;private String serviceInstance ;
}

也就是说,每次上报给后端OAP的数据都包含这4个公共属性。

CPU实体代码,在apm-commoms里面新建类:

com.hadluo.apm.commons.kafka.CPU

@Data
@Builder  // lombok注解,提供构建器模式的初始化对象
public class CPU{// cpu核心数long logicalProcessorCount;//cpu系统使用率String sysCpu;//cpu用户使用率String userCpu;//cpu当前等待率String iowaitCpu;//cpu当前空闲率String idleCpu;
}

Memory实体代码,在apm-commoms里面新建类:

com.hadluo.apm.commons.kafka.Memory

@Builder
@Data
public class Memory {
// 初始内存private final long init;
// 已使用内存private final long used;
//提交内存private final long committed;
// 最大private final long max;
}

ThreadInfoy实体代码,在apm-commoms里面新建类:

com.hadluo.apm.commons.kafka.ThreadInfo

@Data
@Builder
public class ThreadInfo {//当前活动线程数,包括守护线程和非守护线程int threadCount;//守护线程数int daemonThreadCount;// 自Java虚拟机启动以来创建和启动的线程总数。long totalStartedThreadCount;// jvm 启动以来 的活动线程数的峰值int peakThreadCount;// runable状态的线程数int runnableStateThreadCount;// block状态的线程数int blockedStateThreadCount ;// waiting状态的线程数int waitingStateThreadCount;// time wait 状态的线程数int timedWaitingStateThreadCount;
}

到此我们实体已经全部建立完成,下面介绍各个Provider的具体实现。

CPUProvider获取cpu信息

这里需要借助github开源的oshi工具,POM文件已经在前面建立框架时依赖进去了,直接在apm-agent-core项目下新建类:

com.hadluo.apm.agentcore.service.jvm.CPUProvider

public enum CPUProvider {INSTANCE ;private final SystemInfo systemInfo = new SystemInfo();public CPU getCPUMetrics() {// 使用 oshi 工具 获取 cpu信息CentralProcessor processor = systemInfo.getHardware().getProcessor();//cpulong[] prevTicks = processor.getSystemCpuLoadTicks();long[] ticks = processor.getSystemCpuLoadTicks();long nice = ticks[CentralProcessor.TickType.NICE.getIndex()]- prevTicks[CentralProcessor.TickType.NICE.getIndex()];long irq = ticks[CentralProcessor.TickType.IRQ.getIndex()]- prevTicks[CentralProcessor.TickType.IRQ.getIndex()];long softirq = ticks[CentralProcessor.TickType.SOFTIRQ.getIndex()]- prevTicks[CentralProcessor.TickType.SOFTIRQ.getIndex()];long steal = ticks[CentralProcessor.TickType.STEAL.getIndex()]- prevTicks[CentralProcessor.TickType.STEAL.getIndex()];long cSys = ticks[CentralProcessor.TickType.SYSTEM.getIndex()]- prevTicks[CentralProcessor.TickType.SYSTEM.getIndex()];long user = ticks[CentralProcessor.TickType.USER.getIndex()]- prevTicks[CentralProcessor.TickType.USER.getIndex()];long iowait = ticks[CentralProcessor.TickType.IOWAIT.getIndex()]- prevTicks[CentralProcessor.TickType.IOWAIT.getIndex()];long idle = ticks[CentralProcessor.TickType.IDLE.getIndex()]- prevTicks[CentralProcessor.TickType.IDLE.getIndex()];long totalCpu = user + nice + cSys + idle + iowait + irq + softirq + steal;// cpu核心数long logicalProcessorCount = processor.getLogicalProcessorCount();//cpu系统使用率String sysCpu = new DecimalFormat("#.##%").format(cSys * 1.0 / totalCpu);//cpu用户使用率String userCpu = new DecimalFormat("#.##%").format(user * 1.0 / totalCpu);//cpu当前等待率String iowaitCpu = new DecimalFormat("#.##%").format(iowait * 1.0 / totalCpu);//cpu当前空闲率String idleCpu = new DecimalFormat("#.##%").format(idle * 1.0 / totalCpu);return CPU.builder().logicalProcessorCount(logicalProcessorCount).idleCpu(idleCpu).sysCpu(sysCpu).userCpu(userCpu).iowaitCpu(iowaitCpu).build();}
}

逻辑我就不讲了,毕竟这就是一个工具类而已。

MemoryProvider获取JVM内存信息

这里需要借助java.lang提供的ManagementFactory工具获取JVM的堆内存和非堆内存。

在apm-agent-core项目下新建类:

com.hadluo.apm.agentcore.service.jvm.MemoryProvider

public enum MemoryProvider {INSTANCE;private final MemoryMXBean memoryMXBean =  ManagementFactory.getMemoryMXBean();public Memory getNonHeapMemoryMetrics(){MemoryUsage nonHeapMemoryUsage = memoryMXBean.getNonHeapMemoryUsage();return Memory.builder().init(nonHeapMemoryUsage.getInit()).used(nonHeapMemoryUsage.getUsed()).committed(nonHeapMemoryUsage.getCommitted()).max(nonHeapMemoryUsage.getMax()).build();}public Memory getHeapMemoryMetrics() {MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();return Memory.builder().init(heapMemoryUsage.getInit()).used(heapMemoryUsage.getUsed()).committed(heapMemoryUsage.getCommitted()).max(heapMemoryUsage.getMax()).build();}
}

ThreadProvider获取线程信息

也需要借助java.lang提供的ManagementFactory工具获取JVM线程信息。

在apm-agent-core项目下新建类:

com.hadluo.apm.agentcore.service.jvm.ThreadProvider

public enum ThreadProvider {INSTANCE;private final ThreadMXBean threadMXBean;ThreadProvider() {this.threadMXBean = ManagementFactory.getThreadMXBean();}public ThreadInfo getThreadMetrics() {int runnableStateThreadCount = 0;int blockedStateThreadCount = 0;int waitingStateThreadCount = 0;int timedWaitingStateThreadCount = 0;java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 0);if (threadInfos != null) {for (java.lang.management.ThreadInfo threadInfo : threadInfos) {if (threadInfo == null) {continue;}switch (threadInfo.getThreadState()) {case RUNNABLE:runnableStateThreadCount++;break;case BLOCKED:blockedStateThreadCount++;break;case WAITING:waitingStateThreadCount++;break;case TIMED_WAITING:timedWaitingStateThreadCount++;break;default:break;}}}return ThreadInfo.builder().threadCount(threadMXBean.getThreadCount()).daemonThreadCount(threadMXBean.getDaemonThreadCount()).peakThreadCount(threadMXBean.getPeakThreadCount()).runnableStateThreadCount(runnableStateThreadCount).blockedStateThreadCount(blockedStateThreadCount).waitingStateThreadCount(waitingStateThreadCount).timedWaitingStateThreadCount(timedWaitingStateThreadCount).totalStartedThreadCount(threadMXBean.getTotalStartedThreadCount()).build();}
}

3.3 JVMService 服务的测试

为了保证代码的质量,这里需要进行测试,我们把服务的加载放到premain里面,也就是接着配置信息加载的后面,在premain方法继续添加代码:

//2. 初始化服务
try {ServiceManager.INSTANCE.boot();
} catch (Exception e) {Logs.err(AgentMain.class, "AgentMain启动失败,服务初始化失败", e);return;
}

到这里我们还不能测试,我们还要开发KafkaProducerManager服务,所以我们要注释掉JVMService里面的KafkaProducerManager ,将kafka发送改成打印日志:

//kafkaProducerManager.send(metric);Logs.info(JVMService.class , "kafka发送>>" + metric);

配置文件增加采集频率配置:

# 10s 采集一次 CPU,内存, 线程信息
Agent.jvmCollectIntervalSecond = 10

重新打包apm-agent-core,启动测试:

kafka发送>>JVMMetric(cpu=CPU(logicalProcessorCount=24, sysCpu=0.64%, userCpu=0%, iowaitCpu=0%, idleCpu=99.36%), HeapMemory=Memory(init=264241152, used=39600608, committed=253231104, max=3750756352), nonHeapMemory=Memory(init=2555904, used=43875616, committed=46727168, max=-1), threadInfo=ThreadInfo(threadCount=16, daemonThreadCount=12, totalStartedThreadCount=24, peakThreadCount=17, runnableStateThreadCount=7, blockedStateThreadCount=0, waitingStateThreadCount=3, timedWaitingStateThreadCount=6))

每隔10秒就会打印出系统信息。

3.4 KafkaProducerManager服务发送采集数据到后端OAP

如果一条链路起点入口被采样到了,那么它后续的创建的相关TraceSegment数据(这里不懂没关系,后面会讲到链路部分)都必需要采样到,也就是要保证采集数据一定要发送到后端OAP。

调用链的插桩插件是拦截了微服务所有的接口请求,框架方法的调用,所以还一定要保证高吞吐量。链路的采集一定不能阻塞影响微服务的正常接口和框架方法。

以上问题都可以通过kafka来规避解决。

Kafka的优点

高吞吐量和低延迟

Kafka能够处理每秒数百万条消息,具有极低的延迟,使得它非常适合处理大量实时数据,如日志收集、指标监控和事件流处理等应用场景。

可伸缩性

Kafka的设计理念是通过分布式架构来实现高度的可伸缩性。它可以轻松地扩展到成千上万的生产者和消费者,以应对不断增长的数据流量和工作负载。

持久性和可靠性

Kafka将所有的消息持久化存储在磁盘上,确保数据不会丢失。它采用多副本机制,使得数据可以在集群中的多个节点间进行复制,提供故障容忍和高可用性。

容错性

Kafka具备高度的容错性,即使在节点故障的情况下仍能保持数据的可靠传输。当集群中的某个节点失效时,生产者和消费者可以自动重定向到其他可用节点,确保消息的连续性。

多语言支持:Kafka提供了丰富的客户端API,支持多种编程语言,如Java、Python、Go和Scala等,使得开发者能够轻松地将Kafka集成到他们的应用程序中。

异步处理

Kafka支持异步处理模式,允许生产者和消费者之间以异步方式进行通信。这使得后端的业务流程可以并行执行,提高处理效率。

搭建kafka不在本书的讨论之内,所以默认已经搭建好了kafka集群。

接下来我们就来实现kafka发送服务,在apm-agent-core模块里面新建类:com.hadluo.apm.agentcore.service.KafkaProducerManager

public class KafkaProducerManager implements BootService {// 真正的kafka生产者private volatile KafkaProducer<String, String> producer;@Overridepublic void prepare() throws Throwable {Properties props = new Properties();props.put("bootstrap.servers", Config.Bootstrap.servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);producer = new KafkaProducer<>(props);Logs.debug(getClass() , "hadluo-apm初始化kafka producer成功, bootstrap.servers : " + Config.Bootstrap.servers);}@Overridepublic void boot() throws Throwable {}public void send(BaseMsg msg){ProducerRecord<String, String> record = new ProducerRecord<>(Config.Bootstrap.topic, Json.encodeAsString(msg));producer.send(record) ;Logs.info(getClass() , "kafka发送>>" + Json.encodeAsString(msg));}@Overridepublic void onComplete() throws Throwable {}@Overridepublic void shutdown() throws Throwable {producer.flush();producer.close();}@Overridepublic int priority() {// 最先执行的return Integer.MAX_VALUE;}
}

在prepare中,初始化了KafkaProducer kafka生产者。提供了对外send方法,通过KafkaProducer来向kafka发送消息。服务的排序值拉到最大,也就是最先执行。

代码完成之后,不要忘记加入SPI服务的声明

META-INF/services/com.hadluo.apm.agentcore.service.BootService

3.5 KafkaProducerManager服务测试

启动kafka集群服务,修改配置文件的kafka地址配置,然后将JVMService里面的kafka发送代码打开。重新打包apm-agent-core,启动测试。

Kafka topic里面的数据可以用Offset Explorer工具查看

可以发现,数据都已经到第0个分区里面来了,格式如下:

{"msgTypeClass":"com.hadluo.apm.commons.kafka.JVMMetric","sampleTime":1732616181427,"serviceName":"smartapm-test","serviceInstance":"4b33ad57aef449b18ec09a1f83432dd5@192.168.2.86","cpu":{"logicalProcessorCount":24,"sysCpu":"0%","userCpu":"0%","iowaitCpu":"0%","idleCpu":"100%"},"nonHeapMemory":{"init":2555904,"used":51309296,"committed":54722560,"max":-1},"threadInfo":{"threadCount":18,"daemonThreadCount":14,"totalStartedThreadCount":27,"peakThreadCount":19,"runnableStateThreadCount":9,"blockedStateThreadCount":0,"waitingStateThreadCount":3,"timedWaitingStateThreadCount":6},"heapMemory":{"init":264241152,"used":65098848,"committed":323485696,"max":3750756352}}

3.6 SamplingService采样率控制服务

熟悉链路监控的都知道不是微服务的每一个请求都会被采集监控,而是有一个采样率的配置,比如采样率配置为20 ,就代表3秒内,允许最多20次链路采样,超过的链路就就会被丢弃,这个控制就是SamplingService服务来实现的。这样做的好处就是降低对微服务正常接口的影响。

要实现上述逻辑并不难,首先定义一个全局计数器初始值为0,每一条链路采集时会判断计数器是否超过20,如果超过则抛弃链路,如果不超过则加加计数器。于此同时后台有一个3秒种执行定时器,定时将计数器清零,就实现了上述采样率控制逻辑,值得注意的是一定要保证线程安全。

在apm-commoms项目下新建类:

com.hadluo.apm.commons.trace.SamplingService

public class SamplingService implements BootService {// 清零 定时器private volatile ScheduledExecutorService scheduledFuture;// 全局计数器private volatile AtomicInteger count = new AtomicInteger(0);// 重置计数器private void restCount(){count.set(0);}@Overridepublic void boot() throws Throwable {scheduledFuture = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("SamplingService"));scheduledFuture.scheduleAtFixedRate(this::restCount, 0,3, TimeUnit.SECONDS);}// 尝试采样方法public synchronized boolean trySampling(){// 获取 当前 计数器int c = count.get();if(c < Integer.parseInt(Config.Agent.samplingRate)){// 通过// 计数器 加加count.incrementAndGet();return true;}return false;}@Overridepublic void shutdown() throws Throwable {if (scheduledFuture != null) {scheduledFuture.shutdown();}}
}

在boot方法中 的开启了一个每隔3秒会执行restCount方法的定时器,在restCount中会重置计数器为0。

trySampling对外方法就是后面链路采集时会调用,用来判断该链路是否应该被采集,一定要保证多线程对全局计数器的访问安全。

代码写好后,不要忘记把服务加到SPI的接口配置文件里面。

3.7 SamplingService服务的测试

为了测试方便,我们先把前面的JVMService服务的上报逻辑先注释掉。不然会打印很多干扰数据。

在premain方法里面增加模拟调用链调用的测试代码:

// 测试采样服务
ExecutorService executorService = Executors.newFixedThreadPool(50);
SamplingService service = ServiceManager.INSTANCE.getService(SamplingService.class);
for (int i=0;i<10000;i++) {executorService.execute(()->{boolean ret = service.trySampling();if(ret){String sdf = new SimpleDateFormat("HH:mm:ss").format(new Date());System.out.println(sdf + ">> 通过采样");}try {Thread.sleep(10);} catch (InterruptedException e) {}});
}

修改采样率配置为5

#采样率: 3秒钟之内允许通过的链路数为5, 3秒后会重置
Agent.samplingRate = 5

重新打包agent-core,启动测试结果:

每次通过的采样数是配置的5个。

3.8 服务内部资源的清理

前面介绍的BootService服务有一个shutdown生命周期的方法,用来清除各自服务的内部资源组件,比如定时器,kafka Producer等。

在实现具体服务时,都有对shutdown方法的实现,但是还没有对shutdown方法发起调用。

Java 语言提供一种 ShutdownHook(钩子)进制,当 JVM 接受到系统的关闭通知之后,调用 ShutdownHook 内的方法,用以完成清理操作,从而平滑的退出应用。

注册JVM关闭钩子回调,然后进行服务的shutdown调用。

在premain的结尾处,添加代码:

//注册关闭构子
Runtime.getRuntime().addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "hadluo-apm shutdown thread"));

3.9 本章总结

本章主要讲解了如何通过SPI服务将各种模块功能进行服务化,解耦了开发代码。还介绍了服务的 “生命周期”,“排序”,以及通过“服务管理器(ServiceManager)统一管理各个服务” 等的设计思想。后续开发可能还会新建很多服务,原理都是类似。

本章具体介绍的服务有“JVMService” , “KafkaProducerManager”,“SamplingService”, 它们都是调用链的基石,通过这些服务完成了基本的采集机器资源的功能。

本章还介绍到了JVM关闭钩子回调ShutdownHook ,用来清理各个服务内部的资源组件,这也是各大框架通用的清理资源思想。


http://www.ppmy.cn/embedded/149178.html

相关文章

D类音频应用EMI管理

1、前言 对于EMI&#xff0c;首先需要理解天线。频率和波长之间的关系&#xff0c;如下图所示。   作为有效天线所需的最短长度是λ/4。在空气中&#xff0c;介电常数是1&#xff0c;但是在FR4或玻璃环氧PCB的情况下&#xff0c;介电常数大约4.8。这种效应会导致信号在FR4材…

基于遥感与通信技术的灾害应急测绘

基于遥感与通信技术的灾害应急测绘研究评述与展望 摘要 本研究围绕基于遥感与通信技术的灾害应急测绘展开&#xff0c;深入探讨其在灾害管理中的重要性及当前发展现状。遥感技术凭借高分辨率、广覆盖的特性&#xff0c;已成为获取灾害信息的核心手段。结合5G通信与低轨卫星技术…

英语单词拼读小程序开发制作介绍

英语单词拼读小程序开发制作介绍本英语单词拼读小程序系统开发的主要功能有&#xff1a; 1、按年级分类展示每个年级阶段的英语单词信息。 2、点击选择的单词进入单词拼读页面&#xff0c;展示英语单词的拼读音标、中文意思、单词发音、拆分词汇发音、用户通过朗读发音对比。通…

在Ubuntu上通过Docker部署NGINX服务器

Yo伙计们&#xff0c;今天我们要讨论的话题是如何在Ubuntu系统上通过Docker来部署NGINX服务器。NGINX是一个高性能的Web服务器&#xff0c;适合处理静态内容、反向代理和负载均衡。想要搞定这个家伙&#xff0c;就跟着我来吧&#xff01; Docker和NGINX简介 让我来简要介绍一下…

文件解析漏洞中间件(iis和Apache)

IIS解析漏洞 IIS6.X #环境 Windows Server 2003 在iis6.x中&#xff0c;.asp文件夹中的任意文件都会被当做asp文件去执行 在默认网站里创建一个a.asp文件夹并创建一个1.jpg写进我们的asp代码 <%now()%> #asp一句话 <%eval request("h")%> 单独创建一…

Ubuntu系统部署Mysql8.0后设置不区分大小写

部署MySQL # 更新系统软件包列表 sudo apt update# 安装MySQL Server sudo apt install mysql-server# 在安装时&#xff0c;系统会自动进行初始化&#xff0c;安装完成后MySQL已经处于运行状态# MySQL常见命令 #启动MySQL sudo systemctl start mysql#停止MySQL sudo systemc…

【微信小程序】3|首页搜索框 | 我的咖啡店-综合实训

首页-搜索框-跳转 引言 在微信小程序中&#xff0c;首页的搜索框是用户交互的重要入口。本文将通过“我的咖啡店”小程序的首页搜索框实现&#xff0c;详细介绍如何在微信小程序中创建和处理搜索框的交互。 1. 搜索函数实现 onClickInput函数在用户点击搜索框时触发&#x…

【仓颉语言体验】Hello World TCP客户端 C/C++ or Python

仓颉语言体验 总体上语法和C/C还是比较相近的&#xff0c;体验上更偏向Python。 注意代码后面没有分号哦&#xff1b; Hello World&#xff1a; main() {println("Hello World") }实现一个TCP客户端&#xff1a; import std.net.* // 导入网络模块 import std.conso…