RabbitMQ常见问题持续汇总

devtools/2024/10/25 9:54:23/

文章目录

  • 消息分发
    • 不公平分发
    • 限流-basic.qos
      • 主要功能
      • 使用场景
      • 示例代码
  • 消费者默认concurrency数量
    • prefetch和concurrency结合?
  • spring.rabbitmq.template.retry.enabled=true和spring.rabbitmq.listener.simple.retry.enabled=true有什么区别
    • 1. `spring.rabbitmq.template.retry.enabled=true`
    • 2. `spring.rabbitmq.listener.simple.retry.enabled=true`
    • 总结
  • 多机器(集群部署)同时消费某个队列的消息理解
    • 负载均衡
    • 高可用性
    • 和Fanout发布订阅对比理解
  • TODO--持续更新

这篇文章主要是汇总一些杂七杂八的问题,核心内容参考前三章节

RabbitMQ系列文章
深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念
不止于纸上谈兵,用代码案例分析如何确保RabbitMQ消息可靠性?
不止于方案,用代码示例讲解RabbitMQ顺序消费
RabbitMQ常见问题持续汇总

消息分发

不公平分发

RabbitMQ 默认分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1); 意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

限流-basic.qos

通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

basic.qos 是 RabbitMQ 的一个方法,用于设置消息消费的流控(Quality of Service, QoS)。具体来说,它允许你限制在消费者确认(acknowledge)消息之前,RabbitMQ 能够推送给该消费者的消息数量。这样可以防止消费者因为处理不过来而被淹没在大量未处理的消息中。

 com.rabbitmq.client.Channel#basicQos(int, int, boolean)	/*** 请求对此通道应用特定的prefetchCount“服务质量”设置** @param prefetchCount 服务器将交付的最大消息数,如果不限制则为0* @throws java.io.IOException 如果遇到错误* * 注意: 该方法是基本服务质量机制的一部分,用于流量控制* 客户端可以通过设置prefetchCount来避免被服务器压垮*/
void basicQos(int prefetchCount) throws IOException;

主要功能

配置如下

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:concurrency: 1max-concurrency: 3# 消费者预取1条数据到内存,默认为250条# 消费端限流:每个消费者未确认的未处理消息的最大数量prefetch: 1  
  1. 设置预取计数(prefetch count):

    prefetch_countbasic_qos 方法中的一个参数,用于指定消费者可以接收的未确认消息的最大数量。在 RabbitMQ 中,prefetch_count 的值可以是 0 到 65535 之间的任意整数,其中 0 表示无限制 。RabbitMQ 允许为每个消费者独立设置 prefetch_count,而不是像 AMQP 0-9-1 协议中那样在通道级别共享 。

    • basic.qos 允许设置每个消费者在未确认的情况下能接收的最大消息数量。比如,basic.qos(1) 表示每个消费者在没有确认上一条消息之前不会再接收新的消息。这有助于实现公平调度,确保消费者不会被淹没。
  2. 限制消息分发:

    • 当有多个消费者时,basic.qos 通过限制每个消费者处理消息的数量,确保消息分发更均衡。消费者处理完消息并发送确认后,RabbitMQ 才会将新的消息发送给它。
  3. 避免消息堆积:

    • 通过合理设置 prefetch count,可以防止消费者端消息堆积,避免因为过多未处理的消息导致的内存消耗问题。

使用场景

  • 资源密集型任务: 如果你的消费者在处理某些需要消耗较多资源(如CPU、内存)的任务时,需要限制其一次处理的消息数量,以避免资源耗尽。
  • 分布式系统: 在分布式系统中,basic.qos 可以帮助实现更公平的负载均衡,确保每个消费者都能公平地获取消息处理。

示例代码

// 设置每个消费者最多处理3条未确认的消息
channel.basicQos(3);

通过上述设置,RabbitMQ 在每个消费者确认消息前只会推送3条消息,这样就能有效控制消息处理的并发量。

消费者默认concurrency数量

首先说一下concurrency配置,这个配置是设置listener初始化时的线程数,即消费者的数量,即消费者同时消费消息的数量。

那么如果没有显性设置concurrency时,默认的线程数是多少呢,答案是1。

具体的我们可以在源码org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer看到

在这里插入图片描述

prefetch和concurrency结合?

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 全局并发设置concurrency: 1  max-concurrency: 3# 消费者预取1条数据到内存,默认为250条# 消费端限流:每个消费者未确认的未处理消息的最大数量prefetch: 10  

若一个消费者配置prefetch=10,concurrency=2,即会开启2个线程去消费消息,每个线程都会抓取10个线程到内存中(注意不是两个线程去共享内存中抓取的消息)。

concurrency在注解里面可以配置,配置了以后以注解为准,yml里面相当于是全局的

@Component
public class MyConsumer {//会覆盖配置文件中的参数。@RabbitListener(queues = {"myQueue"},concurrency ="2")public void receiver(Message msg, Channel channel) throws InterruptedException {//...业务处理}}

rabbitmqtemplateretryenabledtruespringrabbitmqlistenersimpleretryenabledtrue_141">spring.rabbitmq.template.retry.enabled=true和spring.rabbitmq.listener.simple.retry.enabled=true有什么区别

spring.rabbitmq.template.retry.enabled=truespring.rabbitmq.listener.simple.retry.enabled=true 是 Spring AMQP 中配置 RabbitMQ 消息重试机制的两个不同选项,分别用于不同的场景。

rabbitmqtemplateretryenabledtrue_145">1. spring.rabbitmq.template.retry.enabled=true

  • 作用范围: 这个配置用于通过 RabbitTemplate 进行消息发送时的重试机制。
  • 适用场景: 当你在使用 RabbitTemplate 主动发送消息到 RabbitMQ 时,如果消息发送失败(如网络问题、连接超时等),Spring 会自动进行重试。
  • 默认行为: 如果开启了这个选项,RabbitTemplate 会按照配置的重试策略(如重试次数、重试间隔等)自动重试消息发送操作,直到成功或者达到重试上限。
  • 常见使用: 适用于主动调用 RabbitTemplate.convertAndSend() 或类似方法进行消息发送的场景。

rabbitmqlistenersimpleretryenabledtrue_152">2. spring.rabbitmq.listener.simple.retry.enabled=true

  • 作用范围: 这个配置用于在使用消息监听器(Message Listener)时的重试机制,特别是在使用简单消息监听容器(SimpleMessageListenerContainer)时。
  • 适用场景: 当你使用 @RabbitListener 或其他监听机制从队列中接收和处理消息时,如果消息处理失败(如业务逻辑异常),Spring 会按照配置的重试策略自动进行重试。
  • 默认行为: 如果开启了这个选项,当消费者处理消息时抛出异常,Spring 会自动进行重试,直到处理成功或达到最大重试次数。
  • 常见使用: 适用于使用 @RabbitListener 注解来监听队列消息,并希望在处理失败时自动重试的场景。

总结

  • spring.rabbitmq.template.retry.enabled=true 是用于发送消息失败后的重试机制,针对的是消息的生产者(发送端)。
  • spring.rabbitmq.listener.simple.retry.enabled=true 是用于消费消息时处理失败后的重试机制,针对的是消息的消费者(监听端)。

多机器(集群部署)同时消费某个队列的消息理解

对于这个代码,我们的代码在实际业务中是集群部署的,不同的机器都可能去消费这个队列的消息!

public class MessageConsumer {@RabbitListener(queues = "queue1")public void receiveMessageFromQueue1(String message) {// 处理来自queue1的消息System.out.println("Received from queue1: " + message);}}

负载均衡

通过多个消费者实例共同消费同一个队列的消息,负载可以均匀分布在所有消费者上,提高整体系统的处理能力。当多个消费者监听同一个队列时,RabbitMQ 会按照轮询的方式将消息分发给这些消费者。每条消息只会被一个消费者消费,这样可以有效地分摊负载,提高系统的吞吐量和可靠性。

假设有两个消费者(Consumer A 和 Consumer B),都监听同一个队列 queue1。RabbitMQ 将会按如下方式分发消息:

  • 消息 1 发送给 Consumer A
  • 消息 2 发送给 Consumer B
  • 消息 3 发送给 Consumer A
  • 依此类推…

高可用性

如果某个消费者实例宕机,RabbitMQ 会自动将新的消息发送给其他存活的消费者,确保消息不会丢失,业务处理不中断。

和Fanout发布订阅对比理解

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

在这里插入图片描述

也就是说发送一条消息A,假设有队列1、队列2都订阅了这个消息A,那么这监听这两个队列的消费者都会去消费消息A,同时,在集群部署的情况下,对应监听队列1、2的消费者都会有多个,但是每次MQ都会采用负载均衡策略(默认轮询),这样每个队列对应实际只会有一个消费者去消费!

下面展示了消息A被两个队列订阅,并且每个队列有多个消费者,但在负载均衡策略下,每次只有一个消费者去消费消息A的情况。

在这里插入图片描述

如图所示:

  • 消息A被发送到两个队列:队列1和队列2。
  • 每个队列有多个消费者:队列1有消费者1、消费者2和消费者3,队列2有消费者4、消费者5和消费者6。
  • 在负载均衡策略下,每次只有一个消费者去消费消息A。例如,队列1的消息A可能会被消费者1、消费者2或消费者3中的一个消费,队列2的消息A可能会被消费者4、消费者5或消费者6中的一个消费。

TODO–持续更新


http://www.ppmy.cn/devtools/128654.html

相关文章

HarmonyOS 5.0应用开发——Navigation实现页面路由

【高心星出品】 文章目录 Navigation实现页面路由完整的Navigation入口页面子页面 页面跳转路由拦截其他的 Navigation实现页面路由 Navigation:路由导航的根视图容器,一般作为页面(Entry)的根容器去使用,包括单页面&…

基于Qcom A14虚拟化平台的qcom-ethqos-thin驱动的分析(五)

前言 前面说的都是ethqos-thin驱动的基本功能实现,这篇就看看驱动中重要的emac_fe_ev_wq工作队列,emac_fe_ev_wq负责收取cmd通知,来判断当前的网卡状态,并基于不同的状态对驱动做出不同的处理。 同时emac_ctrl_fe_virtio驱动负责与BE端通信,获取的cmd发送到emac_fe_ev_wq队…

Java项目实战II基于微信小程序的医院管理系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 基于微信小…

AI学习指南自然语言处理篇-Transformer模型简介

AI学习指南自然语言处理篇——Transformer模型简介 目录 引言Transformer模型的背景Transformer模型的结构 自注意力机制位置编码编码器-解码器架构 Transformer在自然语言处理中的应用 机器翻译文本生成问答系统 与传统循环神经网络和卷积神经网络的对比 计算效率长程依赖建…

Linux——K8S的pod的调度

DeploymentStatefulSetDaemonsetreplicaSetReplicacontroller // 从K8S的近期版本中将逐渐移除rcJobcronjob K8s 网络: 平台中的POD如何通信: CNI 容器网络插件Coredns的组件 负责提供平台中的名称解析平台中的应用如何被客户端访问 Service // 将…

django celery 定时任务 Crontab 计划格式

Celery 定时任务教程 Celery 是一个强大的异步任务队列/作业队列基于分布式消息传递的开源项目。它广泛用于处理各种类型的后台任务,例如发送电子邮件、处理图像、数据分析和视频转换等。 本文将介绍如何使用 Celery 实现定时任务,包括: 安…

【Linux】Shell概念、命令、操作(重定向、管道、变量)

文章目录 一、概念篇1、shell的概念2、shell的分类 二、命令篇1、cat2、echo3、ps4、grep4.1、匹配行首4.2、大小写 5、sed 三、操作篇1、自动补全2、查看历史命令3、命令替换4、重定向4.1、输入重定向4.2、输出重定向4.3、错误重定向 5、管道6、shell中的变量6.1、本地变量6.2…

深度学习 自动求梯度

代码示例: import torch# 创建一个标量张量 x,并启用梯度计算 x torch.tensor(3.0, requires_gradTrue)# 计算 y x^2 y torch.pow(x, 2)# 判断 x 和 y 是否需要梯度计算 print(x.requires_grad) # 输出 x 的 requires_grad 属性 print(y.requires_g…