RabbitMQ实现多线程处理接收消息

server/2024/10/18 14:21:25/

前言:在使用@RabbitListener注解来指定消费方法的时候,默认情况是单线程去监听队列,但是这个如果在高并发的场景中会出现很多个任务,但是每次只消费一个消息,就会很缓慢。单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,这个就很伤。

处理办法:可以添加配置类,设置RabbitMQ的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

一、编写配置类

java">package com.quick.config;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;/*** RabbitMQ配置类*/
@Configuration
/*@ConditionalOnClass(RabbitTemplate.class) //有RabbitTemplate依赖才会生效,否则不生效*/
public class MqConfig {// 定义线程数、最大线程数常量private static final int INITIAL_CONCURRENT_CONSUMERS = 10;private static final int MAX_CONCURRENT_CONSUMERS = 10;/*** 将多线程配置配置注入容器工厂*/@Bean("customContainerFactory")public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConcurrentConsumers(INITIAL_CONCURRENT_CONSUMERS); //设置线程数factory.setMaxConcurrentConsumers(MAX_CONCURRENT_CONSUMERS); //最大线程数configurer.configure(factory, connectionFactory);return factory;}}

  • setConcurrentConsumers(int concurrentConsumers): 这个方法设置了容器应该同时启动的监听器(消费者)线程的数量。这些线程会并发地从RabbitMQ队列中拉取并处理消息。这个值决定了系统初始时能够并行处理消息的能力。

  • setMaxConcurrentConsumers(int maxConcurrentConsumers): 这个方法设置了容器在需要时可以增加到的最大并发消费者数量。这通常用于处理负载高峰,当队列中的消息积压时,可以动态地增加并发消费者数量以提高处理速度。然而,请注意,这并不意味着系统会立即创建所有最大数量的线程,而是会根据需要逐渐增加到这个上限。

这个容器负责监听 RabbitMQ 的队列,并将接收到的消息分发给相应的处理器(即 @RabbitListener 注解标记的方法)

二、修改监听者

在接收消息方里面的@RabbitListener注解中添加配置

@RabbitListener(queues = {"监听队列名"},containerFactory = "customContainerFactory")

java">
/*** 接收消息*/
@Component
public class StoreListener {@Resourceprivate IStoreService storeService;@Resourceprivate StoreMapper storeMapper;/*** 更新店铺收藏人数,实现收藏人数+1* @param storeId 店铺id*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "store.addFavorite.success.queue", durable = "true"), // 队列 起名规则(服务名+业务名+成功+队列),durable持久化exchange = @Exchange(name = "addFavorite.direct"), // 交换机名称,交换机默认类型就行direct,所以不用配置directkey = "addFavorite.success" // 绑定的key),// 在@RabbitListener注解中指定容器工厂containerFactory = "customContainerFactory")public void listenAddFavoriteCountsSuccess(Long storeId){storeService.updateStoreFavoriteUsersCountAdd1(storeId);}/*** 根据传过来的店铺实体类修改店铺信息* @param store 店铺实体类*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "store.updateStore.success.queue", durable = "true"),exchange = @Exchange(name = "updateStore.direct"),key = "updateStore.success"),// 在@RabbitListener注解中指定容器工厂containerFactory = "customContainerFactory")public void updateStoreByEntity(Store store){storeMapper.updateById(store);}}

多线程的好处:

1、提高吞吐量:通过并行处理消息,系统可以更快地处理大量消息,从而提高整体吞吐量。

2、更好的资源利用率:在多核处理器上,多线程可以更好地利用硬件资源,减少处理延迟。


http://www.ppmy.cn/server/103202.html

相关文章

特洛伊木马:现代网络安全的隐形威胁

在网络安全领域,特洛伊木马(Trojan Horse,简称木马)是一种古老却依然十分有效的攻击手段。尽管名字来源于古希腊神话中的特洛伊战争,但在现代信息技术中,木马病毒已演变为一种极具破坏性和隐蔽性的恶意软件…

培训学校课程管理系统-计算机毕设Java|springboot实战项目

🍊作者:计算机毕设匠心工作室 🍊简介:毕业后就一直专业从事计算机软件程序开发,至今也有8年工作经验。擅长Java、Python、微信小程序、安卓、大数据、PHP、.NET|C#、Golang等。 擅长:按照需求定制化开发项目…

探索顶级PDF水印API:PDFBlocks(2024年更新)

引言 在一个敏感信息常常面临风险的时代,能够轻松高效地保护文档的能力至关重要。PDF水印已成为企业和个人寻求保护其知识产权、确保文件保密性的基本工具。 PDFBlocks 文字水印 API是什么? PDFBlocks API 提供了一个强大的解决方案,用于在…

方差的原理以及应用场景

方差是统计学中一个重要的概念,用来衡量一组数据的离散程度或波动性。具体来说,方差描述了数据点与其均值之间的平均平方差。方差越大,说明数据点的波动性或不确定性越大;方差越小,说明数据点集中在均值附近&#xff0…

re模块入门教程

re模块是Python中的正则表达式模块,用于处理字符串的匹配和替换操作。re模块提供了一组函数,可以用来进行正则表达式的匹配、查找、替换等操作 一、匹配 re.match(pattern, string, flags0): 从字符串的开头开始匹配,如果成功匹配则返回一个…

GATK SampleList接口介绍

在 GATK 中,SampleList 是一个接口,用于表示一个样本列表。这些样本通常是在基因组分析过程中被处理的不同生物样本。SampleList 接口提供了访问这些样本的一些基本方法,通常用于多样本分析任务,比如变异检测或基因组重测序。 SampleList 接口的方法 SampleList 接口定义…

Android交流社区

推荐一个Android交流社区:玩Android

Kubernetes--命令行工具 kubectl

前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除 一、在任意节点使用 kubectl 1、将 master 节点中 /etc/kubernetes/admin.conf 拷贝到需要运行的服务器的 /etc/kubernetes 目录中 [rootk8s-master…