分布式中间件:RabbitMQ确认消费机制

server/2025/4/1 7:41:49/

分布式中间件:RabbitMQ确认消费机制

分布式系统中,消息队列是实现异步通信和系统解耦的重要组件。RabbitMQ 作为一款功能强大的消息队列中间件,提供了丰富的特性来保证消息的可靠传输和消费。其中,确认消费机制是确保消息被正确处理的关键环节。本文将深入探讨 RabbitMQ 的确认消费机制,并给出不同场景下的配置示例。

确认消费机制概述

RabbitMQ 的确认消费机制主要涉及两个方面:生产者消息确认和消费者消息确认。生产者消息确认用于确保消息成功发送到 RabbitMQ 服务器,而消费者消息确认则用于确保消息被消费者正确处理。

消费者确认模式

RabbitMQ 提供了三种消费者确认模式:

  1. AcknowledgeMode.AUTO:自动确认模式。当消费者接收到消息后,RabbitMQ 会自动将消息标记为已消费,无论消费者是否成功处理该消息。这种模式简单方便,但可能会导致消息丢失,因为如果消费者在处理消息时出现异常,消息已经被确认,无法再次处理。
  2. AcknowledgeMode.MANUAL:手动确认模式。消费者需要显式地调用 channel.basicAck() 方法来确认消息已经被成功处理,或者调用 channel.basicNack()channel.basicReject() 方法来拒绝消息。这种模式可以确保消息不会丢失,但需要开发者手动处理确认逻辑。
  3. AcknowledgeMode.NONE:无确认模式。RabbitMQ 不会等待消费者的确认,一旦消息被发送给消费者,就会将其标记为已消费。这种模式适用于对消息可靠性要求不高的场景。

配置示例

单一消费者实例配置

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {private final ConnectionFactory connectionFactory;public RabbitMQConfig(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}@Bean("singleListenerContainer")public SimpleRabbitListenerContainerFactory singleListenerContainer() {// 创建一个监听容器工厂SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 设置连接工厂factory.setConnectionFactory(connectionFactory);// 设置消息转换器factory.setMessageConverter(new Jackson2JsonMessageConverter());// 设置并发消费factory.setConcurrentConsumers(1);// 设置最大并发消费factory.setMaxConcurrentConsumers(1);// 设置最大单条消费消息factory.setPrefetchCount(1);return factory;}
}

在这个配置中,我们创建了一个单一消费者实例的监听容器工厂。通过设置 ConcurrentConsumersMaxConcurrentConsumers 为 1,确保只有一个消费者实例在处理消息。PrefetchCount 设置为 1,表示消费者一次只从队列中获取一条消息,保证消息的顺序处理。

多个消费者实例配置

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.AcknowledgeMode;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {private final ConnectionFactory connectionFactory;public RabbitMQConfig(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}@Bean("multiListenerContainer")public SimpleRabbitListenerContainerFactory multiListenerContainer() {// 创建一个监听容器工厂SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 设置连接工厂factory.setConnectionFactory(connectionFactory);// 设置消息转换器factory.setMessageConverter(new Jackson2JsonMessageConverter());// 设置手动提交factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 设置并发消费factory.setConcurrentConsumers(10);// 设置最大并发消费factory.setMaxConcurrentConsumers(20);// 设置最大单条消费消息factory.setPrefetchCount(10);return factory;}
}

在多个消费者实例的配置中,我们将 ConcurrentConsumers 设置为 10,MaxConcurrentConsumers 设置为 20,表示初始有 10 个消费者实例,最多可以扩展到 20 个。PrefetchCount 设置为 10,允许每个消费者一次从队列中获取 10 条消息,提高消费效率。同时,我们将确认模式设置为 AcknowledgeMode.MANUAL,需要开发者手动处理消息确认。

自定义 RabbitMQ 发送消息组件

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {private final ConnectionFactory connectionFactory;public RabbitMQConfig(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate() {// 设置消息发送确认connectionFactory.setPublisherConfirms(true);// 设置消息发送返回connectionFactory.setPublisherReturns(true);// 创建 rabbitTemplateRabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置消息发送确认回调rabbitTemplate.setMandatory(true);// 设置消息发送确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功");} else {System.out.println("消息发送失败:" + cause + correlationData.toString());}});// 设置消息发送返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息丢失:exchange:" + exchange + ",route:" + routingKey + ",replyCode:" + replyCode + ",replyText:" + replyText);});// 设置消息转换器rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}

在这个配置中,我们自定义了一个 RabbitTemplate 用于发送消息。通过设置 PublisherConfirmsPublisherReturnstrue,开启生产者消息确认和返回机制。ConfirmCallback 用于处理消息发送确认结果,ReturnCallback 用于处理消息无法路由到队列的情况。

总结

RabbitMQ 的确认消费机制为分布式系统中的消息传递提供了可靠的保障。通过合理配置生产者和消费者的确认模式,可以确保消息的可靠发送和消费。在实际应用中,需要根据业务需求选择合适的确认模式,并处理好异常情况,以保证系统的稳定性和可靠性。

希望本文能帮助你更好地理解和使用 RabbitMQ 的确认消费机制。如果你有任何问题或建议,欢迎在评论区留言。

以上博客详细介绍了 RabbitMQ 的确认消费机制,并给出了不同场景下的配置示例。通过这些配置,你可以根据业务需求灵活调整消费者实例数量和确认模式,确保消息的可靠传输和处理。


http://www.ppmy.cn/server/177142.html

相关文章

Pytorch torch.roll函数介绍

torch.roll 是 PyTorch 中的一个函数,用于对输入张量的元素进行循环滚动操作。它可以将张量的元素在指定的维度上移动,超出边界的元素会循环回到另一侧。以下是关于 torch.roll 函数的详细介绍: 函数语法 torch.roll(input, shifts, dimsNo…

数学建模:MATLAB循环神经网络

一、简述 1.循环神经网络 循环神经网络(RNN)是一种用于处理序列数据的神经网络。不同于传统的前馈神经网络,RNN在隐藏层中加入了自反馈连接,使得网络能够对序列中的每个元素执行相同的操作,同时保持一个“记忆”状态…

大屏技术汇集【目录】

Cesium 自从首次发布以来,经历了多个版本的迭代和更新,每个版本都带来了性能改进、新功能添加以及对现有功能的优化。以下是 Cesium 一些重要版本及其主要特点: 主要版本概述 Cesium 1.0 (2012年) 初始版本发布,确立了Cesium作为…

Three.js中的加载器与资源管理:构建丰富3D场景的关键

一、引言 Three.js是一个强大的JavaScript库,用于在Web浏览器中创建和展示3D图形。在构建复杂的3D场景时,有效地加载和管理各种资源是至关重要的。加载器在Three.js中扮演着桥梁的角色,负责将外部的3D模型、纹理、字体和其他资源导入到场景中…

对接马来西亚、印度、韩国、越南等全球金融数据示例

Python对接StockTV全球金融数据API的封装实现及使用教程: import requests import websockets import asyncio from typing import Dict, List, Optional, Union from datetime import datetimeclass StockTVClient:"""StockTV全球金融数据API客户端…

操作系统的心脏节拍:CPU中断如何驱动内核运转?

目录 一、硬件中断 二、时钟中断 三、软中断 四、用户态与内核态 一、硬件中断 为引出今天的话题&#xff0c;我们来思考这样一个问题&#xff1a; #include<stdio.h> int main() {int a;scanf("%d",&a);return 0; } 当以上程序执行到scanf时&#xf…

【软考-架构】8.4、信息化战略规划-CRO-SCM-应用集成-电子商务

✨资料&文章更新✨ GitHub地址&#xff1a;https://github.com/tyronczt/system_architect 文章目录 信息化战略体系&#x1f4af;考试真题第一题第二题 信息系统战略规划&#x1f4af;考试真题第一题第二题 ✨客户关系管理CRM供应链管理SCM&#x1f4af;考试真题第一题第二…

HTML语言的贪心算法

HTML语言的贪心算法&#xff1a;理论与实践 引言 在编程和算法研究中&#xff0c;贪心算法是一种广泛应用的解决问题的方法。它通过对每一阶段选择最优解的方式来构建整个问题的解决方案。贪心算法不一定能在所有情况下得到最优解&#xff0c;但在许多实际问题中&#xff0c;…