RabbitMQ消费者重试的两种方案

server/2025/1/13 9:54:10/

目录

直接重试

优点:

缺点:

保存数据库的重试方案

优点:

缺点:

选择建议

适合直接重试的场景 

适合数据库记录的场景 


这篇文章总结一下消费者消费重试的方案

直接重试

一种是消息消费失败然后消费者直接重试,这需要配置消费者重试机制

java">@Component
public class DirectRetryConsumer {@RabbitListener(queues = "myQueue")@RabbitListener(queues = "myQueue",containerFactory = "retryContainerFactory")public void processMessage(Message message, Channel channel) {try {// 处理消息processBusinessLogic(message);// 确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 重试处理handleRetry(message, channel, e);}}private void handleRetry(Message message, Channel channel, Exception e) {MessageProperties props = message.getMessageProperties();Long retryCount = props.getHeader("retry-count");if (retryCount == null) {retryCount = 0L;}if (retryCount < maxRetryCount) {// 重新入队,等待重试channel.basicNack(props.getDeliveryTag(), false, true);} else {// 超过重试次数,进入死信队列channel.basicNack(props.getDeliveryTag(), false, false);}}
}@Configuration
public class RetryConfig {@Beanpublic SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 配置重试策略RetryTemplate retryTemplate = new RetryTemplate();ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setInitialInterval(1000);backOffPolicy.setMultiplier(2.0);backOffPolicy.setMaxInterval(10000);retryTemplate.setBackOffPolicy(backOffPolicy);factory.setRetryTemplate(retryTemplate);return factory;}
}
优点:
  • 实现简单,开发成本低
  • 消息处理实时性高
  • 系统复杂度低
  • 资源消耗相对较少
缺点:
  • 重试策略不够灵活
  • 无法保存失败原因和重试历史
  • 难以进行人工干预
  • 监控和统计困难

 

保存数据库的重试方案

java">@Entity
public class MessageRecord {@Idprivate Long id;private String messageId;private String payload;private String status; // PENDING, PROCESSING, FAILED, SUCCESSprivate Integer retryCount;private String errorMessage;private LocalDateTime createTime;private LocalDateTime updateTime;
}@Service
@Transactional
public class MessageProcessService {@Autowiredprivate MessageRecordRepository recordRepository;@RabbitListener(queues = "myQueue")public void processMessage(Message message, Channel channel) {MessageRecord record = null;try {// 1. 保存消息记录record = saveMessageRecord(message);// 2. 处理业务逻辑processBusinessLogic(message);// 3. 更新状态为成功record.setStatus("SUCCESS");recordRepository.save(record);// 4. 确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 5. 记录失败信息handleFailure(record, e);// 6. 拒绝消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}@Component
@Slf4j
public class FailedMessageRetryJob {@Autowiredprivate MessageRecordRepository recordRepository;@Scheduled(fixedDelay = 300000) // 5分钟public void retryFailedMessages() {List<MessageRecord> failedRecords = recordRepository.findByStatusAndRetryCountLessThan("FAILED", maxRetryCount);for (MessageRecord record : failedRecords) {try {// 重试处理processMessage(record);// 更新状态record.setStatus("SUCCESS");recordRepository.save(record);} catch (Exception e) {// 更新重试次数和错误信息record.setRetryCount(record.getRetryCount() + 1);record.setErrorMessage(e.getMessage());recordRepository.save(record);log.error("Retry failed for message: {}", record.getMessageId(), e);}}}
}

       将所有的消息都记录在数据库中进行保存,并以消息是否消费成功来更改数据库中消息的状态值,可以开启一个定时任务执行从数据库取出失败的消息进行重新消费,因为可能取出消费的时候还可能会失败,可以设置一次任务取出数据消费条数,若是超出条数则等到下次定时任务再进行消费。

      当然也可以通过定时任务给另一个重试队列投递消息,然后消费者收到消息就从数据库中取出失败的记录进行重试。这样的方法其实和刚刚讲的原理差不多。

优点:
  • 追踪性强,保留完整的处理历史
  • 支持人工干预和处理
  • 便于监控和统计
  • 可以实现定制化的重试逻辑
  • 数据不会丢失
缺点:
  • 实现复杂度高
  • 需要额外的数据库存储
  • 系统资源消耗较大
  • 实时性相对较差
  • 需要维护额外的定时任务

选择建议

适合直接重试的场景 

简单的消息处理,处理简单的、非关键的消息,失败影响较小,不需要追踪历史

适合数据库记录的场景 

复杂的业务处理,处理订单、支付等关键业务,需要完整的处理历史,可能需要人工介入

 


http://www.ppmy.cn/server/157984.html

相关文章

数据结构之双向链表

目录 双向链表的基本概念和结构 初始化 尾插 头插 尾删 头删 查找 在指定位置之后插入 删除指定位置节点 判空 销毁 完整代码 测试代码 双向链表的基本概念和结构 双向链表&#xff08;Doubly Linked List&#xff09;‌是一种链式存储结构&#xff0c;每个节点除…

spark functions函数合集(无示例)

ctrlF进行页面查找 没有示例&#xff0c;仅用于查询&#xff0c;具体用法自行搜索 函数名称作用avg计算指定列的平均值count计算指定列或所有行的数量countDistinct计算指定列中不同值的数量corr计算两个列之间的相关系数covar_pop计算两个列之间的总体协方差covar_samp计算两…

08cms房产系统开源源码与链家房产系统小程序源码两套的安装教程步骤大同小异

简介&#xff1a; 08cms系统源码目前没有任何域名限制&#xff0c;一个实实在在的房产门户系统功能比较强悍&#xff0c;包括新房二手房等所有房产门户的特征功能都具备&#xff0c;自带全景看房&#xff0c;仿链家功能带小程序app工程源码&#xff0c; 2.TP房产系统源码也是一…

pdf提取文本,表格以及转图片:spire.pdf

文章目录 &#x1f412;个人主页&#xff1a;信计2102罗铠威&#x1f3c5;JavaEE系列专栏&#x1f4d6;前言&#xff1a;&#x1f380; 1. pdfbox1.1导入pdfbox 的maven依赖1.1 提取文本1.2 提取文本表格&#xff08;可自行加入逻辑处理&#xff09;1.3 pdf转换成图片代码&…

检验统计量与p值笔记

一、背景 以雨量数据为例&#xff0c;当获得一个站点一年的日雨量数据后&#xff0c;我们需要估计该站点的雨量的概率分布情况&#xff0c;因此我们利用有参估计的方式如极大似然法估计得到了假定该随机变量服从某一分布的参数&#xff0c;从而得到该站点的概率密度函数&#x…

【网络】计算机网络的分类 局域网 (LAN) 广域网 (WAN) 城域网 (MAN)个域网(PAN)

局域网是通过路由器接入广域网的 分布范围 局域网Local Area Network&#xff1a;小范围覆盖&#xff0c;速度高&#xff0c;延迟低(办公室&#xff0c;家庭&#xff0c;校园&#xff0c;网络) 广域网Wide Area Network 大范围覆盖&#xff0c;速度相对低&#xff0c;延迟高…

【会话详解】

会话详解 概述 会话&#xff1a; 用户通过浏览器访问多个Web资源的过程&#xff0c;从打开浏览器开始访问特定网站&#xff0c;直到关闭浏览器的过程称为会话&#xff08;Session&#xff09;。会话管理是Web应用中跟踪和存储用户状态的重要机制。 有状态会话&#xff1a; …

使用conda出现requests.exceptions.HTTPError 解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…