第十一章 RabbitMQ之消费者确认机制

server/2024/10/18 20:20:11/

目录

一、介绍

二、演示三种ACK方式效果

2.1. none: 不处理

2.1.1. 消费者配置代码

2.1.2. 生产者主要代码 

2.1.3. 消费者主要代码 

2.1.4. 运行效果 

2.2. manual:手动模式

2.3. auto:自动模式 


一、介绍

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

ack:成功处理消息,RabbitMQ从队列中删除该消息

nack:消息处理失败,RabbitMQ需要再次投递消息

reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

none:不处理 消息投递给消费者后立刻ack 消息立刻从MQ删除(非常不安全不建议使用)

manual:手动模式 即手动ack或reject,需要在业务代码结束后,调用api发送ack,但是这种有代码入侵,不建议使用。

auto:自动模式 SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:

1. 如果是业务异常,会自动返回nack

2. 如果是消息处理或校验异常,自动返回reject

Spring默认未我们设定的是auto 自动模式,符合我们实际项目的需求。 

二、演示三种ACK方式效果

2.1. none: 不处理

2.1.1. 消费者配置代码

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

2.1.2. 生产者主要代码 

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}

2.1.3. 消费者主要代码 

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}

2.1.4. 运行效果 

 

我们可以看到,当生产者投递到MQ的那一刻,会立刻返回ACK,此刻消费者的业务逻辑未执行完。

2.2. manual:手动模式

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack

我们定义了一个SimpleMessageListenerContainer,并为它设置了一个ChannelAwareMessageListener。在监听器内部,我们实现了消息的接收和处理,并在处理完成后使用channel.basicAck方法手动发送一个确认消息给RabbitMQ,表明消息已被消费。如果在处理消息时发生异常,我们可以使用channel.basicReject方法拒绝该消息,以便RabbitMQ可以将其重新排队或者进行其他配置的处理。 

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("yourQueueName"); // 设置监听的队列名称container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {// 消息处理逻辑System.out.println("Received message: " + new String(message.getBody()));// 确认消息已被成功处理channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 出现异常,拒绝该消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}});return container;}
}

2.3. auto:自动模式 

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack

当生产者投递到MQ后消费者在消费过程中发生业务异常,MQ会将它标记为Unacked,后续会一直投递该消息,直到消费成功为止。

 

下图看到有两条消息,其中一条是第一次投递失败重新投递的消息: 

至此我们思考一下,实际项目中我们推荐采用Spring AMQP为我们实现的auto 自动模式确认机制,虽然看上去我们的系统设计简单了,但是对于如果我们业务代码出现异常,消息在消费过程中执行一直失败,那么RabbitMQ后续会一直投递该消息,这期间异常消息如果一直消费不了,循环投递就会给我们系统造成极大的压力负担,这该怎么解决?下一章将给大家讲解失败消息的处理策略!


http://www.ppmy.cn/server/130474.html

相关文章

⽂件操作详解

本章讲述的是有关文件的相关内容,本章我们会认识到什么是文件,二进制文件与文本文件,文件的打开和关闭,⽂件的顺序读写和随机读写以及⽂件读取结束的判定和⽂件缓冲区。 1.什么是⽂件 像这样在磁盘(硬盘)上…

Pycharm里设置关于designer.exe以及pyuic5.exe的外部工具

文章目录 1.Pycharm与Pyuic5介绍(1)Pycharm(2)Pyuic5 2.Pycharm里设置外部工具(1)切换到外部工具(2)designer创建外部工具(3)pyuic5创建外部工具(4)使用designer和pyuic5 3.本章总结 1.Pycharm与Pyuic5介绍 (1)Pycharm Pycharm是专门用于python编程语言的编辑软件,…

Oracle 数据库安装及配置

✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…

Redis 持久化的两种方法详解

在使用 Redis 的过程中,持久化是一个非常重要的特性。它可以确保在 Redis 服务器重启或者出现故障时,数据不会丢失。Redis 提供了两种主要的持久化方法:RDB(Redis Database Backup)和 AOF(Append Only File…

互联网协议(IP)中最常用的端口

80 端口和 443 端口是互联网协议(IP)中最常用的两个端口,分别用于 HTTP 和 HTTPS 通信。以下是它们的作用、区别以及相关背景信息: 80 端口和 443 端口的作用 80 端口: 用于 HTTP(HyperText Transfer Prot…

OpenCV-光流估计

文章目录 一、光流估计介绍1.光流估计的基本概念2.光流估计的原理3.光流估计的前提4.OpenCV中的光流估计算法5.参数设置与调整 二、代码实现三、注意事项 OpenCV中的光流估计是计算机视觉领域中的一项重要技术,它通过分析图像序列中像素点的运动,来估计物…

wps文本框文字居中对齐

直接点对齐里的水平居中,垂直居中是将文本框水平垂直居中,文字不会居中 将文本框里的文字居中: 垂直居中: 水平居中:

AI测试入门:AI模型基准测试(Benchmark)

AI测试入门:AI模型基准测试(Benchmark) 1. 基准测试的定义2. 基准测试的目的3. 基准测试的常用指标4. 基准测试的流程5. 常用的AI基准测试框架总结1. 基准测试的定义 AI模型基准测试是一种评估AI模型性能的标准化方法,通过使用预定义的数据集、任务和评估指标,对AI模型在特…