RabbitMQ 之 死信队列

ops/2024/11/25 20:04:16/

一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

二、死信的来源

1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

三、死信实战

代码架构图

生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由  C2 消费。

消息 TTL 过期

1.application.yml 配置

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: metieVirtualHosts
#    listener:
#      simple:
#        retry:
#          ### 开启消费者(程序出现异常的情况下会进行重试)
#          enabled: true
#          ### 最大重试次数
#          max-attempts: 5
#          ### 重试间隔次数
#          initial-interval: 3000
#        acknowledge-mode: manualserver:port: 8080###死信队列
mayikt:dlx:exchange: mayikt_dlx_exchangequeue: mayikt_order_dlx_queueroutingkey: dlx### 订单order:exchange: mayikt_order_exchangequeue: mayikt_order_queueroutingkey: mayikt.order

2MQConfig配置

package com.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** @description:* @author: huangan* @date: 2024/11/23 20:48*/
@Component
public class DeadLetterMQConfig {/*** 订单交换机*/@Value("${mayikt.order.exchange}")private String orderExchange;/*** 订单队列*/@Value("${mayikt.order.queue}")private String orderQueue;/*** 订单路由key*/@Value("${mayikt.order.routingkey}")private String orderRoutingkey;/*** 死信交换机*/@Value("${mayikt.dlx.exchange}")private String dlxExchange;/*** 死信队列*/@Value("${mayikt.dlx.queue}")private String dlxQueue;/*** 死信路由*/@Value("${mayikt.dlx.routingkey}")private String dlxRoutingkey;/*** 声明死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return new DirectExchange(dlxExchange);}/*** 声明死信队列* @return*/@Beanpublic Queue dlxQueue(){return new Queue(dlxQueue);}/*** 声明订单业务交换机* @return*/@Beanpublic DirectExchange orderExchange(){return new DirectExchange(orderExchange);}/*** 声明订单队列* @return*/@Beanpublic Queue orderQueue(){//订单队列要绑定死信交换机Map<String,Object> arguments = new HashMap<>(2);arguments.put("x-dead-letter-exchange",dlxExchange);arguments.put("x-dead-letter-routing-key",dlxRoutingkey);return new Queue(orderQueue,true,false,false,arguments);}/*** 绑定死信队列到死信交换机* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingkey);}/*** 绑定订单队列到订单交换机* @return*/@Beanpublic Binding orderBinding(){return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingkey);}
}

3.生产者OrderProducer

package com.producer;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @description:* @author: huangan* @date: 2024/11/23 21:58*/
@RestController
public class OrderProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 订单交换机*/@Value("${mayikt.order.exchange}")private String orderExchange;/*** 订单路由key*/@Value("${mayikt.order.routingkey}")private String orderRoutingkey;@RequestMapping("/sendOrder")public String sendOrder(){String msg ="测试数据";rabbitTemplate.convertAndSend(orderExchange,orderRoutingkey,msg,message -> {//设置消息过期时间10秒过期message.getMessageProperties().setExpiration("10000");return  message;});return "success";}}

4.订单消费者

package com.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @description: 订单队列* @author: huangan* @date: 2024/11/23 22:13*/
@Component
@Slf4j
public class OrderConsumer {/*** 监听队列回调方法* @param msg*/@RabbitListener(queues = "mayikt_order_queue")public void orderConsumer(String msg){log.info(">>正常订单消费者消费MSG:{}<<",msg);}
}

5.死信消费者

package com.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @description: 死信队列* @author: huangan* @date: 2024/11/23 22:17*/
@Slf4j
@Component
public class OrderDlxConsumer {/*** 监听队列回调方法* @param msg*/@RabbitListener(queues = "mayikt_order_dlx_queue")public void orderConsumer(String msg){log.info(">>死信队列消费订单消费者消费MSG:{}<<",msg);}
}


http://www.ppmy.cn/ops/136656.html

相关文章

Lucene(2):Springboot整合全文检索引擎TermInSetQuery应用实例附源码

前言 本章代码已分享至Gitee: https://gitee.com/lengcz/springbootlucene01 接上文。Lucene(1):Springboot整合全文检索引擎Lucene常规入门附源码 如何在指定范围内查询。从lucene 7 开始&#xff0c;filter 被弃用&#xff0c;导致无法进行调节过滤。 TermInSetQuery 指定…

cookie反爬----普通服务器,阿里系

目录 一.常见COOKIE反爬 普通&#xff1a; 1. 简介 2. 加密原理 二.实战案例 1. 服务器响应cookie信息 1. 逆向目标 2. 逆向分析 2. 阿里系cookie逆向 1. 逆向目标 2. 逆向分析 实战&#xff1a; 无限debugger原理 1. Function("debugger").call() 2. …

postman 调用 下载接口(download)使用默认名称(response.txt 或随机名称)

官网地址&#xff1a;https://www.postman.com 介绍 Postman 是一款流行的 API 开发和测试工具&#xff0c;用于发送 HTTP 请求、测试接口、调试服务器响应以及进行 API 文档管理。它支持多种请求类型&#xff08;如 GET、POST、PUT、DELETE 等&#xff09;&#xff0c;并且功能…

微信小程序技术架构图

一、视图层1.WXML&#xff08;WeiXin Markup Language&#xff09; 这是微信小程序的标记语言&#xff0c;类似于 HTML。它用于构建小程序的页面结构。例如&#xff0c;通过标签来定义各种视图元素&#xff0c;如<view>&#xff08;类似于 HTML 中的<div>&#xff…

ceph 18.2.4二次开发,docker镜像制作

编译环境要求 #需要ubuntu 22.04版本 参考https://docs.ceph.com/en/reef/start/os-recommendations/ #磁盘空间最好大于200GB #内存如果小于100GB 会有OOM的情况发生,需要重跑 目前遇到内存占用最高为92GB替换阿里云ubuntu 22.04源 将下面内容写入/etc/apt/sources.list 文件…

冒泡排序(Java)

冒泡排序&#xff08;Bubble Sort&#xff09;是一种简单的排序算法&#xff0c;它重复地遍历要排序的列表&#xff0c;比较相邻的元素并交换它们的位置&#xff0c;直到整个列表排序完成。冒泡排序的名称来源于越小的元素会经由交换慢慢“浮”到数列的顶端。 原理 比较相邻的…

GPU服务器厂家:科研服务器领域机遇与博弈,AMD 新UDNA 架构

科研服务器作为推动科学研究进步的核心基础设施&#xff0c;其性能与架构的创新对于整个科研生态有着极为关键的影响。AMD 全新推出的 UDNA 架构&#xff0c;引发了广泛的关注与讨论。 AMD UDNA 架构于科研服务器的产品数据与市场格局 AMD 在计算机硬件领域的影响力持续攀升&a…

使用ENSP实现默认路由

一、项目拓扑 二、项目实现 1.路由器AR1配置 进入系统试图 sys将路由器命名为R1 sysname R1关闭信息中心 undo info-center enable 进入g0/0/0接口 int g0/0/0将g0/0/0接口IP地址配置为2.2.2.1/24 ip address 2.2.2.1 24进入g0/0/1接口 int g0/0/1将g0/0/1接口IP地址配置为1.…