RabbitMQ学习—day6—springboot整合

server/2025/2/27 14:31:58/

目录

1. springboot配置

2. 开始写RabbitMq代码

 3. 队列优化

4. 插件实现延迟队列

5. 总结


前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

1. springboot配置

  1. 创建一个 Maven 工程或者 Spring Boot工程

  2. 添加依赖坐标,这里的 Spring Boot 是3.4.3 版本

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.51</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Knife4j API文档生产工具 --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId><version>4.4.0</version></dependency><!-- swagger注解支持:Knife4j依赖本依赖 --><dependency><groupId>io.swagger</groupId><artifactId>swagger-annotations</artifactId><version>1.5.22</version></dependency></dependencies>

        3.创建 application.yml 文件

server:port: 8888
spring:rabbitmq:host: 你的服务器ipport: 5672username: adminpassword: 123456

这里是 8808 端口,可根据需求决定端口

        4. 配置swgger

@Configuration
public class Knife4jConfig {@Beanpublic OpenAPI springShopOpenAPI() {return new OpenAPI()// 接口文档标题.info(new Info().title("接口文档")// 接口文档简介.description("RabbitMq测试文档")// 接口文档版本.version("v1.0")// 开发者联系方式.contact(new Contact().name("luckily").email("3298244978@qq.com")));}}

        5. 新建主启动类

@Log4j2
@SpringBootApplication
public class RabbitMqDemoApplication {public static void main(String[] args) {SpringApplication app = new SpringApplication(RabbitMqDemoApplication.class);Environment env = app.run(args).getEnvironment();app.setBannerMode(Banner.Mode.CONSOLE);logApplicationStartup(env);}private static void logApplicationStartup(Environment env) {String protocol = "http";if (env.getProperty("server.ssl.key-store") != null) {protocol = "https";}String serverPort = env.getProperty("server.port");String contextPath = env.getProperty("server.servlet.context-path");if (StringUtils.isBlank(contextPath)) {contextPath = "/doc.html";} else {contextPath = contextPath + "/doc.html";}String hostAddress = "localhost";try {hostAddress = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {log.warn("The host name could not be determined, using `localhost` as fallback");}log.info("""----------------------------------------------------------\t应用程序“{}”正在运行中......\t接口文档访问 URL:\t本地: \t\t{}://localhost:{}{}\t外部: \t{}://{}:{}{}\t配置文件: \t{}----------------------------------------------------------""",env.getProperty("spring.application.name"),protocol,serverPort,contextPath,protocol,hostAddress,serverPort,contextPath,env.getActiveProfiles());}}

2. 开始写RabbitMq代码

代码架构

创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wugMpbMe-1630999921193)(D:\学习资料\图片\image-20210902150742461.png)]

代码实现

配置类代码

/*
* TTL队列 配置文件类代码
*
* */
@Configuration
public class TtlQueueConfig {//普通交换机的名称public static final String  X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

生产者代码

 /** 发送延迟消息* */@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}
}

消费者

/** 队列TTL 消费者* */
@Slf4j
@Component
public class DeadLetterQueueConsumer {//接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

效果:

 3. 队列优化

问题:

第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求

代码架构图

增加一个队列QC实现动态延时

实现

配置文件类

 /** TTL队列 配置文件类代码** */@Configurationpublic class TtlQueueConfig {//普通交换机的名称public static final String  X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明QC队列@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>();//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable().withArguments(arguments).build();}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

生产者

/** 发送延迟消息* */
@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}//开始发消息@GetMapping("sendExpirationMsg/{message}/{ttlTime}")@Operation(summary = "Rabbitmq发送消息与延迟时间")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg->{//发送消息的时候 延迟时长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}

消费者

消费者代码不改变

效果:本来消息2是延迟2秒,消息1延迟20秒,消息2似乎要比消息1更早接收,但因为RabbitMQ智慧检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这是队列特性

怎么弥补这个缺陷,需要用到Rabbitmq的插件实现延迟队列

4. 插件实现延迟队列

我们之前延迟消息是在队列进行延迟,安装插件之后是在交换机进行延迟

  • 下载延迟插件https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
  • 将延迟插件放到RabbitMQ的插件目录下:由于我通过docker容器安装的rabbitmq,所以我将安装包先通过xftp发送给主机,在通过docker命令给容器,进入容器安装

复制给容器

进入容器

启动插件

重启容器

docker restart 7af

网页端出现以下就表示成功安装插件

实战

配置文件类

 @Configurationpublic class DelayedQueueConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);};//声明交换机@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

 /** 发送延迟消息* */@Slf4j@RestController@RequestMapping("/ttl")public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息 基于插件的 消息 及 延迟的时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {// 发送消息的时候 延迟时长 单位msmsg.getMessageProperties().setDelay(delayTime);return msg;});}}

消费者

 // 消费者代码 基于插件的延迟消息@Slf4j@Componentpublic class DelayQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void recieveDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}
}

效果

成功!

5. 总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用
RabbitMQ.的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis.的zsset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景


http://www.ppmy.cn/server/171055.html

相关文章

SAP Webide系列(7)- 优化FreeStyle新建项目预设模板

目录 一、背景 二、优化目标 三、定位调整点 四、调整步骤 五、效果展示 六、附言 一、背景 在每次通过Webide进行FreeStyle方式自开发SAP UI5应用的时候&#xff0c;新建项目&#xff0c;得到的模板文件都是只有很少的内容&#xff08;没有路由配置、没有设置默认全屏等…

三七互娱游戏策划岗内推

【游戏策划】【美术设计】【市场推广】【游戏运营类】【技术开发】 1、协助完成战斗体验设计&#xff0c;包括动作、特效、镜头等&#xff1b; 2、负责战斗资源的需求文档撰写&#xff0c;对最终的战斗表现和打击感负责&#xff1b; 3、协助完成职业的设计与制作&#xff0c…

Windows AD组策略完整实战文档 | 企业级安全配置指南

&#x1f4c5; 更新日期&#xff1a;2025年2月26日‌ ‌&#x1f516; 适用版本&#xff1a;Windows Server 2025 | Windows 11 23H2‌ Windows AD组策略完整实战文档 | 企业级安全配置指南 &#x1f310; ‌一、AD组策略核心概念‌ 1.1 核心价值 markdown Copy Code &#x1…

ZT15 小红的区间查询

描述 小红拿到了一个仅由正整数组成的数组&#xff0c;她有以下两种操作&#xff1a; 1. 输入1 i x&#xff0c;代表将第i个正整数修改为x。 2. 输入2 i x&#xff0c;代表查询前i个正整数有多少个数等于x。 输入描述&#xff1a; 第一行输入两个正整数n和q&#xff0c;代表数…

MFC文件和注册表的操作

MFC文件和注册表的操作 日志、操作配置文件、ini、注册表、音视频的文件存储 Linux下一切皆文件 C/C操作文件 const char* 与 char* const const char* 常量指针&#xff0c;表示指向的内容为常量。指针可以指向其他变量&#xff0c;但是内容不能再变了 char szName[6]&qu…

火语言RPA--Excel添加Sheet页

【组件功能】&#xff1a;新增Excel工作区 配置预览 配置说明 Sheet页名称 支持T或# Excel工作簿名称。 示例 添加Excel工作簿 描述 添加一个新的Excel文档Sheet 新工作簿。 配置 第一步&#xff1a;打开或新建一个Excel文档&#xff0c;拖入 Excel添加Sheet页组件&am…

策略模式环境类的实现方式对比

文章目录 1、策略模式2、聚合策略类实现方式一3、聚合策略类实现方式二4、对比5、补充&#xff1a;ApplicationContextAware接口 1、策略模式 近期工作中&#xff0c;需要处理4.x和5.x两个版本的数据&#xff0c;所以自然想到的是策略模式&#xff0c;写一个抽象类&#xff0c…

PySpark实现Snowflake数据导出到Amazon Redshift

编写AWS EMR上的高性能PySpark代码&#xff0c;实现用SQL从Snowflake上下载数据到S3里的parquet文件&#xff0c;并导入Redshift表。 步骤一&#xff1a;配置EMR集群 首先确保您已经在AWS EMR上正确地设置了包含适当权限的角色和安全组的集群。该角色应允许访问Snowflake数据…