SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)

news/2024/9/17 7:12:45/ 标签: java-rabbitmq, spring boot, rabbitmq

SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)

  • (一)死信队列
    • 使用场景
    • 具体用法
    • 前提
    • 示例:
  • (二)延迟队列
    • 使用场景
    • 方法一:通过死亡队列实现
    • 方法二:通过延迟消息插件(rabbitmq_delayed_message_exchange)实现

(一)死信队列

死信队列是一个重要的概念,用于处理那些因各种原因无法被正常消费的消息。
它不是RabbitMQ直接提供的一个现成的方法或工具,而是通过特定的配置和机制来实现的。

使用场景

死信队列在多种场景下都非常有用,包括但不限于:

  1. 消息重试机制:当消息处理失败时,可以将其发送到死信队列进行重试。
  2. 异常消息处理:对于无法被正常处理的异常消息,可以将其存储在死信队列中,以便后续分析处理。
  3. 延迟消息处理:通过结合消息的TTL(Time-To-Live,生存时间)和死信队列,可以实现消息的延迟处理。
  4. 确保消息不丢失:在消息处理过程中,如果发生消费者崩溃或网络故障等情况,消息可能会丢失。通过死信队列,可以确保这些消息得到保留,并在系统恢复后重新处理。

具体用法

要在RabbitMQ中设置和使用死信队列,通常需要按照以下步骤进行:

  1. 定义死信交换机(DLX):首先,需要定义一个交换机作为死信交换机,它可以是任何类型的交换机(如direct、fanout、topic等)。
  2. 配置原队列:在声明原队列时,需要指定两个参数:x-dead-letter-exchange和x-dead-letter-routing-key。前者指定了当消息变成死信时应该发送到的交换机(即死信交换机),后者指定了发送到该交换机的路由键。
  3. 声明死信队列:接着,需要声明一个或多个死信队列,并将它们绑定到死信交换机上。这样,当死信消息被发送到死信交换机时,就可以根据路由键将其路由到相应的死信队列中。
  4. 处理死信消息:最后,需要编写消费者代码来监听死信队列中的消息,并对这些消息进行相应的处理。

前提

要想进入死信队列,得出现异常,出现异常后,会根据你的配置帮你放到死信队列中 所以异常不要被捕获。
如果实在要捕获的话,就得你在消费者这边去做“发送消息的”操作,自己把发送过来消息塞到死信队列中

示例:

消费者 mq的yml配置(重试机制)

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:# 重试机制retry:enabled: true #是否开启消费者重试

配置类:

package com.example.reactboot.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DirectExchangeConfig {//===========================普通===========================//定义队列的名称常量public static final String DIRECT_QUEUE = "directQueue";public static final String DIRECT_QUEUE2 = "directQueue2";//定义直接交换机的名称常量public static final String DIRECT_EXCHANGE = "directExchange";//定义路由键常量,用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY = "direct";//定义路由键常量,用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY_2= "direct2";//定义队列,名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数@Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);Map<String, Object> args = new HashMap<>();// 设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);// 创建队列,设置为持久化、非排他、非自动删除,并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);}//定义直接交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}//定义队列,名称为DIRECT_QUEUE2@Beanpublic Queue directQueue2() {return new Queue(DIRECT_QUEUE2, true);}//定义一个绑定,将directQueue队列绑定到directExchange交换机上,//使用direct作为路由键@Beanpublic Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}// 定义一个绑定Bean,将directQueue2队列也绑定到directExchange交换机上,@Beanpublic Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY_2);}//===========================死信===========================// 定义死信交换机的名称public static final String DLX_EXCHANGE = "dlx_exchange";// 定义发送到死信交换机的路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key";// 定义死信队列的名称public static final String DLX_QUEUE = "dlx_queue";/*** 声明死信交换机,这里使用Direct类型。* @return 返回一个配置好的DirectExchange对象。*/@BeanDirectExchange dlxExchange() {// 创建并返回Direct类型的交换机return new DirectExchange(DLX_EXCHANGE,true, false);}/*** 声明死信队列。* @return 返回一个配置好的Queue对象,用作死信队列。*/@BeanQueue dlxQueue() {// 创建并返回死信队列,设置为持久化return new Queue(DLX_QUEUE, true);}/*** 绑定死信队列到死信交换机,使用指定的路由键。*/@BeanBinding binding(Queue dlxQueue,DirectExchange dlxExchange) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY);}}

生产者发送消息:

package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RestController
public class RabbitMqTest {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/sendMQ")public String sendMessage() {rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct");return "direct消息发送成功!!";}}

消费者消费消息:

package com.example.reactboot.queueListener;import com.example.reactboot.config.DirectExchangeConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;/*** @className: DirectQueueListener* @description: 直连交换机的监听器* @author: sh.Liu* @date: 2021-08-23 16:03*/
@Slf4j
@Component
public class DirectQueueListener {//监听普通队列@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)public void process(String xx){SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("DirectReceiver消费者收到消息1  : " + xx + " 接收时间:" + sdf.format(new Date()) + "\n");//先执行业务代码int i = 1 / 0;}//监听死信队列@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DLX_QUEUE)public void process3(String testMessage) {System.out.println("死信得列里面的  : " + testMessage + "\n");}}

(二)延迟队列

延迟队列是一种特殊的消息队列,其内部消息是有序的,并且具有延时属性。
在RabbitMQ中,虽然AMQP协议本身没有直接支持延迟队列,但可以通过一些变通的方法(如使用死信队列配合消息的TTL属性,或者使用RabbitMQ的延迟消息插件)来实现延迟队列的功能。

使用场景

延迟队列在多种业务场景中都有广泛的应用,包括但不限于:

  1. 订单超时未支付自动取消:用户下单后,如果在规定时间内未完成支付,系统可以自动取消订单。
  2. 退款超时通知:用户申请退款后,如果长时间未得到处理,系统可以自动通知相关运营人员介入。
  3. 新用户注册后的引导邮件:用户注册账号后,系统可以在一段时间后发送欢迎邮件或引导邮件。
  4. 会议提醒:在预定的会议开始前一段时间,系统自动发送提醒给参会人员。
  5. 任务调度:在指定时间后执行某项任务,如定时清理日志、执行批处理任务等。

方法一:通过死亡队列实现

以下是使用死信队列配合TTL属性实现延迟队列的基本步骤:

  1. 定义死信交换机(DLX, Dead-Letter Exchange)和死信队列(DLQ, Dead-Letter Queue)
  2. 设置普通队列的TTL和死信交换机:在创建普通队列时,可以为其设置TTL属性,指定消息在该队列中的最大存活时间。同时,需要将该队列的死信交换机设置为前面定义的DLX,以便消息在过期后能够被发送到DLQ。
  3. 生产者发送消息:生产者将消息发送到普通队列,并指定消息的TTL。消息在队列中等待,直到TTL过期。
  4. 消息过期并发送到死信队列:当消息的TTL过期后,RabbitMQ会自动将该消息发送到其配置的死信交换机,再由死信交换机根据路由键将其发送到DLQ。
  5. 消费者从死信队列消费消息:消费者监听DLQ,当有新消息到达时,进行消费处理。

就是:把普通队列的消息设置存活时间,目前有两者方式:
1.在队列上面设置消息的过期时间
2.直接在消息上面设置过期时间。

方式一(队列上面设置消息过期时间):

上面的关于 死信示例 完全可以复用进行测试

在以下的方法里面多加一行 args.put(“x-message-ttl”, 10000);

    //定义队列,名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数@Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);Map<String, Object> args = new HashMap<>();// 设置消息TTL为10秒args.put("x-message-ttl", 10000);// 设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);// 创建队列,设置为持久化、非排他、非自动删除,并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);}  

你可以把 DirectQueueListener 里面的 process 方法注释掉(以免被消费掉)。
再执行生产者的 sendMessage 方法。
这个时候你就可以看到下面关于 监听死信队列 的方法 ,等10秒后就会打印你发的消息了

方式二(消息上面设置过期时间):

上面的关于 死信示例 完全可以复用进行测试

改一下 这个 生产者发送消息:

package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RestController
public class RabbitMqTest {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/sendMQ")public String sendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct! "+sdf.format(new Date()), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间,超过5秒消息就会消失message.getMessageProperties().setExpiration("5000");//设置编码格式message.getMessageProperties().setContentEncoding("UTF-8");return message;}});     return "direct消息发送成功!!";}}

你可以把 DirectQueueListener 里面的 process 方法注释掉(以免被消费掉)。
再执行生产者的 sendMessage 方法。
这个时候你就可以看到下面关于 监听死信队列 的方法 ,等5秒后就会打印你发的消息了

到这里其实就结束了,剩下的就是监听到死信队列里面的消息后的业务操作了

rabbitmq_delayed_message_exchange_346">方法二:通过延迟消息插件(rabbitmq_delayed_message_exchange)实现

后续在说


http://www.ppmy.cn/news/1522253.html

相关文章

【Oracle点滴积累】解决IMP-00017、ORA-20005、ORA-06512错误的方法

广告位招租&#xff01; 知识无价&#xff0c;人有情&#xff0c;无偿分享知识&#xff0c;希望本条信息对你有用&#xff01; 今天和大家分享 IMP-00017: folloging statement failed with ORACLE error 20005 ORA-20005: object statistics are locked (stattype ALL) 错…

WordPress上可以内容替换的插件

插件下载地址&#xff1a;WordPress内容替换插件 – 果果开发 类型 替换的类型&#xff1a;文章、自定义文章类型、分类、标签、媒体库、页面、评论、数据库表&#xff0c;不同的类型可以替换不同的字段。 替换字段 替换的字段&#xff0c;哪些字段内容需要替换。除了数据库…

Q215 数组中第K大的元素

思路 可以用排序&#xff0c;但是不用全有序 还有个要求是O&#xff08;n&#xff09; 快排改版 快排只排需要的部分 public int findKthLargest(int[] nums, int k) {return quickSort(nums, 0, nums.length-1, nums.length-k);}public static int quickSort(int[] nums, …

JVM3-双亲委派机制

目录 概述 作用 如何指定加载类的类加载器&#xff1f; 面试题 打破双亲委派机制 自定义类加载器 线程上下文类加载器 Osgi框架的类加载器 概述 由于Java虚拟机中有多个类加载器&#xff0c;双亲委派机制的核心是解决一个类到底由谁加载的问题 双亲委派机制&#xff…

2409wtl,切换视图

原文 介绍 我从一个基于SDI(单文档接口)WTL向导的应用开始,添加了一些从控件继承的窗口和一些对话框窗口(表单视图),然后才发现我必须,使SDI框架动态加载和卸载子窗口. 本文演示了两个可用来完成的技术:在SDI应用中的视图间动态切换.这是我使用的两个. 技术 1技术:第一个方…

指针作为函数参数详解

一级指针传参 形参指针的指向没有被改变 void test(int* p1) {*p1 8; }int main() {int a 5;int* p &a;test(p);printf("%d\n", a); }输出 8总结: 由代码和上图可知&#xff0c;实参p是个指针&#xff0c;其值为变量a的地址&#xff0c;将其传参给形参p1&…

webpack+lite-server 构建项目示例

首先安装以下库 npm install --save-dev webpack webpack-cli lite-server npm install --save-dev babel-loader babel/core babel/preset-env项目结构 webpack.config.js 配置 const path require("path");module.exports {entry: "./src/index.js",…

5G前传-介绍

1. 引用 知识分享系列一&#xff1a;5G基础知识-CSDN博客 5G前传的最新进展-CSDN博客 灰光和彩光_通信行业5G招标系列点评之二&#xff1a;一文读懂5G前传-光纤、灰光、彩光、CWDM、LWDM、MWDM...-CSDN博客 术语&#xff1a; 英文缩写描述‌BBU&#xff1a;Building Baseba…

华为云征文|Flexus云服务X实例安装ODBC驱动,在ODBC中建立MySQL数据库连接,通过QT连接云数据库

引出 4核12G-100G-3M规格的Flexus X实例使用测评第2弹&#xff1a;Flexus云服务X实例安装ODBC驱动&#xff0c;在ODBC中建立MySQL数据库连接&#xff0c;通过QT连接云数据库 什么是Flexus云服务器X实例 官方解释&#xff1a; Flexus云服务器X实例是新一代面向中小企业和开发…

基于发布-订阅模型的音视频流分发框架

有时需要同时网络推流和把流封装为某格式&#xff0c;或做一些其它操作。这就需要一个分发流的机制&#xff0c;把同一路流分发给多个使用者去操作&#xff0c;下面实现了一个简易的线程安全的音视频流分发框架。代码如下&#xff1a; avStreamHub.h #ifndef STREAMHUB_H #def…

算法专题一: 双指针

目录 前言1. 移动零&#xff08;easy&#xff09;2. 复写零&#xff08;easy&#xff09;3. 快乐数&#xff08;medium&#xff09;4. 盛水最多的容器&#xff08;medium&#xff09;5. 有效三角形的个数&#xff08;medium&#xff09;6. 和为 s 的两个数字&#xff08;easy&a…

Docker 进阶构建:镜像、网络与仓库管理

目录 三. docker镜像构建 1. docker镜像结构 2. 镜像运行的基本原理 3. 镜像获得方式 4. 镜像构建 5. Dockerfile实例 6. 镜像优化方案 6.1. 镜像优化策略 6.2. 镜像优化示例:缩减镜像层 6.3. 镜像优化示例:多阶段构建 6.4. 镜像优化示例:使用最精简镜像 四. docke…

网络安全服务基础Windows--第15节-CA与HTTPS理论

公钥基础设施&#xff08;Public Key Infrastructure&#xff0c;简称 PKI&#xff09;是指⼀套由硬件、软件、⼈员、策略和程序组成的系统&#xff0c;⽤于创建、管理、分发、使⽤、存储和撤销数字证书。PKI 的核⼼⽬的是通过使⽤公钥加密技术来确保电⼦通信的安全性。PKI 为数…

八月二十九日(day 39)docker6

1.前端&#xff08;nginx&#xff09; [rootlocalhost ~]# docker pull nginx //拉取nginx镜像 [rootlocalhost ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE nginx latest 5ef79149e0ec 2 we…

springboot数据库连接由localhost改成IP以后访问报错500(2024/9/7

步骤很详细&#xff0c;直接上教程 情景复现 一.没改为IP之前正常 二.改完之后报错 问题分析 SQL没开启远程连接权限 解决方法 命令行登入数据库 mysql -u root -p切换到对应数据库 use mysql;设置root用户的连接权限允许其他IP连接数据库 update user set host % whe…

前端技术(六)—— AJAX详解

一、原生 AJAX 1. AJAX 简介 AJAX 全称为 Asynchronous JavaScript And XML&#xff0c;就是异步的 JS 和 XML。 通过 AJAX 可以在浏览器中向服务器发送异步请求&#xff0c;最大的优势&#xff1a;无刷新获取数据。 AJAX 不是新的编程语言&#xff0c;而是一种将现有的标准组…

C语言程序设计(初识C语言后部分)

留一片空白&#xff0c;随时浓墨重彩。 二十&#xff0c;结构体 结构体类型的声明 结构体初始化 结构体成员访问 结构体传参 1.结构体的声明 1&#xff09;结构的基础知识 结构是一些值的集合&#xff0c;这些值称为成员变量。结构的每个成员可以是不同类型的变量。 2&…

上海网站设计-网站手机端制作

随着移动互联网的迅猛发展&#xff0c;越来越多的人通过手机上网&#xff0c;这使得网站手机端的设计和制作变得尤为重要。在这种背景下&#xff0c;上海的网站设计行业迎来了新的机遇与挑战。 首先&#xff0c;网站手机端制作的必要性不容忽视。根据统计数据显示&#xff0c;手…

Flask框架 完整实战案例 附代码解读 【3】

Flask 是一个轻量级的可定制框架&#xff0c;使用Python语言编写&#xff0c;较其他同类型框架更为灵活、轻便、安全且容易上手。 前面已经写过项目从新建运行安装到测试部署的全流程&#xff0c;其中有写Flask框架从新建到部署全流程&#xff0c;但是只有部分代码。本篇主要是…

微软发布Phi-3.5 SLM,附免费申请试用

Phi-3 模型系列是Microsoft 小型语言模型 (SLM) 系列中的最新产品。 它们旨在具有高性能和高性价比&#xff0c;在语言、推理、编码和数学等各种基准测试中的表现均优于同类和更大规模的模型。Phi-3 模型的推出扩大了 Azure 客户的高质量模型选择范围&#xff0c;为他们编写和…