前边我们使用RabbitMQ实现了高并发下对流量的削峰填谷。正常在实际应用中大概也就够用了。
有的时候呢,我们需要使用到延迟队列,RabbitMQ不像RocketMQ一样默认就支持延迟队列,RabbitMQ是不支持延迟队列的,但是呢?我们可以通过正常的队列加上消息的过期时间,配置死信队列,来模拟实现延迟队列。
一:创建普通队列(配置过期时间),绑定死信队列
很简单就是创建两个普通队列,将一个普通队列A指定为另一普通队列B的死信队列。
且给普通队列B配置过期时间。
1:配置类具体代码如下:
具体代码如下:
package com.modules.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQConfig
{@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.listener.prefetch}")private int prefetch;@Value("${spring.rabbitmq.listener.concurrency}")private int concurrentConsumers;@Value("${spring.rabbitmq.listener.max-concurrency}")private int maxConcurrentConsumers;/*** 链接RabbitMQ* @return*/@Beanpublic ConnectionFactory connectionDirectFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(true); //必须要设置return connectionFactory;}/*** 配置RabbitMQ参数* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitDirectListenerContainerFactory(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionDirectFactory());//设置最小并发的消费者数量factory.setConcurrentConsumers(concurrentConsumers);//设置最大并发的消费者数量factory.setMaxConcurrentConsumers(maxConcurrentConsumers);//限流,单位时间内消费多少条记录factory.setPrefetchCount(prefetch);// json转消息//factory.setMessageConverter(new Jackson2JsonMessageConverter());//设置rabbit 确认消息的模式,默认是自动确认//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置rabbit 确认消息的模式,默认是自动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}/*** 回调函数* @param connectionFactory* @return*/@Beanpublic RabbitTemplate createDirectRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Manatory,才能触发回调函数,无论消息推送结果怎么样都会强制调用回调函数rabbitTemplate.setMandatory(true);// 设置确认发送到交换机的回调函数 =》 消息推送到server,但是在server里找不到交换机 / 消息推送到sever,交换机和队列啥都没找到 / 消息推送到server,找到交换机了,但是没找到队列 / 消息推送成功rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(ack){System.out.println("发送者消息确认成功!");}else{System.out.println("发送者消息确认失败,考虑重发:"+cause);}//System.out.println("相关数据:"+correlationData);//System.out.println("确认情况:"+ack);//System.out.println("原因:"+cause);//System.out.println("===============================");});//设置确认消息已发送到队列的回调 =》 消息推送到server,找到交换机了,但是没找到队列 触发这个回调函数rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("交换机为:"+returnedMessage.getExchange());System.out.println("返回消息为:"+returnedMessage.getMessage());System.out.println("路由键为:"+returnedMessage.getRoutingKey());System.out.println("回应消息为:"+returnedMessage.getReplyText());System.out.println("回应代码为:"+returnedMessage.getReplyCode());System.out.println("===============================");});return rabbitTemplate;}//正常队列// 交换机public static final String NORMAL_EXCHANGE = "normal_exchange";// 队列名称public static final String NORMAL_QUEUE = "normal_queue";// 路由键public static final String NORMAL_ROUTE = "normal_route";//死信队列// 交换机public static final String DEAD_EXCHANGE = "dead_exchange";// 队列名称public static final String DEAD_QUEUE = "dead_queue";// 路由键public static final String DEAD_ROUTE = "dead_route";/*** 死信交换机* @return*/@Beanpublic Exchange deadExchange(){// 创建死信队列交换机return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();}/*** 死信队列* @return*/@Beanpublic Queue deadQueue(){// 创建死信队列return QueueBuilder.durable(DEAD_QUEUE).build();}/*** 死信交换机绑定死信队列* @param deadExchange* @param deadQueue* @return*/@Beanpublic Binding deadBinding(Exchange deadExchange,Queue deadQueue){// 死信交换机绑定死信队列return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTE).noargs();}/*** 设置正常队列(过期时间)*//*@Beanpublic Queue TTLQUEUE(){Map<String, Object> map = new HashMap<>();map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过期return new Queue(NORMAL_QUEUE, true, false, false, map);}//*//*** 绑定死信交换机及路由key(该正常队列内的消息无法被正常消费时,会转发给绑定的死信交换机通过路由key转发到死信队列)*/@Beanpublic Queue normalQueue(){// 创建队列return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)// 绑定死信队列交换机.deadLetterRoutingKey(DEAD_ROUTE)// 绑定死信队列路由.ttl(30000)// 设置消息过期时间.build();}/*** 正常队列交换机*/@Beanpublic DirectExchange normalExchange(){// return new DirectExchange(NORMAL_EXCHANGE);// 创建交换机return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();}/*** 正常交换机绑定正常队列*/@Beanpublic Binding binding(Queue normalQueue,Exchange normalExchange){// 将 队列 交换机 路由key绑定到一起。return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTE).noargs();}//*/
}
2:生产者代码如下:
package com.modules.controller.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TTLController
{@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/java/ttlproduct")public String sendTrafficMessage(@RequestParam String message){for (int i = 1; i <= 100; i++){// 使用java多线程来模拟多用户并发请求final int temp = i;new Thread(()->{// 给RabbitMQ发送消息rabbitTemplate.convertAndSend("normal_exchange","normal_route","hello world:"+temp,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throwsAmqpException{// System.out.println("发送回调:"+temp);System.out.println(message);return message;}});}).start();}return "Message sent";}
}
这里向上边创建的普通队列推送消息。
3:消费者
消费者监听死信队列,上边我们创建的普通队列的消息过期时间是30秒,相当于我们向普通队列中推送消息之后,30秒过期则进入死信队列中,消费者监听死信队列,等待消息进入死信队列之后再进行消费处理。这样就模拟了一个延迟队列。
代码如下:
package com.modules.controller.rabbitmq;import com.rabbitmq.client.Channel;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class TTLConsumer
{@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 监听死信队列* @param message* @param channel* @throws InterruptedException* @throws IOException*/@RabbitListener(queues = "dead_queue")public void receiveMessage(Message message, Channel channel) throws InterruptedException, IOException{// 为了演示一个一个消费的情况,这里使用线程暂停来延迟控制台输出Thread.sleep(100);// ===============================// 处理消息,例如写入数据库或进行计算System.out.println("TTL Received message: " + new String(message.getBody()));//System.out.println("channel: " + channel);// =================================// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();//System.out.println("deliveryTag:"+deliveryTag);channel.basicAck(deliveryTag, false);}
}
运行生产者,登录RabbitMQ控制台,如下图所示:
以上大概就是SpringBoot集成RabbitMQ实现延迟队列的全过程。
PS:redis也是可以通过zset来模拟实现延迟队列的,score存时间戳,每次取当前时间多少秒之前的数据即可。这里不做过多讨论。
有好的建议,请在下方输入你的评论。