RabbitMQ之消费者批量消费

news/2024/9/24 23:37:03/

为什么要用消费端批量消费?
在一些业务场景下,我们希望使用 Consumer 批量消费消息,提高消费速度。可以通过对 SimpleRabbitListenerContainerFactory 进行配置实现批量消费能力

==========================>配置类
@Configuration
public class ConsumerConfiguration {@ResourceConnectionFactory connectionFactory;@ResourceSimpleRabbitListenerContainerFactoryConfigurer configurer;/*** 配置一个批量消费的 SimpleRabbitListenerContainerFactory*/@Bean(name = "consumer10BatchContainerFactory")public SimpleRabbitListenerContainerFactory consumer10BatchContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);// 这里是重点 配置消费者的监听器是批量消费消息的类型factory.setBatchListener(true);// 一批十个factory.setBatchSize(1000);// 等待时间 毫秒 , 这里其实是单个消息的等待时间 指的是单个消息的等待时间// 也就是说极端情况下,你会等待 BatchSize * ReceiveTimeout 的时间才会收到消息factory.setReceiveTimeout(10 * 1000L);factory.setConsumerBatchEnabled(true);return factory;}
}
====================》生产者
@Component
public class Producer10 {@ResourceRabbitTemplate rabbitTemplate;public void sendSingle(String id, String routingKey) {Message10 message = new Message10();message.setId(id);rabbitTemplate.convertAndSend(Message10.EXCHANGE, routingKey, message);}
}
================================》消费者
@RabbitListener(queues = Message10.QUEUE, containerFactory = "consumer10BatchContainerFactory")
@Component
@Slf4j
public class Consumer10 {/*** 批量消费** @param message 一批消息*/@RabbitHandlerpublic void onMessage(List<Message10> message) {log.info("[{}][Consumer10 批量][线程编号:{}][消息个数:{}][消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message.size(), message);}/*** 单个消费** @param message 一个消息*/@RabbitHandlerpublic void onMessage(Message10 message) {log.info("[{}][Consumer10 单个][线程编号:{}][消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);}
}
==================================》测试类
@Testvoid sendSingle() throws InterruptedException {// 假设 一秒一个,发送 1000 个,观察消费者的情况for (int i = 0; i < 15; i++) {TimeUnit.SECONDS.sleep(1);String id = UUID.randomUUID().toString();producer10.sendSingle(id, Message10.ROUTING_KEY);if (i == 9) {log.info("[{}][test producer10 sendSingle] 发送成功10个", LocalDateTime.now());}}log.info("[{}][test producer10 sendSingle] 发送成功", LocalDateTime.now());TimeUnit.SECONDS.sleep(20);}
}

以上的是RabbitMQ之消费者批量消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述


http://www.ppmy.cn/news/1452866.html

相关文章

【模型参数优化】网格搜索对随机森林分类模型进行参数寻优【附python实现代码】

写在前面&#xff1a; 首先感谢兄弟们的订阅&#xff0c;让我有创作的动力&#xff0c;在创作过程我会尽最大能力&#xff0c;保证作品的质量&#xff0c;如果有问题&#xff0c;可以私信我&#xff0c;让我们携手共进&#xff0c;共创辉煌。 路虽远&#xff0c;行则将至&#…

STM32的外设总了解

1.NVIC—嵌套向量中断控制器 2.SysTick—系统滴答定时器: 它们是内核里的外设 3.RCC—复位和时钟控制 这个外设十分重要,因为其他的外设再上电的情况下默认是没有时钟的&#xff0c;那么不给时钟的情况下&#xff0c;操作其他外设是无效的&#xff0c;外设不会工作,因此我们需要…

Xcode 对应的 macOS、SDK 版本

最低要求和支持的 SDK 本表截取于 2024-05-04&#xff0c;更多更新可见&#xff1a;https://developer.apple.com/cn/support/xcode/ Xcode 版本要求的最低 OS 版本SDK架构部署目标模拟器SwiftXcode 15.3macOS Sonoma 14iOS 17.4 macOS 14.4 tvOS 17.4 watchOS 10.4 DriverKi…

全方位解析Node.js:从模块系统、文件操作、事件循环、异步编程、性能优化、网络编程等高级开发到后端服务架构最佳实践以及Serverless服务部署指南

Node.js是一种基于Chrome V8引擎的JavaScript运行环境&#xff0c;专为构建高性能、可扩展的网络应用而设计。其重要性在于革新了后端开发&#xff0c;通过非阻塞I/O和事件驱动模型&#xff0c;实现了轻量级、高并发处理能力。Node.js的模块化体系和活跃的npm生态极大加速了开发…

奈氏准则和香农定理

一、奈奎斯特和香农 哈里奈奎斯特&#xff08;Harry Nyquist&#xff09;(左) 克劳德艾尔伍德香农&#xff08;Claude Elwood Shannon&#xff09;(右) 我们应该在心里记住他们&#xff0c;记住所有为人类伟大事业做出贡献的人&#xff0c;因为他们我们的生活变得越来越精彩&…

【C++】---模板进阶

【C】---模板进阶 一、模版参数1、类型参数2、非类型参数 二、模板的特化1、函数模板的特化2、类模板特化&#xff08;1&#xff09;全特化&#xff08;2&#xff09;偏特化 三、模板分离编译1、模板支持分离编译吗&#xff1f;2、为什么模板不支持分离编译&#xff1f;3、如何…

VBA 批量处理Excel文件

目录 一. 批量创建Excel文件1.1 VBA的方式1.2 Powershell方式 二. 批量删除文件 一. 批量创建Excel文件 1.1 VBA的方式 Sub CreateFiles()Dim strPath As String, strFileName As StringDim i As Long, rDim pathSeparator As StringOn Error Resume Next 用户选择文件夹路径…

大模型引领NLP研究新范式:从统计机器学习到预训练语言模型

自然语言处理&#xff08;NLP&#xff09;研究范式经历了从浅层到深层、从局部到整体、从特定到通用的演进过程。下面我们来详细回顾这一过程。 一、早期的统计机器学习方法&#xff08;20世纪90年代 - 21世纪初&#xff09; 词袋模型&#xff08;Bag-of-Words&#xff09; 将…