SpringBoot(四十)SpringBoot集成RabbitMQ使用过期时间+死信队列实现延迟队列

devtools/2024/11/27 13:26:25/

前边我们使用RabbitMQ实现了高并发下对流量的削峰填谷。正常在实际应用中大概也就够用了。

有的时候呢,我们需要使用到延迟队列,RabbitMQ不像RocketMQ一样默认就支持延迟队列,RabbitMQ是不支持延迟队列的,但是呢?我们可以通过正常的队列加上消息的过期时间,配置死信队列,来模拟实现延迟队列。

一:创建普通队列(配置过期时间),绑定死信队列

很简单就是创建两个普通队列,将一个普通队列A指定为另一普通队列B的死信队列。

且给普通队列B配置过期时间。

1:配置类具体代码如下:

具体代码如下:

package com.modules.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQConfig
{@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.listener.prefetch}")private int prefetch;@Value("${spring.rabbitmq.listener.concurrency}")private int concurrentConsumers;@Value("${spring.rabbitmq.listener.max-concurrency}")private int maxConcurrentConsumers;/*** 链接RabbitMQ* @return*/@Beanpublic ConnectionFactory connectionDirectFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(true); //必须要设置return connectionFactory;}/*** 配置RabbitMQ参数* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitDirectListenerContainerFactory(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionDirectFactory());//设置最小并发的消费者数量factory.setConcurrentConsumers(concurrentConsumers);//设置最大并发的消费者数量factory.setMaxConcurrentConsumers(maxConcurrentConsumers);//限流,单位时间内消费多少条记录factory.setPrefetchCount(prefetch);// json转消息//factory.setMessageConverter(new Jackson2JsonMessageConverter());//设置rabbit 确认消息的模式,默认是自动确认//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置rabbit 确认消息的模式,默认是自动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}/*** 回调函数* @param connectionFactory* @return*/@Beanpublic RabbitTemplate createDirectRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Manatory,才能触发回调函数,无论消息推送结果怎么样都会强制调用回调函数rabbitTemplate.setMandatory(true);// 设置确认发送到交换机的回调函数 =》 消息推送到server,但是在server里找不到交换机 / 消息推送到sever,交换机和队列啥都没找到 / 消息推送到server,找到交换机了,但是没找到队列 / 消息推送成功rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(ack){System.out.println("发送者消息确认成功!");}else{System.out.println("发送者消息确认失败,考虑重发:"+cause);}//System.out.println("相关数据:"+correlationData);//System.out.println("确认情况:"+ack);//System.out.println("原因:"+cause);//System.out.println("===============================");});//设置确认消息已发送到队列的回调  =》 消息推送到server,找到交换机了,但是没找到队列 触发这个回调函数rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("交换机为:"+returnedMessage.getExchange());System.out.println("返回消息为:"+returnedMessage.getMessage());System.out.println("路由键为:"+returnedMessage.getRoutingKey());System.out.println("回应消息为:"+returnedMessage.getReplyText());System.out.println("回应代码为:"+returnedMessage.getReplyCode());System.out.println("===============================");});return rabbitTemplate;}//正常队列// 交换机public static final String NORMAL_EXCHANGE  = "normal_exchange";// 队列名称public static final String NORMAL_QUEUE  = "normal_queue";// 路由键public static final String NORMAL_ROUTE = "normal_route";//死信队列// 交换机public static final String DEAD_EXCHANGE = "dead_exchange";// 队列名称public static final String DEAD_QUEUE = "dead_queue";// 路由键public static final String DEAD_ROUTE = "dead_route";/*** 死信交换机* @return*/@Beanpublic Exchange deadExchange(){// 创建死信队列交换机return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();}/*** 死信队列* @return*/@Beanpublic Queue deadQueue(){// 创建死信队列return QueueBuilder.durable(DEAD_QUEUE).build();}/*** 死信交换机绑定死信队列* @param deadExchange* @param deadQueue* @return*/@Beanpublic Binding deadBinding(Exchange deadExchange,Queue deadQueue){// 死信交换机绑定死信队列return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTE).noargs();}/*** 设置正常队列(过期时间)*//*@Beanpublic Queue TTLQUEUE(){Map<String, Object> map = new HashMap<>();map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过期return new Queue(NORMAL_QUEUE, true, false, false, map);}//*//*** 绑定死信交换机及路由key(该正常队列内的消息无法被正常消费时,会转发给绑定的死信交换机通过路由key转发到死信队列)*/@Beanpublic Queue normalQueue(){// 创建队列return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)// 绑定死信队列交换机.deadLetterRoutingKey(DEAD_ROUTE)// 绑定死信队列路由.ttl(30000)// 设置消息过期时间.build();}/*** 正常队列交换机*/@Beanpublic DirectExchange normalExchange(){// return new DirectExchange(NORMAL_EXCHANGE);// 创建交换机return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();}/*** 正常交换机绑定正常队列*/@Beanpublic Binding binding(Queue normalQueue,Exchange normalExchange){// 将 队列 交换机 路由key绑定到一起。return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTE).noargs();}//*/
}

2:生产者代码如下:

package com.modules.controller.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TTLController
{@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/java/ttlproduct")public String sendTrafficMessage(@RequestParam String message){for (int i = 1; i <= 100; i++){// 使用java多线程来模拟多用户并发请求final int temp = i;new Thread(()->{// 给RabbitMQ发送消息rabbitTemplate.convertAndSend("normal_exchange","normal_route","hello world:"+temp,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throwsAmqpException{// System.out.println("发送回调:"+temp);System.out.println(message);return message;}});}).start();}return "Message sent";}
}

这里向上边创建的普通队列推送消息。

3:消费者

消费者监听死信队列,上边我们创建的普通队列的消息过期时间是30秒,相当于我们向普通队列中推送消息之后,30秒过期则进入死信队列中,消费者监听死信队列,等待消息进入死信队列之后再进行消费处理。这样就模拟了一个延迟队列。

代码如下:

package com.modules.controller.rabbitmq;import com.rabbitmq.client.Channel;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class TTLConsumer
{@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 监听死信队列* @param message* @param channel* @throws InterruptedException* @throws IOException*/@RabbitListener(queues = "dead_queue")public void receiveMessage(Message message, Channel channel) throws InterruptedException, IOException{// 为了演示一个一个消费的情况,这里使用线程暂停来延迟控制台输出Thread.sleep(100);// ===============================// 处理消息,例如写入数据库或进行计算System.out.println("TTL Received message: " + new String(message.getBody()));//System.out.println("channel: " + channel);// =================================// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();//System.out.println("deliveryTag:"+deliveryTag);channel.basicAck(deliveryTag, false);}
}

运行生产者,登录RabbitMQ控制台,如下图所示:

1.png

以上大概就是SpringBoot集成RabbitMQ实现延迟队列的全过程。

PS:redis也是可以通过zset来模拟实现延迟队列的,score存时间戳,每次取当前时间多少秒之前的数据即可。这里不做过多讨论。

有好的建议,请在下方输入你的评论。


http://www.ppmy.cn/devtools/137397.html

相关文章

Android8设置拔出充电器自动关机

通常Android机器拔出充电后&#xff0c;将进入断开充电流程&#xff0c;关闭充电灯和充电图标。 那么需要实现拔出充电器直接进入关机&#xff0c;则需要在充电判断机制中额外增加实现代码。 || || 修改方案如下&#xff1a; 在系统中存在服务时刻监听的充电状态&#xff…

24.11.23 Ajax

1动态网页技术与静态网页技术对比: 静态网页: 如果数据库中有用户列表 html中要显示 如果用户列表数据变化 html要改代码才能显示完整数据 (不能使用动态数据 ) 动态网页: servlet可以通过代码 以输出流显示数据 当数据库数据改变时 不需要改代码 2.为了解决html不能使用动…

Cocos编辑器

1、下载 下载地址&#xff1a;https://www.cocos.com/creator-download 2、编辑器界面介绍 官方链接&#xff1a;https://docs.cocos.com/creator/3.8/manual/zh/editor/ 3、项目结构 官方链接&#xff1a;https://docs.cocos.com/creator/3.8/manual/zh/getting-started/…

冷却小型电子设备

TLDR&#xff1a;间隙较小&#xff08;~1 毫米&#xff09;的设备可以进行基于传导的热模拟。 虚构的 ANSYS 腕带上的温度。 这些小玩意儿很酷&#xff0c;而且卖得很热。 小型设备的一个长期问题是它们容易发热&#xff0c;而且很难散热。大多数设备需要通过外表面散热。当你…

C# 程序来计算三角形的面积(Program to find area of a triangle)

给定一个三角形的边&#xff0c;任务是求出该三角形的面积。 例如&#xff1a; 输入&#xff1a;a 5, b 7, c 8 输出&#xff1a;三角形面积为 17.320508 输入&#xff1a;a 3, b 4, c 5 输出&#xff1a;三角形面积为 6.000000 方法&#xff1a;可以使用以下公式…

html+css+js打字游戏网页

1. 效果 2. html代码 <!doctype html> <html><head><meta charset"utf-8" /><title>打字练习</title><!--引入第三方动画库--><link rel"stylesheet" href"animate.css"><style>html {h…

网站布局编辑器前端开发:设计要点与关键考量

一、设计说明 &#xff08;一&#xff09;功能模块 可视化操作区域 这是用户进行网站布局设计的主要画布。通过拖放各种页面元素&#xff08;如文本框、图片、按钮、导航栏等&#xff09;到该区域&#xff0c;用户能够直观地构建网站页面的布局结构。支持对元素的实时缩放、旋…

【Linux】网络通信

TCP协议是一个安全的、面向连接的、流式传输协议&#xff0c;所谓的面向连接就是三次握手&#xff0c;对于程序猿来说只需要在客户端调用connect()函数&#xff0c;三次握手就自动进行了。先通过下图看一下TCP协议的格式&#xff0c;然后再介绍三次握手的具体流程。 TCP的三次握…