RocketMq集成Sleuth
通过Aop的形式在使RocketMq能在生产端的时候携带tracId和spanId,在消费端获取前面二者并且基于tracId创建新的span,达到一个业务请求经过Mq传递后tracId仍然保持一致的目的
ps:Sleuth基于2.2.8版本
生产者
我们从当前span获取上下文,提取出tracId
和spanId
,并作为自定义参数放入Mq消息中传递到下游消费者
@Around("execution(* org.apache.rocketmq.client.producer.DefaultMQProducer.send(..))")
public Object traceProducerSend(ProceedingJoinPoint pjp) throws Throwable {// 创建 Sleuth 的 span 以跟踪消息发送tracer.nextSpan().name("rocketmq-producer-send").start();try {// 获取传入的参数Object[] args = pjp.getArgs();if (args != null && args.length > 0 && args[0] instanceof Message) {Message message = (Message) args[0];// 获取当前 traceIdSpan currentSpan = tracer.currentSpan();if (currentSpan != null) {TraceContext context = currentSpan.context();long tracedId = context.traceId();long spanId = context.spanId();Long parentId = context.parentId();// 将 traceId 添加到消息的属性中message.putUserProperty("traceId", tracedId+"");message.putUserProperty("spanId", spanId+"");}}} catch (Exception e) {log.warn("traceProducerSend", e);}return pjp.proceed();
}
消费者
这里假定是单条消费的场景
我们从自定义参数中获取tracId
,如果可以获取到则基于此构建一个traceContext
,然后基于后者构建一个新的span
,否则直接构建一个全新的span
@Around("execution(* org.apache.rocketmq.client.consumer.listener.MessageListener*.consumeMessage(..))")
public Object traceConsumerConsumeMessage(ProceedingJoinPoint pjp) throws Throwable {// 获取方法参数(消息列表)Object[] args = pjp.getArgs();List<MessageExt> messages = null;if (args != null && args.length > 0 && args[0] instanceof List) {messages = (List<MessageExt>) args[0];}// 如果有消息if (messages != null && !messages.isEmpty()) {// 从第一条消息中提取 traceIdString traceId = messages.get(0).getUserProperty("traceId");Span newSpan;if (StringUtils.isNotBlank(traceId)){TraceContext traceContext = TraceContext.newBuilder().traceId(Long.parseLong(traceId)).spanId(Long.parseLong(messages.get(0).getUserProperty("spanId"))).build();newSpan = tracer.nextSpan(TraceContextOrSamplingFlags.create(traceContext)).name("rocketmq-consume-message").start();}else{// 创建一个新的 spannewSpan = tracer.nextSpan().name("rocketmq-consume-message").start();}try (Tracer.SpanInScope ws = tracer.withSpanInScope(newSpan)) {// 执行原方法return pjp.proceed();} catch (Exception e) {log.warn("traceConsumerConsumeMessage", e);} finally {newSpan.finish();}}// 如果没有消息,直接执行原方法return pjp.proceed();
}
测试
我们可以看到生产者和消费者持有了相同的tracId
但拥有不同的spanId