036 RabbitMQ消息确认 死信队列 延时队列

ops/2024/11/17 3:01:26/

文章目录

  • 生产者确认模式
    • application.properties
    • MessageController.java
    • MessageConfirmRallback.java
  • 生产者回退模式
    • application.properties
    • MessageConfirmRallback.java
    • MessageController.java
  • 消费者手动确认
    • application.properties
    • ConsumerAckQueueListener.java
  • 死信队列
  • 延时队列

测试链接 http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingKey=order.A&msg=aaa

思考问题: 生产者能百分之百将消息发送给消息队列吗?
不确定的
1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。找不到交换机
2.交换机路由到队列,也存在丢失消息的可能性

问题解决方案:
1.生产者确认模式
2.生产者回退模式

目标: 演示生产者确认的效果,消息百分百进入交换机
实现步骤:
1.配置开启生产者确认模式
2.编写生产者确认回调方法,处理业务逻辑
3.在RabbitMQ模板对象中,设置回调逻辑
4.测试请求一下
目标2: 消息能够从交换机百分百进入到队列
实现步骤:
1.配置开启生产者回退模式
2.编写生产者回退的回调方法
3.设置回退回调方法
4.测试

生产者确认模式

application.properties

# 配置开启生产者确认模式
spring.rabbitmq.publisher-confirms=true

MessageController.java

package com.cubemall.controller;/*
目标: 搭建RabbitMQ高级特性演示环境
1.搭建消费者工程[复用之前工程]
2.搭建提供者工程[复用之前工程]
3.编写MessageController: 用来发送消息
交换机
路由键
消息内容
4.RabbitMQ配置交换机和队列,及路由键
5.编写消费者监听思考问题: 生产者能百分之百将消息发送给消息队列吗?
不确定的
1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。找不到交换机
2.交换机路由到队列,也存在丢失消息的可能性问题解决方案:
1.生产者确认模式
2.生产者回退模式目标: 演示生产者确认的效果,消息百分百进入交换机
实现步骤:
1.配置开启生产者确认模式
2.编写生产者确认回调方法,处理业务逻辑
3.在RabbitMQ模板对象中,设置回调逻辑
4.测试请求一下*/import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {//发送消息接口@Autowiredprivate RabbitTemplate rabbitTemplate;//定义发送消息的接口@RequestMapping("/direct/sendMsg")public String sendMsgtoMQ(String exchange,String routingKey,String msg){rabbitTemplate.convertAndSend(exchange,routingKey,msg);return "已投递";}
}

MessageConfirmRallback.java

package com.cubemall.controller;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*
发送消息回调确认类:消息如果没有进入交换机,会回调当前类中的confirm*/
@Component
public class MessageConfirmRallback implements RabbitTemplate.ConfirmCallback {//配置回调的方法@Autowiredprivate RabbitTemplate rabbitTemplate;//配置在当前对象注入之后,再设置当前对象到RabbitTemplate对象中@PostConstruct//注解作用: 在当前对象初始化完毕之后执行的方法public void initRabbittemplate(){rabbitTemplate.setConfirmCallback(this::confirm);}/*** 不论是否进入交换机,都会回调当前方法* @param correlationData 消息投递封装对象* @param ack 是否投递成功* @param exception 如果错误,错误原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String exception) {if (ack) {System.out.println("消息进入了交换机成功{}");}else {System.out.println("消息进入了交换机失败{} 原因:"+exception);}}
}

生产者回退模式

application.properties

# 配置开启生产者的回退模式
spring.rabbitmq.publisher-returns=true

MessageConfirmRallback.java

package com.cubemall.controller;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*
发送消息回调确认类:消息如果没有进入交换机,会回调当前类中的confirm*/
@Component
public class MessageConfirmRallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {//配置回调的方法@Autowiredprivate RabbitTemplate rabbitTemplate;//配置在当前对象注入之后,再设置当前对象到RabbitTemplate对象中@PostConstruct//注解作用: 在当前对象初始化完毕之后执行的方法public void initRabbittemplate(){rabbitTemplate.setConfirmCallback(this::confirm);rabbitTemplate.setReturnCallback(this::returnedMessage);}/*** 不论是否进入交换机,都会回调当前方法* @param correlationData 消息投递封装对象* @param ack 是否投递成功* @param exception 如果错误,错误原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String exception) {if (ack) {System.out.println("消息进入了交换机成功{}");}else {System.out.println("消息进入了交换机失败{} 原因:"+exception);}}/*** 消息从交换机进入队列失败回调方法:只会在失败的情况下* @param message the returned message.* @param replyCode the reply code.* @param replyText the reply text.* @param exchange the exchange.* @param routingKey the routing key.*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息从交换机进入队列失败: >>>>>>>");System.out.println("exchange = " + exchange);System.out.println("replyCode = " + replyCode);System.out.println("replyText = " + replyText);System.out.println("routingKey = " + routingKey);}
}

MessageController.java

package com.cubemall.controller;/*
目标: 搭建RabbitMQ高级特性演示环境
1.搭建消费者工程[复用之前工程]
2.搭建提供者工程[复用之前工程]
3.编写MessageController: 用来发送消息
交换机
路由键
消息内容
4.RabbitMQ配置交换机和队列,及路由键
5.编写消费者监听思考问题: 生产者能百分之百将消息发送给消息队列吗?
不确定的
1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。找不到交换机
2.交换机路由到队列,也存在丢失消息的可能性问题解决方案:
1.生产者确认模式
2.生产者回退模式目标: 演示生产者确认的效果,消息百分百进入交换机
实现步骤:
1.配置开启生产者确认模式
2.编写生产者确认回调方法,处理业务逻辑
3.在RabbitMQ模板对象中,设置回调逻辑
4.测试请求一下目标2: 消息能够从交换机百分百进入到队列
实现步骤:
1.配置开启生产者回退模式
2.编写生产者回退的回调方法
3.设置回退回调方法
4.测试*/import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {//发送消息接口@Autowiredprivate RabbitTemplate rabbitTemplate;//定义发送消息的接口@RequestMapping("/direct/sendMsg")public String sendMsgtoMQ(String exchange,String routingKey,String msg){rabbitTemplate.convertAndSend(exchange,routingKey,msg);return "已投递";}
}

消费者手动确认

application.properties

# 配置开启消费端手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual# 配置开启重试
spring.rabbitmq.listener.direct.retry.enabled=true

ConsumerAckQueueListener.java

package com.cubemall.listeners;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*
消费者消息队列监听器
问题: 消费者能不能百分百接收到请求,而且业务逻辑处理出现异常,消息还能不能算接收到呢?目标: 演示消费者手动确认的过程
实现步骤:
1.编写监听器类 和对于监听的方法,编写手动签收的业务逻辑
2.配置开启手动签收
3.测试*/
@Component
@RabbitListener(queues = "order.A")
public class ConsumerAckQueueListener {//处理消息方法@RabbitHandlerpublic void simpleHandler(String msg, Message message, Channel channel) throws IOException {System.out.println("下单消息{},内容为: " + msg);//获取消息的投递标签long deliveryTag = message.getMessageProperties().getDeliveryTag();try {if (msg.contains("苹果")) {throw new RuntimeException("不允许售卖苹果手机");}//签收消息/*** 参数1: 投递标签* 参数2: 是否是批量签收,true一次性签收所有消息,如果是false则只签收当前消息*/channel.basicAck(deliveryTag,false);System.out.println("签收成功{}");} catch (IOException e) {//e.printStackTrace();//参数1: 投递标签//参数2: 是否批量//参数3: 是否重回队列channel.basicNack(deliveryTag,false,true);System.out.println("签收失败{}");}//拒绝签收消息: 出现异常了,拒绝签收}
}

死信队列

死信队列

延时队列

延时队列

消费者消息队列监听器
问题1: 消费者能不能百分百接收到请求,而且业务逻辑处理出现异常,消息还能不能算接收到呢?

目标: 演示消费者手动确认的过程
实现步骤:
1.编写监听器类 和对于监听的方法,编写手动签收的业务逻辑
2.配置开启手动签收
3.测试

问题2: 消息在队列中,如果没有被消费者消费?
TTL–> Time to Live (存活时间/有效期)
目标: 演示消息队列中消息失效超时过程
步骤:
1.配置新的队列order.B,设置队列内消息的超时时间5s x-message-ttl
2.将队列绑定order_exchange交换机上
3.发送消息,测试

问题3: 消息发送失败了,消息丢失了?消息有效期到了
死信队列: 当消息失效了,统一进入的一个队列,这个队列称之为死信队列
主要有三种情况:
1.到达了消息队列容量上限!
2.消费者如果拒绝签收,不重回队列!
3.消息超时了!

目标: 演示成为死信的过程
步骤:
1.建立死信队列deadQueue
2.建立死信交换机deadExchange
3.死信队列绑定死信交换机:order.dead
4.队列order.B绑定死信交换机 x-dead-letter-exchange x-dead-letter-routing-key
5.向队列发送消息,测试死信交换机

需求: 1.新用户注册成功7天后,发送消息问候?
2.下单后,30分钟未支付,取消订单,回滚票
延迟队列: 消息进入队列后不会被消费,只有到达指定的时间后才会被消费!


http://www.ppmy.cn/ops/134316.html

相关文章

如何使用 python 中的 Pillow 创建可自定义的图标生成器

在本篇博客中,我们将探讨如何使用 wxPython 和 Pillow 库创建一个简单的图标生成器。用户可以通过该工具选择目标文件夹,并生成三种不同尺寸(16x16、48x48、128x128)的PNG格式图标文件。图标将采用“截图框”的主题,通…

猫狗识别之BUG汇总

一、github登不上去问题 下载watt toolkit 下载地址:https://steampp.net/ 可以下载后加速,访问github 二、猫狗总体参考核心 B哥的博客 https://github.com/bubbliiiing/classification-keras?tabreadme-ov-file 三、CSDN很多会员才能阅读问题 根据…

电子应用产品设计方案-9:全自动智能马桶系统设计方案

一、系统概述 本全自动智能马桶系统旨在提供舒适、卫生、便捷和智能化的如厕体验。通过融合多种传感器技术、电子控制单元和机械执行机构,实现马桶的自动冲洗、座圈加热、臀部清洗、烘干等功能,并具备智能感应、用户个性化设置和健康监测等特色功能。 二…

Redis中的过期删除与内存淘汰

1.Redis中的过期删除策略 在 Redis 中,过期删除策略是为了管理存储在 Redis 中的带有过期时间的数据。每当数据存储时,可能会为其设定一个过期时间。当到达这个时间点后,该数据就被标记为“过期”。为了确保不再需要的过期数据不会占用系统资…

ARM-Linux嵌入式开发环境搭建

平台: PC: VMware CentOs6.3 内核版本:linux2.6.32 Qt: : Qt4.8.2(32b) 一、qt的版本介绍 按照不同的图形界面来划分,分为四个版本: Win32版:适用于windows平台; X11版:适用…

MySQL5.7.37安装配置

1.下载MySQL软件包并解压 2.配置环境变量 3.新建my.ini文件并输入信息 [mysqld] #端口号 port 3306 #mysql-5.7.27-winx64的路径 basedirC:\mysql-5.7.37\mysql-5.7.37-winx64 #mysql-5.7.27-winx64的路径\data datadirC:\mysql-5.7.37\mysql-5.7.37-winx64\data #最大连接数…

vue 中监听页面尺寸变化就调用函数

方法一:使用 window.onresize 结合 Vue 实例的生命周期钩子(不推荐,存在覆盖风险) 虽然可以直接使用原生的 window.onresize 事件来监听窗口大小变化,但这种方式在 Vue 项目中有一些局限性,因为如果在多个…

Leetcode 罗马数字转整数

代码的算法思想可以分为以下几步: 建立映射表: 首先,代码使用 HashMap 来存储罗马数字字符与其对应的整数值关系。例如,I 对应 1,V 对应 5,以此类推。这是为了方便后续快速查找每个罗马字符对应的整数值。 …