七、延时交换机与延时队列
1、延时问题
(1)、问题引入
场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了
(2)、解决方式
-
定时任务方式
每隔3秒扫描一次数据库,查询过期的订单然后进行处理;
优点:
- 简单,容易实现;
缺点:
- 存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
- 性能较差,每次扫描数据库,如果订单量很大会影响性能
-
被动取消
当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);
优点:
- 对服务器而言,压力小;
缺点:
- 用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
- 用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
-
JDK延迟队列(单体应用,不能分布式下)
DelayedQueue
无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素
优点:
- 实现简单,任务延迟低;
缺点:
- 服务重启、宕机,数据丢失;
- 只适合单机版,不适合集群;
- 订单量大,可能内存不足而发生异常;
-
采用消息中间件(rabbitmq)
RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。
不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样
2、延时交换机
7.2.1、死信队列实现
(1)、实现方式
给正常队列绑定一个死信交换机和设置死信路由key
给正常队列设置消息过期时间,过期时间用于模拟延时操作
当消息过期后没有被消费就会转到死信队列
测试模块rabbitmq-07-delay-01
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ属性
server:port: 8080spring:application:name: delay-learn1rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机(public static final String EXCHANGE_NAME = "exchange.delay.normal.1";// 死信交换机public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.1";// 正常队列public static final String QUEUE_NAME = "queue.delay.normal.1";// 死信队列public static final String QUEUE_DLX_NAME = "queue.delay.dlx.1";// 正常路由keypublic static final String ROUTING_NAME = "order1";// 死信路由keypublic static final String ROUTING_DLX_NAME = "error1";}
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}
}
定义消息队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME).ttl(25000) //队列的过期时间.withArguments(arguments) // 设置对列的参数.build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);}
}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay01Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay01Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
(2)、存在的问题
如果不设置队列的过期时间,在发送消息时设置消息的过期时间会存在以下问题
- 如果队头的消息过期时间长,后面的消息过期时间端,但是因为队头的消息没有被消费,因此后面已过期的消息也无法到达死信队列中
测试模块rabbitmq-07-delay-02
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ属性
server:port: 8080spring:application:name: delay-learn2rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机(public static final String EXCHANGE_NAME = "exchange.delay.normal.2";// 死信交换机public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.2";// 正常队列public static final String QUEUE_NAME = "queue.delay.normal.2";// 死信队列public static final String QUEUE_DLX_NAME = "queue.delay.dlx.2";// 正常路由keypublic static final String ROUTING_NAME = "order2";// 死信路由keypublic static final String ROUTING_DLX_NAME = "error2";}
生产者
发送消息时先发送一条过期时间长的,再发送一条过期时间短的消息
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.amqp.core.MessageProperties;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("25000"); //第一条消息Message message = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000"); //第二条消息Message message = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
消费者
消费者监听死信队列的消息来查看消息接收的时间
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
@Slf4j
public class ReceiveMessageService {/*** 延迟队列一定要接收死信队列的消息*/@RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)public void receiveMsg(Message message) {String body = new String(message.getBody());log.info("接收到的消息为:{},接收时间为:{}", body, new Date());}
}
定义消息队列
注意:正常队列不要设置过期时间
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)//.ttl(5000) //队列的过期时间.withArguments(arguments) // 设置对列的参数.build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);}
}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay02Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay02Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
(3)、多队列解决过期时间问题
对于上面存在的问题,可以将不同过期时间的消息发送到不同的队列上,过期后再转到死信队列上
测试模块rabbitmq-07-delay-03
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ属性
server:port: 8080spring:application:name: delay-learn3rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.delay.normal.3";// 死信交换机public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.3";// 正常订单队列public static final String QUEUE_ORDER_NAME = "queue.delay.normal.order.3";// 正常支付队列public static final String QUEUE_PAY_NAME = "queue.delay.normal.pay.3";// 死信队列public static final String QUEUE_DLX_NAME = "queue.delay.dlx.3";// 订单路由keypublic static final String ROUTING_ORDER_NAME = "order3";// 支付路由keypublic static final String ROUTING_PAY_NAME = "pay3";// 死信路由keypublic static final String ROUTING_DLX_NAME = "error3";
}
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;import org.springframework.amqp.core.MessageProperties;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("10000"); //第一条消息Message message = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_ORDER_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("5000"); //第二条消息Message message = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_PAY_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
消费者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
@Slf4j
public class ReceiveMessageService {/*** 延迟队列一定要接收死信队列的消息*/@RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)public void receiveMsg(Message message) {String body = new String(message.getBody());log.info("接收到的消息为:{},接收时间为:{}", body, new Date());}
}
定义消息队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 订单队列** @return*/@Beanpublic Queue normalOrderQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_ORDER_NAME).withArguments(arguments) // 设置对列的参数.build();}/*** 支付队列** @return*/@Beanpublic Queue normalPayQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_PAY_NAME).withArguments(arguments) // 设置对列的参数.build();}/*** 正常交换机和订单队列绑定** @param normalExchange* @param normalOrderQueue* @return*/@Beanpublic Binding bindingOrderNormal(DirectExchange normalExchange, Queue normalOrderQueue) {return BindingBuilder.bind(normalOrderQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_ORDER_NAME);}/*** 正常交换机和支付队列绑定** @param normalExchange* @param normalPayQueue* @return*/@Beanpublic Binding bindingPayNormal(DirectExchange normalExchange, Queue normalPayQueue) {return BindingBuilder.bind(normalPayQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_PAY_NAME);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);}
}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay03Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay03Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
7.2.2、使用延时插件
(1)、安装插件
第一步:下载
选择对应的版本下载 rabbitmq-delayed-message-exchange 插件
下载地址:http://www.rabbitmq.com/community-plugins.html
第二步:插件拷贝到 RabbitMQ 服务器plugins目录下
第三步:解压缩
cd /usr/local/rabbitmq_server-4.0.7/plugins
unzip rabbitmq_delayed_message_exchange-v4.0.7.ez
如果unzip 没有安装,先安装一下
yum install unzip -y
第四步:启用插件
cd /usr/local/rabbitmq_server-4.0.7/sbin/
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
第五步:查询安装情况
查询安装的所有插件
./rabbitmq-plugins list
重启rabbitmq使其生效(此处也可以不重启)
(2)、实现原理
消息发送后不会直接投递到队列,而是先存储到内嵌的 Mnesia数据库中,然后会检查 x-delay 时间(消息头部),将过期的消息放到死信队列中
延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现
解决了消息过期时间不一致出现的问题
(3)、实现延时队列
消息只要发送到延时交换机即可,延时交换机绑定死信路由的key
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ属性
server:port: 8080spring:application:name: delay-learn4rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机(死信交换机)public static final String EXCHANGE_NAME = "exchange.delay.4";public static final String QUEUE_NAME = "queue.delay.4";public static final String ROUTING_NAME = "plugin4";}
生产者
生产者发送消息时要在headers中添加过期时间
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 25000);//第一条消息 延迟时间//messageProperties.setExpiration("25000"); //不要用这个Message message = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 15000);//第二条消息 延迟时间//messageProperties.setExpiration("15000"); //不要用这个Message message = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
消费者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
@Slf4j
public class ReceiveMessageService {/*** 延迟队列一定要接收死信队列的消息*/@RabbitListener(queues = RabbitMQConstant.QUEUE_NAME)public void receiveMsg(Message message) {String body = new String(message.getBody());log.info("接收到的消息为:{},接收时间为:{}", body, new Date());}
}
定义消息队列
注意延时交换机需要使用自定义类型定义
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 创建自定义交换机** @return*/@Beanpublic CustomExchange customExchange() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct"); //放一个参数//CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new CustomExchange(RabbitMQConstant.EXCHANGE_NAME, "x-delayed-message", true, false, arguments);}@Beanpublic Queue queue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME) //队列名称.build();}@Beanpublic Binding binding(CustomExchange customExchange, Queue queue) {//绑定也指定路由key,加noargs 方法return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();}}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay04Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
.QUEUE_NAME) //队列名称
.build();
}
@Bean
public Binding binding(CustomExchange customExchange, Queue queue) {//绑定也指定路由key,加noargs 方法return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();
}
}
**发送消息**```java
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay04Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}