RabbitMQ高级特性 - 消费者消息确认机制

embedded/2024/9/24 10:20:32/

文章目录

  • RabbitMQ 消息确认机制
    • 背景
    • 消费者消息确认机制
      • 概述
      • 手动确认(RabbitMQ 原生 SDK)
      • 手动确认(Spring-AMQP 封装 RabbitMQ SDK)
        • AcknowledgeMode.NONE
        • AcknowledgeMode.AUTO(默认)
        • AcknowledgeMode.MANUAL
        • MANUAL 可能会引发的问题

RabbitMQ 消息确认机制


背景

在这里插入图片描述

上图中可以看出,从生产者发送消息消费者接收到消息并正确处理,这些里路线都可能会出现问题,那么为了保证这些消息最后能被正确处理,RabbitMQ 就提供了消息确认机制.

消费者消息确认机制

概述

在这里插入图片描述
为了保证消息从 队列 到 消费者正确消费,那么就引入了消费者消息确认机制.

a)消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种(以下讲到的方法和参数来自于 RabbitMQ 原生的 SDK,非 Spring 提供).

  • 自动确认:当 autoAck = true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后不管消费者是否真正的消费这些消息,都会从内存中删除.(适合对消息可靠性要求不高的场景).
  • 手动确认:当 autoAck = false 时,RabbitMQ 会等待消费者显示的调用 Basic.Ack 命令(波安排时间哦且确认消息),然后才会从 内存或磁盘 中删除消息.(适合对消息可靠性要求高的场景).

Ps:可靠性高了,性能也就下降了,所以请综合考虑.

b)对于 MQ队列 中的消息,在 MQ管理平台上可以看到以下两种类别:
在这里插入图片描述
Ready:队列已经准备好消息,随时准备发送给消费者 的消息数量(只要消费者来要,就立刻发送).
Unacked:消息已经发送给消费者,但是消费者没有返回消息确认 的消息数量(消息确认包括 ack肯定确认nack否定确认

手动确认(RabbitMQ 原生 SDK)

消费者在收到消息之后,可以选择确认,也可以选择拒绝或者跳过,RabbitMQ因此提供了不同的确认应答方式,消费者客户端可通过调用 channel 的相关方法实现.

a)肯定确认:消费者已经接收到消息,并且成功处理消息,可以将其丢弃了.

Channel.basicAck(long deliveryTag, boolean multiple)
  • deliveryTag:消息的唯一标识(单调递增的 long),特点如下:
    • deliveryTag 是每个 Channel 通道独立维护,所以每个通道上的都是唯一的(生产者 和 Broker 建立一个 channel 会生成一个 deliverTag,消费者 和 Broker 建立一个 channel 会生成一个 deliverTag,这俩 deliverTag 是不同的).
    • 当消费者 ack确认 一条消息时,必须使用对应的通道上 deliveryTag 进行确认.
  • multiple:是否批量确认. 如果值为 true,那么就会一次性 ack确认 所有小于或等于指定的 deliveryTag 的消息,大大减少了网络开销
    • 假设 deliveryTag = 8,multiple = true:那么 deliveryTag <= 8 的消息都会被确认.
    • 假设 deliveryTag = 8,multiple = false:只确认 8.

Ps:deliveryTag 确保了消息传递的可靠性和顺序性.

b)否定确认(单个):用来拒绝这个消息. 被拒绝的消息如何处理,具体要看 requeue 参数.

Channel.basicReject(long deliveryTag, boolean requeue)
  • requeue:标识拒绝后,这条消息如何处理.
    • requeue = true:消息会重新存入队列,将来会发送给下一个订阅的消费者.
    • requeue = false:消息会从队列中移除,因此不会发送给消费者.

c)否定确认(批量):Channel.basicReject 只能拒绝一条消息,如果要批量拒绝消息,就可以使用 Channel.basicNack.

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

multiple:参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.

d)MQ 的管理平台上也提供了几种确认方式.
在这里插入图片描述

手动确认(Spring-AMQP 封装 RabbitMQ SDK)

Spring-AMQP 对消息确认提供了三种策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}

这里根 RabbitMQ 原生 SDK 是有些不同的.

AcknowledgeMode.NONE

不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,然后从 队列 中移除消息.

a)配置手动确认

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: none

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...println("业务逻辑处理完成")}}

在这里插入图片描述

d)效果演示
触发接口之后,回到 MQ 管理平台,可以看到队列中消息已经被删除.
在这里插入图片描述

AcknowledgeMode.AUTO(默认)

分为以下情况:

  • 消费者处理消息过程中没有抛出异常,则自动确认消息,然后从 队列 中移除消息.
  • 消费者处理消息过程中抛出异常,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: auto

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...println("业务逻辑处理完成")}}

效果如下:
在这里插入图片描述
在这里插入图片描述
d)消费者(异常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...val a = 1 / 0println("业务逻辑处理完成")}}

效果如下:
消息未被确认,会不断重返队列,进行重试,因此 IDEA 中会循环报错输出.
在这里插入图片描述

AcknowledgeMode.MANUAL

分为以下情况:

  • 消费者在处理完消息后显示调用 basicAck 方法 来确认消息,然后从 队列 中移除消息.
  • 消费者在处理完消息后显示调用 basicNack 方法 来否定确认消息,是否从队列中移除消息需要看 requeue 参数的值
    • requeue = true:重返队列,不断重试.
    • requeue = false:丢弃消息.
  • 消费者在处理完消息后什么都不做,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者(异常处理消息,requeue = true)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...val a = 1 / 0println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, true) //requeue: true}}}

由于消息处理异常,发送 nack,并且 requeue = true,因此消息会重返队列,不断重试.
在这里插入图片描述
在这里插入图片描述

d)消费者(异常处理消息,requeue = false)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...val a = 1 / 0println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}

由于消息处理异常,发送 nack,并且 requeue = false,因此消息不会重返队列,消息被丢弃.
在这里插入图片描述
d)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}

消息被正常处理,返回 ack.
在这里插入图片描述

MANUAL 可能会引发的问题

在这里插入图片描述
如果这里捕获的不是 Exception 异常,那么消费者处理消息的时候,可能会引发一些不会被捕获的异常,就会导致没有返回 nack.
也就意味着,没有进行确认应答,那么 mq管理平台 上就会显示 Unacked 数值 +1.

Ps:具体还是需要根据业务场景而定

在这里插入图片描述


http://www.ppmy.cn/embedded/90787.html

相关文章

浅谈简单的程序优化技巧(C++)

在 C 编程中&#xff0c;优化是提升程序性能的关键步骤。常数优化&#xff0c;虽然看似细微&#xff0c;但在某些情况下却能显著提高程序的运行效率。本文将为您介绍一些实用的 C 常数优化技巧。 输入输出优化 看一下这道题&#xff1a; 【模板】快速读入 题目背景 制约解…

Python数据结构篇(二)

数据结构 数据结构列表列表的创建与操作列表推导式案例实操 元组案例实操 字典字典的创建与操作字典推导式案例实操 集合集合的创建与操作集合推导式案例实操 数据结构 Python 中常用的数据结构包括列表、元组、字典和集合。每种数据结构都有其独特的特性和使用场景 列表 列…

PHP反序列化漏洞从入门到深入8k图文介绍,以及phar伪协议的利用

文章参考&#xff1a;w肝了两天&#xff01;PHP反序列化漏洞从入门到深入8k图文介绍&#xff0c;以及phar伪协议的利用 前言 本文内容主要分为三个部分&#xff1a;原理详解、漏洞练习和防御方法。这是一篇针对PHP反序列化入门者的手把手教学文章&#xff0c;特别适合刚接触PH…

Java中的5种线程池类型

Java中的5种线程池类型 1. CachedThreadPool &#xff08;有缓冲的线程池&#xff09;2. FixedThreadPool &#xff08;固定大小的线程池&#xff09;3. ScheduledThreadPool&#xff08;计划线程池&#xff09;4. SingleThreadExecutor &#xff08;单线程线程池&#xff09;…

三、Spring-WebFlux实战案例-流式

目录 一、springboot之间通讯方式 1. 服务端 (Spring Boot) 1.1 添加依赖 1.2 控制器 2. 客户端 (WebClient) 2.1 添加依赖 2.2 客户端代码 3. 运行 二、web与服务之间通讯方式 1、服务端代码 2、客户端代码 3、注意事项 三、移动端与服务端之间通讯方式…

Bug 解决 | 无法正常登录或获取不到用户信息

目录 1、跨域问题 2、后端代码问题 3、前端代码问题 我相信登录这个功能是很多人做项目时候遇到第一个槛&#xff01; 看起来好像很简单的登录功能&#xff0c;实际上还是有点坑的&#xff0c;比如明明账号密码都填写正确了&#xff0c;为什么登录后请求接口又说我没登录&a…

我所理解的sprd-camera摄像头框架流程分析

摄像头的图像格式:RGB24,RGB565,RGB444,YUV4:2:2 RGB24 表示R、G、B ,3种基色都用8个二进制位表示,那么红色、绿色、蓝色各有256种,那么由这三种基色构成的颜色就是256X256X256=16,777,216种,约等于1677万。UV 和我们熟知的 RGB 类似,是一种颜色编码格式。 YUV 包含三…

“vcruntime140.dll找不到”的错误怎么处理?一键修复vcruntime140.dll

遇到“vcruntime140.dll找不到”的错误提示时&#xff0c;这通常指的是尝试运行需要Visual C 2015 Redistributable支持的软件时由于该文件的缺失而导致程序无法启动。这种情况在使用Windows操作系统的用户中并不少见。为了帮大家在将来遇到此类问题时能够迅速解决&#xff0c;…