RabbitMQ高可用延迟消息惰性队列

ops/2024/11/24 4:36:09/

目录

生产者确认

消息持久化

消费者确认

TTL延迟队列

TTL延迟消息

惰性队列


生产者确认

生产者确认就是:发送消息的人,要确保消息发送给了消息队列,分别是确保到了交换机,确保到了消息队列这两步。

1、在发送消息服务的application.yml中添加配置

spring:rabbitmq:publisher-confirm-type: correlated  # 异步回调publisher-returns: truetemplate:mandatory: true

2、确保消息到交换机

java">package cn.zsh.mq.spring;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.UUID;@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testConfirmCallBack() {// 1、定义消息String message = "ABC";// 设置一个消息的唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3、confirm-ackcorrelationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("消息发送异常:" + ex.toString());}@Overridepublic void onSuccess(CorrelationData.Confirm result) {if (result.isAck()) {// 说明到了交换机System.out.println("publish-confirm:ack==消息发送成功:" + correlationData.getId());} else {// 消息没有到交换机System.out.println("publish-confirm:nack==消息发送失败:" + correlationData.getId());}}});// 4、消息发送rabbitTemplate.convertAndSend("191exchange","191",message,correlationData);}}

3、确保消息从交换机路由到队列

创建公开CommonConfig类

java">package cn.zsh.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 发送消息到交换机没有到消息队列*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 1、获取RabbitTemplate(获取启动中的Bean的方式)RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 2、设置回调函数rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("发送消息失败,没到队列===消息:{}, 交换机:{}, 路由Key:{}, 响应CODE:{}, 相应内容:{}", message,exchange,routingKey,replyCode,replyText);}});}
}

消息持久化

消息持久化就是:确保消息不会在交换机或者队列中丢失。

案例:

使用SpringAMQP创建出来的交换机和队列,默认就是做了持久化的

java">package cn.zsh.mq.config;import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** 创建交换机与队列*/
@Component
public class FoundQueue {@Beanpublic DirectExchange qiuExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("qiu.deirect",true,false);}@Beanpublic Queue piqiuQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("piqiu.queue").build();}
}

消费者确认

消费者确认就是:消费者把消息从队列中获取出来,然后要消费成功,队列中的消息才能被删除掉。

方案一:消费者确认

加入这个配置以后,消费者消费失败,会直接重试或者删除,具体取决于设置的是none还是auto。

默认是none,不建议设置为auto模式因为会一直不断地尝试,这样会导致服务器压力很大。

java">spring:rabbitmq:listener:simple:acknowledge-mode: auto # none:投递完立马删除  auto:失败后让你再次重试(重新投递到队列)知道成功

方案二:消费者失败重试,重试固定次数后,则删除当前消息

加入这个配置以后,消费者消费失败会重试固定的次数,然后将消息删除。

java">spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000  # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-intervalmax-attempts: 3 # 最大重试次数stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

方案三:消费者失败重试,重试固定次数后,将当前消息发送给error交换机路由给error队列

加入这个配置之后,重试固定次数后,会将这条消费失败的消息发送给error交换机,路由给error队列。

1、在消费者(消息接收者)中加入配置

java">spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000  # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-intervalmax-attempts: 3 # 最大重试次数stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

2、创建error交换机和队列并绑定

java">package cn.zsh.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class ErrorConfig {/*** 定义交换机* @return*/@Beanpublic DirectExchange errorExchange2(){return new DirectExchange("error.direct");}/*** 定义队列* @return*/@Beanpublic Queue errorQueue2(){return new Queue("error.queue");}@Beanpublic Binding bindErrorQueue(DirectExchange errorExchange2,Queue errorQueue2){return BindingBuilder.bind(errorQueue2).to(errorExchange2).with("error");}
}

3、在启动类或者配置类中加入配置

java">package cn.zsh.mq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");}
}

TTL延迟队列

延迟队列:

延迟队列就是消息发送到当前队列,会延迟一段时间,然后进行处理,具体如下图:

发送消息给指定队列,然后消息回延迟固定的时间,这个延迟时间是在对应的延迟消息队列中设置的。经过延迟以后,会将消息发送给其他的交换机,然后再路由给对应的消息队列,再进行消费,实现延迟的效果。

使用案例:

1、创建处理延迟消息的队列和交换机

当前交换机名称为:dl.direct

当前消息队列名称为:dl.queue

RoutingKey是:dl

java">package cn.zsh.mq.config;import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 创建,处理延迟消息的,交换机和队列*/
@Component
public class TtlConfig {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue",durable = "true"),    // 处理延迟消息的队列名称exchange = @Exchange(name = "dl.direct",durable = "true"),    // 处理延迟消息的交换机名称key = "dl"    // 当前的RoutingKey))public void consumerDdlMessage(String message){System.out.println("接收到延迟消息:" + message);}
}

2、创建延迟交换机、队列,并绑定

java">package cn.zsh.mq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class publisherTtlConfig {@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl.direct");    // 延迟交换机名称}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue")    // 延迟队列名称.ttl(10000) // 延迟队列的延迟时间(当前为10秒).deadLetterExchange("dl.direct") // 设置延时时间到了以后发送到哪个交换机(处理延迟消息的交换机).deadLetterRoutingKey("dl") // 设置具体到那个交换机的具体队列(处理延迟消息队列的RoutingKey).build();}/*** 绑定延迟队列与交换机* @return*/@Beanpublic Binding bingTtlQueue(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");    // 将延迟队列与交换机绑定,并设置RoutingKey(这个RoutingKey是当前延迟消息队列的RoutingKey)}
}

3、发送任意消息到当前延迟队列(ttl.queue)即可实现延迟效果。

延时时间到了以后,会将消息发送给(dl.direct)交换机,路由给RoutingKey为(dl)的消息队列dl.queue。有绑定了dl.queue的队列进行消息的最终处理。

TTL延迟消息

延迟消息:

延迟消息是给消息设置延迟时间,然后将消息发送给延迟队列,可以实现延迟。

注意!!!延迟消息的延迟时间,与延迟队列的延迟时间,哪个时间短,就使用哪个延迟时间。

例1:延迟消息设置延迟时间为5秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟5秒然后就会被处理。

例2:延迟消息设置延迟时间为20秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟10秒然后就会被处理。

使用案例:

延迟消息必须发送给延迟队列,因为延迟时间按最短的执行。发送给没有设置延迟时间的消息队列,会直接被消费。

java">package cn.zsh.mq.spring;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.nio.charset.StandardCharsets;
import java.util.UUID;@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testTTLMessage(){// 创建消息Message message = MessageBuilder.withBody("这是一条延时5秒后执行的消息".getBytes(StandardCharsets.UTF_8)).setExpiration("5000")// 延时时间.build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息(这个消息必须要发给延时的消息队列)rabbitTemplate.convertAndSend("ttl.direct","ttl",message,correlationData);}}

惰性队列

惰性队列是为了防止消息大量积压的一种队列。

消息队列中的消息一般都存在内存中,而消息大量积压,就会产生很多问题,这时候可以使用惰性队列,惰性队列的消息保存在磁盘中。

创建惰性队列:

方案一:

在代码中声明

java">package cn.itcast.mq.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class CommonConfig {@Beanpublic Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy().build();}
}

方案二:

在浏览器中MQ的控制台声明

第一步:

第二步:

额外补充:


http://www.ppmy.cn/ops/136234.html

相关文章

神领物流运单服务业务流程以及相关面试点

运单服务 在该服务中用到了什么技术 --> 美团Leaf 什么是美团Leaf 美团Leaf是有美团开源的 , 用于处理分布式环境中保证ID的唯一性 , 通过该技术能够实现Id的全局唯一性 , 高可用性 , 趋势递增性以及容灾性. 美团leaf主要通过两种方式实现业务 雪花算法 该方式区别于传…

VB.Net笔记-更新ing

目录 1.1 设置默认VS的开发环境为VB.NET&#xff08;2024/11/18&#xff09; 1.2 新建一个“Hello&#xff0c;world”的窗体&#xff08;2024/11/18&#xff09; 1.3 计算圆面积的小程序&#xff08;2024/11/18&#xff09; 显示/隐式 声明 &#xff08;2024/11/18&…

React-自定义Hook与逻辑共享

#题引&#xff1a;我认为跟着官方文档学习不会走歪路 在 React 中&#xff0c;自定义 Hook 是一种复用逻辑的方式。自定义 Hook 是一个 JavaScript 函数&#xff0c;名称以 use 开头&#xff0c;可以调用其他的 Hook, 可以返回任意值。 创建自定义Hook 假设你正在开发一款重…

MATLAB 2024a安装包下载及安装教程

[安装环境]: Win 11/Win 10 MATLAB和Mathematica、Maple并称为三大数学软件。它在数学类科技应用软件中在数值计算方面首屈一指。行矩阵运算、绘制函数和数据、实现算法、创建用户界面、连接其他编程语言的程序等。MATLAB的基本数据单位是矩阵&#xff0c;它的指令表达式与数学…

MySQL深度剖析-全局锁、表锁、行锁

一、锁的基本概念 事务与锁是不同的。事务具有ACID( 原子性、一致性、隔离性和持久性)&#xff0c;锁是用于解决隔离性的一种机制。事务的隔离级别通过锁的机制来实现。 锁机制是为了解决数据库的并发控制问题而产生的。如在同一时刻&#xff0c;客户端对同一个表做更新或查询…

elasticsearch介绍和部署

1 elasticsearch介绍 Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。可以很方便的使大量数据具有搜索、分析和探索的能力。充分利用Elasticsearch的水平伸缩性。Elasticsearch 的实现原理主要分为以下几个步骤&#xff0c;首先用户将数据提交到Elasticsea…

Dubbo HTTP接入之triple协议

在 triple协议规范 中我们曾详细介绍了 triple 对于浏览器、网关的友好性设计&#xff0c;其中非常重要的一点是 triple 同时支持跑在 HTTP/1、HTTP/2 上&#xff1a; 在后端服务之间使用高效的 triple 二进制协议。对于前端接入层&#xff0c;则支持所有标准 HTTP 工具如 cUR…

unity3d——基础篇2刷(三角函数练习题)

1. 移动速度和变化速度 面朝向移动速度 (moveSpeed): 控制对象沿其当前朝向&#xff08;通常是摄像机方向&#xff09;的移动速度。左右曲线移动变化的速度 (changeSpeed): 控制对象左右移动速度的变化频率。 2. 移动距离控制 左右曲线移动距离控制 (changeSize): 控制对象左…