7.RabbitMQ延时交换机

server/2025/3/7 1:44:13/

七、延时交换机与延时队列

1、延时问题

(1)、问题引入

场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了

(2)、解决方式

  • 定时任务方式

    每隔3秒扫描一次数据库,查询过期的订单然后进行处理;

    优点:

    • 简单,容易实现;

    缺点:

    • 存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
    • 性能较差,每次扫描数据库,如果订单量很大会影响性能
  • 被动取消

    当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);

    优点:

    • 对服务器而言,压力小;

    缺点:

    • 用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
    • 用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
  • JDK延迟队列(单体应用,不能分布式下)

    DelayedQueue

    无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素

    优点:

    • 实现简单,任务延迟低;

    缺点:

    • 服务重启、宕机,数据丢失;
    • 只适合单机版,不适合集群;
    • 订单量大,可能内存不足而发生异常;
  • 采用消息中间件(rabbitmq)

    RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。

    不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样

    在这里插入图片描述

2、延时交换机

7.2.1、死信队列实现

(1)、实现方式

给正常队列绑定一个死信交换机和设置死信路由key

给正常队列设置消息过期时间,过期时间用于模拟延时操作

当消息过期后没有被消费就会转到死信队列

测试模块rabbitmq-07-delay-01

引入依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

配置MQ属性

server:port: 8080spring:application:name: delay-learn1rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi

定义常量

package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机(public static final String EXCHANGE_NAME = "exchange.delay.normal.1";// 死信交换机public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.1";// 正常队列public static final String QUEUE_NAME = "queue.delay.normal.1";// 死信队列public static final String QUEUE_DLX_NAME = "queue.delay.dlx.1";// 正常路由keypublic static final String ROUTING_NAME = "order1";// 死信路由keypublic static final String ROUTING_DLX_NAME = "error1";}

生产者

package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}
}

定义消息队列

package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME).ttl(25000) //队列的过期时间.withArguments(arguments) // 设置对列的参数.build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);}
}

发送消息

package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay01Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay01Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}

测试

在这里插入图片描述

(2)、存在的问题

如果不设置队列的过期时间,在发送消息时设置消息的过期时间会存在以下问题

  • 如果队头的消息过期时间长,后面的消息过期时间端,但是因为队头的消息没有被消费,因此后面已过期的消息也无法到达死信队列中

测试模块rabbitmq-07-delay-02

引入依赖

	<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

配置MQ属性

server:port: 8080spring:application:name: delay-learn2rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi

定义常量

package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机(public static final String EXCHANGE_NAME = "exchange.delay.normal.2";// 死信交换机public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.2";// 正常队列public static final String QUEUE_NAME = "queue.delay.normal.2";// 死信队列public static final String QUEUE_DLX_NAME = "queue.delay.dlx.2";// 正常路由keypublic static final String ROUTING_NAME = "order2";// 死信路由keypublic static final String ROUTING_DLX_NAME = "error2";}

生产者

发送消息时先发送一条过期时间长的,再发送一条过期时间短的消息

package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.amqp.core.MessageProperties;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("25000"); //第一条消息Message message = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000"); //第二条消息Message message = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}

消费者

消费者监听死信队列的消息来查看消息接收的时间

package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
@Slf4j
public class ReceiveMessageService {/*** 延迟队列一定要接收死信队列的消息*/@RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)public void receiveMsg(Message message) {String body = new String(message.getBody());log.info("接收到的消息为:{},接收时间为:{}", body, new Date());}
}

定义消息队列

注意:正常队列不要设置过期时间

package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)//.ttl(5000) //队列的过期时间.withArguments(arguments) // 设置对列的参数.build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);}
}

发送消息

package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay02Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay02Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}

测试

在这里插入图片描述

(3)、多队列解决过期时间问题

对于上面存在的问题,可以将不同过期时间的消息发送到不同的队列上,过期后再转到死信队列上

测试模块rabbitmq-07-delay-03

引入依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

配置MQ属性

server:port: 8080spring:application:name: delay-learn3rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi

定义常量

package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.delay.normal.3";// 死信交换机public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.3";// 正常订单队列public static final String QUEUE_ORDER_NAME = "queue.delay.normal.order.3";// 正常支付队列public static final String QUEUE_PAY_NAME = "queue.delay.normal.pay.3";// 死信队列public static final String QUEUE_DLX_NAME = "queue.delay.dlx.3";// 订单路由keypublic static final String ROUTING_ORDER_NAME = "order3";// 支付路由keypublic static final String ROUTING_PAY_NAME = "pay3";// 死信路由keypublic static final String ROUTING_DLX_NAME = "error3";
}

生产者

package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;import org.springframework.amqp.core.MessageProperties;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("10000"); //第一条消息Message message = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_ORDER_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("5000"); //第二条消息Message message = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_PAY_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}

消费者

package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
@Slf4j
public class ReceiveMessageService {/*** 延迟队列一定要接收死信队列的消息*/@RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)public void receiveMsg(Message message) {String body = new String(message.getBody());log.info("接收到的消息为:{},接收时间为:{}", body, new Date());}
}

定义消息队列

package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 订单队列** @return*/@Beanpublic Queue normalOrderQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_ORDER_NAME).withArguments(arguments) // 设置对列的参数.build();}/*** 支付队列** @return*/@Beanpublic Queue normalPayQueue() {Map<String, Object> arguments = new HashMap<>();// 重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);return QueueBuilder.durable(RabbitMQConstant.QUEUE_PAY_NAME).withArguments(arguments) // 设置对列的参数.build();}/*** 正常交换机和订单队列绑定** @param normalExchange* @param normalOrderQueue* @return*/@Beanpublic Binding bindingOrderNormal(DirectExchange normalExchange, Queue normalOrderQueue) {return BindingBuilder.bind(normalOrderQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_ORDER_NAME);}/*** 正常交换机和支付队列绑定** @param normalExchange* @param normalPayQueue* @return*/@Beanpublic Binding bindingPayNormal(DirectExchange normalExchange, Queue normalPayQueue) {return BindingBuilder.bind(normalPayQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_PAY_NAME);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);}
}

发送消息

package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay03Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay03Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}

测试

在这里插入图片描述

7.2.2、使用延时插件

(1)、安装插件

第一步:下载

选择对应的版本下载 rabbitmq-delayed-message-exchange 插件

下载地址:http://www.rabbitmq.com/community-plugins.html

在这里插入图片描述

第二步:插件拷贝到 RabbitMQ 服务器plugins目录下

在这里插入图片描述

第三步:解压缩

cd /usr/local/rabbitmq_server-4.0.7/plugins
unzip rabbitmq_delayed_message_exchange-v4.0.7.ez

在这里插入图片描述

如果unzip 没有安装,先安装一下

yum install unzip -y

第四步:启用插件

cd /usr/local/rabbitmq_server-4.0.7/sbin/
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

在这里插入图片描述

第五步:查询安装情况

查询安装的所有插件

./rabbitmq-plugins list 

重启rabbitmq使其生效(此处也可以不重启)

在这里插入图片描述

(2)、实现原理

消息发送后不会直接投递到队列,而是先存储到内嵌的 Mnesia数据库中,然后会检查 x-delay 时间(消息头部),将过期的消息放到死信队列中

延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;

在这里插入图片描述

Mnesia 是一个小型数据库,不适合于大量延迟消息的实现

解决了消息过期时间不一致出现的问题

(3)、实现延时队列

消息只要发送到延时交换机即可,延时交换机绑定死信路由的key

引入依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

配置MQ属性

server:port: 8080spring:application:name: delay-learn4rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi

定义常量

package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机(死信交换机)public static final String EXCHANGE_NAME = "exchange.delay.4";public static final String QUEUE_NAME = "queue.delay.4";public static final String ROUTING_NAME = "plugin4";}

生产者

生产者发送消息时要在headers中添加过期时间

package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 25000);//第一条消息 延迟时间//messageProperties.setExpiration("25000"); //不要用这个Message message = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 15000);//第二条消息 延迟时间//messageProperties.setExpiration("15000"); //不要用这个Message message = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}

消费者

package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
@Slf4j
public class ReceiveMessageService {/*** 延迟队列一定要接收死信队列的消息*/@RabbitListener(queues = RabbitMQConstant.QUEUE_NAME)public void receiveMsg(Message message) {String body = new String(message.getBody());log.info("接收到的消息为:{},接收时间为:{}", body, new Date());}
}

定义消息队列

注意延时交换机需要使用自定义类型定义

package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 创建自定义交换机** @return*/@Beanpublic CustomExchange customExchange() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct"); //放一个参数//CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new CustomExchange(RabbitMQConstant.EXCHANGE_NAME, "x-delayed-message", true, false, arguments);}@Beanpublic Queue queue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME) //队列名称.build();}@Beanpublic Binding binding(CustomExchange customExchange, Queue queue) {//绑定也指定路由key,加noargs 方法return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();}}

发送消息

package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay04Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}

.QUEUE_NAME) //队列名称
.build();
}

@Bean
public Binding binding(CustomExchange customExchange, Queue queue) {//绑定也指定路由key,加noargs 方法return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();
}

}


**发送消息**```java
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq07Delay04Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}

http://www.ppmy.cn/server/173022.html

相关文章

Genesis:用于机器人及其他领域的生成式通用物理引擎

Genesis 是一个综合物理模拟平台&#xff0c;由清华、北京大学、哈佛、英伟达等众多顶尖机构合力研发&#xff0c;专为通用机器人、具身人工智能和物理人工智能应用而设计。它同时具有多种功能&#xff1a; 从头开始重新构建的通用物理引擎&#xff0c;能够模拟各种材料和物理…

js操作字符串的常用方法

1. 查找和截取​​​​​​​ 1.1 indexOf 作用&#xff1a;查找子字符串在字符串中首次出现的位置。 是否改变原字符串&#xff1a;不会改变原字符串。 返回值&#xff1a;如果找到子字符串&#xff0c;返回其起始索引&#xff08;从 0 开始&#xff09;&#xff1b;如果未…

使用Python SciPy库来计算矩阵的RCS特征值并生成极坐标图

在Python中&#xff0c;计算矩阵的RCS&#xff08;Rayleigh商迭代法&#xff09;特征值通常涉及使用数值线性代数库&#xff0c;如NumPy或SciPy。RCS&#xff08;Rayleigh商迭代法&#xff09;是一种用于计算矩阵特征值和特征向量的迭代方法。 以下是一个简单的示例&#xff0…

logback日志输出配置范例

logback日志输出配置范例 在wutool中&#xff0c;提供了logback日志输出配置范例&#xff0c;实现日志文件大小限制、滚动覆盖策略、定时清理等功能。 关于wutool wutool是一个java代码片段收集库&#xff0c;针对特定场景提供轻量解决方案&#xff0c;只要按需选择代码片段…

敏捷开发之分支管理策略

分支管理策略在软件开发中是一种重要的管理手段,用于支持并行开发和测试,以下是对其的理解及举例说明: 基本理解 概念:分支管理策略是指在版本控制系统中,创建和管理不同的代码分支,以实现不同的开发目标、阶段或任务的隔离和并行处理。通过合理的分支管理,可以让开发团…

IDEA 使用codeGPT+deepseek

一、环境准备 1、IDEA 版本要求 安装之前确保 IDEA 处于 2023.x 及以上的较新版本。 2、Python 环境 安装 Python 3.8 或更高版本 为了确保 DeepSeek 助手能够顺利运行&#xff0c;您需要在操作系统中预先配置 Python 环境。具体来说&#xff0c;您需要安装 Python 3.8 或更高…

期权帮|股指期货入门知识:什么是股指期货基差?什么是股指期货价差?

锦鲤三三每日分享期权知识&#xff0c;帮助期权新手及时有效地掌握即市趋势与新资讯&#xff01; 股指期货入门知识&#xff1a;什么是股指期货基差&#xff1f;什么是股指期货价差&#xff1f; 股指期货的基差与价差是两个重要的价格关系指标&#xff0c;它们反映了现货市场…

实时语义分割之FarSeeNet(2020)模型解析代码复现及滑坡检测实战

论文《FarSee-Net: Real-Time Semantic Segmentation by Efficient Multi-scale Context Aggregation and Feature Space Super-resolution》原理解析及代码复现。 用自己的数据训练yolov11目标检测 实时语义分割之BiSeNetv2&#xff08;2020&#xff09;结构原理解析及建筑物提…