【RabbitMQ高级——过期时间TTL+死信队列】

ops/2024/10/22 7:07:37/

1. 过期时间TTL概述

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。

目前有两种方法可以设置。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

2.TTL设置方法

2.1. 通过队列属性设置

java">package com.zju.service.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author guozonghao* @Description* @create 2024-10-04 09:37*/
@Configuration
public class TTLRabbitMqConfiguration {@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl_direct_exchange",true,false);}@Beanpublic Queue directTTLQueue(){//设置过期时间HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);//这里一定是int类型return new Queue("ttl.direct.queue",true,false,false,args);}@Beanpublic Binding directTTLBinding(){return BindingBuilder.bind(directTTLQueue()).to(ttlDirectExchange()).with("ttl");}
}

2.2. 通过消息属性设置

通过AMQP.BasicProperties这个类在发送消息时配置相关的属性

java">public class MessageTTLProducer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */channel.queueDeclare("ttl.queue2", true, false, false, null);// 6: 准备发送消息的内容String message = "你好,学相伴!!!";Map<String, Object> headers = new HashMap<String, Object>();headers.put("x", "1");headers.put("y", "1");AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2) // 传送方式.priority(1).contentEncoding("UTF-8") // 编码方式.expiration("5000") // 过期时间.headers(headers).build(); //自定义属性// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容for (int i = 0; i <10 ; i++) {channel.basicPublish("", "ttl.queue2", basicProperties, message.getBytes());System.out.println("消息发送成功!");}} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

3. 死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:

  • 消息被拒绝(没有被ack)
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。
在这里插入图片描述

3.1 创建死信队列

java">package com.zju.service.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author guozonghao* @Description* @create 2024-10-04 09:37*/
@Configuration
public class DeadRabbitMqConfiguration {@Beanpublic DirectExchange deadDirectExchange(){return new DirectExchange("dead_direct_exchange",true,false);}@Beanpublic Queue directDeadQueue(){return new Queue("dead.direct.queue",true);}@Beanpublic Binding directDeadBinding(){return BindingBuilder.bind(directDeadQueue()).to(deadDirectExchange()).with("dead");}
}

3.2 其他队列绑定死信队列

java">package com.zju.service.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author guozonghao* @Description* @create 2024-10-04 09:37*/
@Configuration
public class TTLRabbitMqConfiguration {@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl_direct_exchange",true,false);}@Beanpublic Queue directTTLQueue(){//设置过期时间HashMap<String, Object> args = new HashMap<>();args.put("x-message-ttl",5000);//设置过期时间args.put("x-max-length",5);//设置队列的最大长度args.put("x-dead-letter-exchange","dead_direct_exchange");//绑定自己创建的死信交换机args.put("x-dead-letter-routing-key","dead");//设置死信队列的路由keyreturn new Queue("ttl.direct.queue",true,false,false,args);}@Beanpublic Binding directTTLBinding(){return BindingBuilder.bind(directTTLQueue()).to(ttlDirectExchange()).with("ttl");}
}

http://www.ppmy.cn/ops/124175.html

相关文章

EdgeNAT: 高效边缘检测的 Transformer

EdgeNAT: Transformer for Efficient Edge Detection 介绍了一种名为EdgeNAT的基于Transformer的边缘检测方法。 1. 背景与动机 EdgeNAT预测结果示例。(a, b):来自BSDS500的数据集的输入图像。(c, d):对应的真实标签。(e, f):由EdgeNAT检测到的边缘。(e)显示了由于颜色变化…

Stable Diffusion绘画 | 如何做到不同动作表情,人物角色保持一致性(上篇)

由于 SD 具有强大的可控性&#xff0c;在固定人物角色方面&#xff0c;SD 是远超 MJ 的&#xff0c; 其中最好用&#xff0c;也是最优先的方法就是训练一个自己专属的角色模型&#xff0c;例如之前使用秋叶训练器得到的 LoRA模型。 另外&#xff0c;如果不想自己训练模型的话…

重学SpringBoot3-集成Redis(三)之注解缓存策略设置

更多SpringBoot3内容请关注我的专栏&#xff1a;《SpringBoot3》 期待您的点赞&#x1f44d;收藏⭐评论✍ 重学SpringBoot3-集成Redis&#xff08;三&#xff09;之注解缓存策略设置 1. 引入 Redis 依赖2. 配置 RedisCacheManager 及自定义过期策略2.1 示例代码&#xff1a;自定…

PostgreSQL学习笔记九:事务详解

在PostgreSQL中&#xff0c;事务&#xff08;TRANSACTION&#xff09;是一个重要的概念&#xff0c;旨在确保数据库操作的完整性和一致性。事务是一个逻辑单位&#xff0c;包含一系列数据库操作&#xff0c;这些操作要么全部成功&#xff0c;要么全部失败。事务的主要特性可以用…

【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL76

任意奇数倍时钟分频 描述 编写一个模块&#xff0c;对输入的时钟信号clk_in&#xff0c;实现任意奇数分频&#xff0c;要求分频之后的时钟信号占空比为50%。模块应包含一个参数&#xff0c;用于指定分频的倍数。 模块的接口信号图如下&#xff1a; 要求&#xff1a;使用Veril…

ChatGPT相关参数示例

max_token 用于控制最大输出长度&#xff0c;若ChatGPT的回复大于max_tokens&#xff0c;则对输出结果进行截断。 from openai import OpenAI client OpenAI(base_url"https://api.chatanywhere.tech/v1" ) response client.chat.completions.create(model"…

【含开题报告+文档+PPT+源码】基于过滤协同算法的旅游推荐管理系统设计与实现

开题报告 旅游业作为一种重要的经济活动&#xff0c;对于一个地区的经济发展和文化传承具有重要意义。泉州作为中国华东地区的重要城市&#xff0c;拥有丰富的自然资源和独特的文化底蕴&#xff0c;吸引了大量的游客。然而&#xff0c;随着旅游业的快速发展&#xff0c;游客数…

HCIP--以太网交换安全(三)MAC地址漂移防止与检测

MAC地址漂移防止与检测 一、MAC地址漂移防止与检测知识点 1.1MAC地址漂移的概述 MAC地址漂移是指交换机上一个vlan内有两个端口学习到同一个MAC地址&#xff0c;后学习到的MAC地址表项覆盖原MAC地址表项的现象。 1.2.MAC地址漂移的防止方法 &#xff08;1&#xff09;配置…