RocketMq集成Sleuth

ops/2024/10/24 23:33:06/

RocketMq集成Sleuth

通过Aop的形式在使RocketMq能在生产端的时候携带tracId和spanId,在消费端获取前面二者并且基于tracId创建新的span,达到一个业务请求经过Mq传递后tracId仍然保持一致的目的

ps:Sleuth基于2.2.8版本

生产者

我们从当前span获取上下文,提取出tracIdspanId,并作为自定义参数放入Mq消息中传递到下游消费者

@Around("execution(* org.apache.rocketmq.client.producer.DefaultMQProducer.send(..))")
public Object traceProducerSend(ProceedingJoinPoint pjp) throws Throwable {// 创建 Sleuth 的 span 以跟踪消息发送tracer.nextSpan().name("rocketmq-producer-send").start();try {// 获取传入的参数Object[] args = pjp.getArgs();if (args != null && args.length > 0 && args[0] instanceof Message) {Message message = (Message) args[0];// 获取当前 traceIdSpan currentSpan = tracer.currentSpan();if (currentSpan != null) {TraceContext context = currentSpan.context();long tracedId = context.traceId();long spanId = context.spanId();Long parentId = context.parentId();// 将 traceId 添加到消息的属性中message.putUserProperty("traceId", tracedId+"");message.putUserProperty("spanId", spanId+"");}}} catch (Exception e) {log.warn("traceProducerSend", e);}return pjp.proceed();
}

消费者

这里假定是单条消费的场景

我们从自定义参数中获取tracId,如果可以获取到则基于此构建一个traceContext,然后基于后者构建一个新的span,否则直接构建一个全新的span

@Around("execution(* org.apache.rocketmq.client.consumer.listener.MessageListener*.consumeMessage(..))")
public Object traceConsumerConsumeMessage(ProceedingJoinPoint pjp) throws Throwable {// 获取方法参数(消息列表)Object[] args = pjp.getArgs();List<MessageExt> messages = null;if (args != null && args.length > 0 && args[0] instanceof List) {messages = (List<MessageExt>) args[0];}// 如果有消息if (messages != null && !messages.isEmpty()) {// 从第一条消息中提取 traceIdString traceId = messages.get(0).getUserProperty("traceId");Span newSpan;if (StringUtils.isNotBlank(traceId)){TraceContext traceContext = TraceContext.newBuilder().traceId(Long.parseLong(traceId)).spanId(Long.parseLong(messages.get(0).getUserProperty("spanId"))).build();newSpan = tracer.nextSpan(TraceContextOrSamplingFlags.create(traceContext)).name("rocketmq-consume-message").start();}else{// 创建一个新的 spannewSpan = tracer.nextSpan().name("rocketmq-consume-message").start();}try (Tracer.SpanInScope ws = tracer.withSpanInScope(newSpan)) {// 执行原方法return pjp.proceed();} catch (Exception e) {log.warn("traceConsumerConsumeMessage", e);} finally {newSpan.finish();}}// 如果没有消息,直接执行原方法return pjp.proceed();
}

测试

image-20241024222450905

我们可以看到生产者消费者持有了相同的tracId但拥有不同的spanId


http://www.ppmy.cn/ops/128177.html

相关文章

1024是什么日子

【1024程序员日数字编织梦想的赞歌】 在这个由二进制构建的宇宙里&#xff0c;每一行代码都是通往未来的桥梁&#xff0c;每一位程序员都是这浩瀚数字海洋中的航海家。今天&#xff0c;10月24日&#xff0c;不仅是一个简单的日期&#xff0c;它是属于我们的节日——程序员日&a…

Stability.AI 发布 SD3.5 模型,能否逆袭击败 FLUX?如何在ComfyUI中的使用SD3.5?

就在前天&#xff0c;Stability AI 正式发布了 Stable Diffusion 3.5版本&#xff0c;包括 3 款强大的模型&#xff1a; Stable Diffusion 3.5 Large&#xff1a;拥有 80 亿参数&#xff0c;提供卓越的图像质量和精确的提示词响应&#xff0c;非常适合在 1 兆像素分辨率下的专…

python 爬虫 入门 二、数据解析(正则、bs4、xpath)

目录 一、待匹配数据获取 二、正则 三、bs4 &#xff08;一&#xff09;、访问属性 &#xff08;二&#xff09;、获取标签的值 &#xff08;三&#xff09;、查询方法 四、xpath 后续&#xff1a;登录和代理 上一节我们已经知道了如何向服务器发送请求以获得数据&#x…

Unity引擎:游戏开发的核心力量

目录 引言 Unity引擎的发展历程 早期发展 跨平台支持 Unity引擎的核心特性 易用性 社区支持 跨平台能力 Unity在游戏开发中的应用 移动游戏 独立游戏 3A游戏 Unity的未来展望 高级图形和渲染技术 扩展现实&#xff08;XR&#xff09;支持 云服务和多人游戏 结论…

萤石云服务支持云端视频AI自动剪辑生成

萤石视频云存储及媒体处理服务是围绕IoT设备云端存储场景下的音视频采集、媒体管理、视频剪辑和分发能力的一站式、专业云服务&#xff0c;并可面向广大开发者提供复杂设备存储场景下的完整技术方案。目前该服务新增了视频剪辑功能&#xff0c;支持将视频片段在云端进行裁剪并拼…

多模态大语言模型(MLLM)-Blip3/xGen-MM

论文链接&#xff1a;https://www.arxiv.org/abs/2408.08872 代码链接&#xff1a;https://github.com/salesforce/LAVIS/tree/xgen-mm 本次解读xGen-MM (BLIP-3): A Family of Open Large Multimodal Models 可以看作是 [1] Blip: Bootstrapping language-image pre-training…

数据结构 ——— 顺序表和链表的区别以及各自的优缺点

目录 顺序表和链表的区别 一、存储空间上 二、下标的随机访问 三、任意位置插入或者删除元素 四、添加数据 五、应用场景 六、缓存利用率 顺序表和链表的优缺点 顺序表的缺点 链表的优点&#xff08;和顺序表的缺点对应&#xff09; 顺序表的优点 链表的缺点&#…

Redis 分布式锁

如果追求高可用性&#xff08;AP&#xff09; 就采用redis 如果追求高一致性(CP) 就采用zookeeper 加锁方式:set lockKey uniqueId NX PX expireTime lockKey可以根据业务自己定义&#xff08;如订单&#xff09;uniqueId是为了不解错锁&#xff08;uniqueId可以是session I…