《微服务实战》 第十五章 RabbitMQ 延迟队列

news/2024/11/19 20:36:05/

前言

实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。

1、RabbitMQ延迟队列

1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能

  • TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
  • Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
  • Dead Letter Exchanges(DLX),即死信交换机
  • Dead Letter Routing Key (DLK),死信路由键
/***********************延迟队列*************************/
//创建立即消费队列
@Bean
public Queue immediateQueue(){return new Queue("immediateQueue");
}
//创建立即消费交换机
@Bean
public DirectExchange immediateExchange(){return new DirectExchange("immediateExchange");
}
@Bean
public Binding bindingImmediate(@Qualifier("immediateQueue") Queue queue,@Qualifier("immediateExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey");
}//创建延迟队列
@Bean
public Queue delayQueue(){Map<String,Object> params = new HashMap<>();//死信队列转发的死信转发到立即处理信息的交换机params.put("x-dead-letter-exchange","immediateExchange");//死信转化携带的routing-keyparams.put("x-dead-letter-routing-key","immediateRoutingKey");//设置消息过期时间,单位:毫秒params.put("x-message-ttl",60 * 1000);return new Queue("delayQueue",true,false,false,params);
}@Bean
public DirectExchange delayExchange(){return new DirectExchange("delayExchange");
}@Bean
public Binding bindingDelay(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey");
}
@Test
public void sendDelay(){this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic");
}

1.2、方式二:安装延迟队列插件

1.2.1、安装延迟队列插件:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

/**************延迟队列一个单一queue*******************/
@Bean
public Queue delayNewQueue(){return new Queue("delayNewQueue");
}
@Bean
public CustomExchange delayNewExchange(){Map<String, Object> args = new HashMap<>();// 设置类型,可以为fanout、direct、topicargs.put("x-delayed-type", "direct");return new CustomExchange("delayNewExchange","x-delayed-message", true,false,args);
}
@Bean
public Binding bindingNewDelay(@Qualifier("delayNewQueue") Queue queue,@Qualifier("delayNewExchange") CustomExchange customExchange){return BindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs();
}
@Test
public void sendDelay() {//生产端写完了UserInfo userInfo = new UserInfo();userInfo.setPassword("13432432");userInfo.setUserAccount("tiger");this.rabbitTemplate.convertAndSend("delayNewExchange", "delayNewRoutingKey", userInfo, a -> {//单位毫秒a.getMessageProperties().setDelay(30000);return a;});
}

2、消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。

生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。
消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。

2.1、生产确认

@Bean
public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){RabbitTemplate template = new RabbitTemplate(connectionFactory);//消息发送到交换器Exchange后触发回调template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//  可以进行消息入库操作log.info("消息唯一标识 correlationData = {}", correlationData);log.info("确认结果 ack = {}", ack);log.info("失败原因 cause = {}", cause);}});// 配置这个,下面的ReturnCallback 才会起作用template.setMandatory(true);// 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {//  可以进行消息入库操作log.info("消息主体 message = {}", returnedMessage.getMessage());log.info("回复码 replyCode = {}", returnedMessage.getReplyCode());log.info("回复描述 replyText = {}", returnedMessage.getReplyText());log.info("交换机名字 exchange = {}", returnedMessage.getExchange());log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey());}});return template;
}
spring:cloud:nacos:discovery:server-addr: localhost:8848application:name: drp-user-service  #微服务名称datasource:username: rootpassword: rooturl: jdbc:mysql://127.0.0.1:3306/drpdriver-class-name: com.mysql.cj.jdbc.Driverrabbitmq:host: 127.0.0.1port: 5672username: tigerpassword: tigervirtual-host: tiger_vh# 确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 确认消息已发送到队列publisher-returns: truelistener:simple:acknowledge-mode: manual # 开启消息消费手动确认retry:enabled: true

2.2、消费确认

@RabbitHandler
public void process(UserInfo data, Message message, Channel channel){log.info("收到directQueue队列信息:" + data);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//成功消费确认channel.basicAck(deliveryTag,true);log.info("消费成功确认完毕。。。。。");} catch (IOException e) {log.error("确认消息时抛出异常 ", e);        // 重新确认,成功确认消息try {Thread.sleep(50);channel.basicAck(deliveryTag, true);} catch (IOException | InterruptedException e1) {log.error("确认消息时抛出异常 ", e);// 可以考虑入库}}catch (Exception e){log.error("业务处理失败", e);try {// 失败确认channel.basicNack(deliveryTag, false, false);} catch (IOException e1) {log.error("消息失败确认失败", e1);}}
}

http://www.ppmy.cn/news/89154.html

相关文章

Maven 概述及下载安装

一、为什么要学习 Maven 我们构建一个项目需要用到很多第三方的类库&#xff0c;就需要引入大量的jar包&#xff0c;并且Jar包之间的关系错综复杂&#xff0c;缺少任何一个Jar包都会导致项目编译失败。Maven 能帮助我们下载及管理依赖。 本地项目代码开发完成后&#xff0c;我…

CMake初学笔记(一)

CMake初学笔记&#xff08;一&#xff09; CMake是什么CMake怎么实现跨平台CMake具体实践过程CMakeLists.txt编写快速入门常见函数例子 CMake是什么 跨平台编译工具&#xff0c;为了实现“write once, run everywhere”。 CMake怎么实现跨平台 开发者编写与平台无关的编译过…

【小沐学NLP】Python实现聊天机器人(OpenAI,模型概述笔记)

&#x1f37a;NLP开发系列相关文章编写如下&#x1f37a;&#xff1a;1&#x1f388;【小沐学NLP】Python实现词云图&#x1f388;2&#x1f388;【小沐学NLP】Python实现图片文字识别&#x1f388;3&#x1f388;【小沐学NLP】Python实现中文、英文分词&#x1f388;4&#x1…

Seata AT模式源码解析三(AT模式工作机制)

文章目录 代码示例流程源码解析开启全局事务注册分支事务一阶段提交全局事务提交分支事务二阶段提交全局事务回滚分支事务二阶段回滚 代码示例 从一个微服务示例开始&#xff0c;案例采用Seata官方提供的Demo。 用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持&…

Bits, Bytes and Integers——二进制unsigned以及Two-complement表示,十六进制

这篇文章梳理一下Bits, Bytes and Integers——二进制unsigned以及Two-complement表示&#xff0c;十六进制这些事儿。 计算机中所有数据都是用二进制的0和1组成的&#xff0c;直接上知识点。 二进制 Unsigned以及Two-complement 同样的一串二进制数&#xff0c;按照有符号…

0基础学习VR全景平台篇第31章:场景功能-嵌入图片

大家好&#xff0c;欢迎收看蛙色平台免费教程&#xff01; 功能位置示意 一、本功能将用在哪里&#xff1f; 嵌入功能可对VR全景作品嵌入【图片】【视频】【文字】【标尺】四种不同类型内容&#xff0c;本次主要带来图片类型的介绍&#xff0c;满足场景营销、重点标注、幻灯片…

docker移动默认地址

停止 Docker 服务&#xff1a;首先&#xff0c;停止正在运行的 Docker 服务。在 Linux 系统上&#xff0c;可以使用以下命令停止 Docker 服务&#xff1a; sudo systemctl stop docker 在 Windows 上&#xff0c;可以通过 Docker Desktop 的系统托盘图标或者任务管理器停止 Do…

HTTP/HTTPS协议详解

目录 一. HTTP详解 ✅1.1 概念 ✅1.2 HTTP的协议格式 1.2.1 HTTP请求体格式&#xff1a; 1.2.2 HTTP响应体格式&#xff1a; ✅1.3 HTTP请求方法 ✅1.4 认识请求报头 ✅1.5 HTTP请求过程 ✅1.6 认识状态码 二. HTTPS详解 ✅2.1 HTTPS简介 ✅2.2 HTTPS加密过程 TCP/UDP是位于传…