记一次线程堵塞(挂起)导致消息队列积压

news/2024/11/22 21:58:43/

1 背景

A服务作为生产者,每天发送上千万的mq消息,每一个消息包含500个用户ids数据。B服务作为消费者,接受MQ消息并通过http调用第三方请求进行业务处理,消费组启用了rabbitmq的多线程消费组,一个实例并发40个mq消费者线程,每个线程一次获取10个消息进行消费。

Mq消费者配置如下:

# mq配置rabbitmq:connection-timeout: 15000cache:channel:size: 200# 消息发送到rabbitmq broker cluster需要回调publisher-confirms: true# 交换机将消息投递至队列失败时需要回调publisher-returns: truelistener:# 手动确认消息已被消费simple:acknowledge-mode: manual# consumer的并发数concurrency: 40max-concurrency: 50# 每个消息者每次取10条prefetch: 10

Mq挤压消息如下

2 排查

2.1 复制rabbitmq挤压消息数据进行模拟复现

找出rabbitmq挤压的消息,在本地模拟消费,找出没有进行消息确认的原因,通过rabbitmq控制台的Get messages功能

复制payload的消息进行base64转码,转出来的消息是乱码不完整的,怀疑
是rabbitmq还结合了其他加密处理,放弃这种排查思路

2.2 检查报错日志

rabbitmq的unack消息挤压,那就是消费者没有进行ack确认,怀疑消费者代码有异常导致没能执行到ack的代码。
查询服务器日志,没发现有报错的日志,梳理业务代码,消费者使用了spring aop around机制进行消息确认,所以不管代码有没有报错,按理说都会手动进行mq消息ack确认

2.3 检查服务是否宕机

消费组实例数量符合服务器大小配置,因此服务器应用没有宕机

2.4 检查java线程

使用IBM的TMDA工具进行分析线程堆栈,工具下载地址
TMDA工具下载地址

TMDA工具简介

TMDA分析线程堆栈结果如下

通过分析图,看到大量park线程,确实是符合现状,应用的线程挂起了

3 分析和解决

通过stack深度高到底排序,业务代码存在线程等待情况,具体代码CountDownLatch.await

3.1 结合业务代码分析

通过上图stack提示,找到关联的业务代码

伪代码如下:

// new一个CompletableFuture
public CompletableFuture<Integer> httpCall(String tokenData){CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {long time = 3000L;try {Thread.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}return Integer.parseInt(tokenData);});return completableFuture;}httpCall(tokenData).whenCompleteAsync((returnValue, ex)->{// do business// ex.getMessage()// 其中ex对象为空,使用ex.getMessage()报了空指针,导致没能执行如下的countDowncountDownLatch.countDown();})

消费者服务通过http调用第三方服务,为了提高并发,使用了多线程,每一组(数十个为一组)http请求批量调用完成后再把请求响应结果异步存入数据库,
主线程使用了countDownLatch.await进行等待,
其中whenCompleteAsync方法存在空指针问题,导致没能执行如下的countDown方法。

这里有人会问, 上面错误日志检查步骤,不是说日志没有空指针异常吗?
对,子线程报了空指针,因为CompletableFuture执行每次都是new 一个新的CompletableFuture对象,并把结果作为下一个CompletableFuture执行的入参,
通过伪代码可以发现,执行whenCompleteAsync后,没有新的CompletableFuture方法执行,所以异常没有抛出来,使得排查变得困难

3.2 解决

因为存在whenCompleteAsync报错的情况,添加多一个新的异常捕获处理方法,捕获异常也进行countDown的操作。

代码如下:

    httpCall(tokenData).whenCompleteAsync((returnValue, ex)->{// do business// ex.getMessage()// 其中ex对象为空,使用ex.getMessage()报了空指针,导致没能执行如下的countDowncountDownLatch.countDown();}).exceptionally(e ->{log.info("exceptionally捕获到异常,tokenData={}, e={}", tokenData, e.getMessage());countDownLatch.countDown();return null;});

4 结论

  • 熟练CompletableFuture的使用,要看源码的实现(实现原理cas + 多个future采用入stack,每次把前一个future的结果作为参数传入下一个future去执行)

  • 使用多线程需要考虑异常、超时等情况

  • 熟练使用jvm stack分析工具

5 文章参考

CompletableFuture流程图

CompletableFuture参考文章如下

CompletableFuture 原理浅析


http://www.ppmy.cn/news/1114363.html

相关文章

OJ练习第176题——第二高的薪水

第二高的薪水 力扣链接&#xff1a;176. 第二高的薪水 题目描述 示例 MySQL select max(salary) as SecondHighestSalary from Employee where salary ! (select max(salary) as salary from Employee); #去掉最大后&#xff0c;再去取最大&#xff0c;就是第二大select (s…

DMA简单总结

文章目录 一、基础概念1.1 DMA---Direct Memory Access 直接存储器访问&#xff0c;目的减少CPU资源占用 二、典型DMA硬件模型2.1 基本硬件特性---通道数、源/目标类型&#xff0c;地址与累加方式&#xff0c;数据位宽&#xff0c;搬移长度&#xff0c;循环模式&#xff0c;中断…

工商银行潍坊分行党建RPA机器人项目解析

01 案例背景&#xff1a;银行业掀起引入RPA加速实现数字化转型的浪潮 近年来&#xff0c;金融科技的蓬勃发展极大促进了银行的业务创新&#xff0c;新技术、新业态层出不穷。随着银行业务和科技的融合逐步落实&#xff0c;银行业务正朝着线上化、智能化转变。科技赋能的转型范…

CentOS LVM缩容与扩容步骤

为VM打快照;备份home数据;# yum install xfsdump -y [root@testCentos7 home]# xfsdump -f /dev/home.dump /home xfsdump: using file dump (drive_simple) strategy xfsdump: version 3.1.7 (dump format 3.0) - type ^C for status and control ===================…

Golang开发--计时器(Timer)和定时器(Ticker)

计时器&#xff08;Timer&#xff09; 在 Go 中&#xff0c;可以使用 time 包提供的计时器&#xff08;Timer&#xff09;来执行定时任务。计时器允许你在指定的时间间隔后执行某个操作。 time.Timer结构表示一个计时器&#xff0c;它会在指定的时间段后触发单次操作。 创建计…

【PyTorch 攻略 (4/7)】张量和梯度函数

一、说明 W在训练神经网络时&#xff0c;最常用的算法是反向传播。在该算法中&#xff0c;参数&#xff08;模型权重&#xff09;根据损失函数相对于给定参数的梯度进行调整。损失函数计算神经网络产生的预期输出和实际输出之间的差异。 目标是获得尽可能接近零的损失函…

Python-利用小波变换预测股票

一、简介 股票上涨和下跌&#xff0c;创造出像海浪一样难以预测的模式和走势。然而&#xff0c;就像科学家通过了解下面的水流来预测波浪的运动一样&#xff0c;我们也可以使用类似的工具破译股票市场的一些模式。 通过利用小波变换的力量&#xff0c;我们深入表面&#xff0c;…

大事件项目 api_server

Headline 大事件后台 API 项目&#xff0c; API 接口文档请参考 https://www.showdoc.cc/escook?page_id3707158761 215217 1. 初始化 1.1 创建项目 1. 新建 api_server 文件夹作为项目根目录&#xff0c;并在项目根目录中运行如下的命令&#xff0c;初始化包管理配置…