微服务day07

news/2024/11/18 20:41:13/

MQ高级

发送者可靠性,MQ的可靠性,消费者可靠性。

发送者可靠性

发送者重连

连接重试的配置文件:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

关闭MQ。

运行测试用例结果,进行了三次重连。

11-12 10:15:41:975  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:43:997  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:46:002  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]

发送者确认

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启

开启发送者确认:

在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

java">package com.itheima.publisher.config;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration
@Slf4j
@RequiredArgsConstructor
public class MQconfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆

  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

java">    @Testpublic void testObgect1() throws InterruptedException {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {//failure出现异常时的处理逻辑,基本不会触发。log.error("发送失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {//判断是否成功发送到服务器if (result.isAck()){log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});//准备Map数据Map map = new HashMap();map.put("name","jack");map.put("age",21);rabbitTemplate.convertAndSend("obgect.queue1",map,cd);//用来接受返回的值Thread.sleep(2000);}

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

注意

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ的可靠性

数据持久化

默认情况下springamqp发送的消息就是持久化的,不需要做特殊处理。

java">    @Testpublic void testTopic3(){//自定义消息为非持久化Message build = MessageBuilder.withBody("这是一个大新闻啊".getBytes()).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();//修改交换机的名字为hm.topicString name = "hm.topic";//修改路由键为bluerabbitTemplate.convertAndSend(name,"china.goods",build);}

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

Lazy Queue

MQ可靠性小结

消费者可靠性

消费者确认机制

消费者失败重试策略

相关文档:

哈哈哈

业务幂等性

 修改发送者代码,修改消息转换器:

java">    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

修改消费者代码来获取消息id:

java">    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(Message message) throws InterruptedException {log.info("spring 消费者接收到消息:【" + message.getBody().toString() + "】");log.info("spring 消费者接收到消息id为:【" + message.getMessageProperties().getMessageId().toString() + "】");if (true) {throw new RuntimeException("故意的");}log.info("消息处理完成");}

 运行结果: 由于前面的步骤设置了报错,因此重发了3次。

java">11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:985  WARN 27400 --- [ntContainer#1-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.direct' with routing key error

只需要修改消费者的处理逻辑部分即可:

java">package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class Orderlisten {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.topic"),key = "pay.success"))public void listenPaySuccess(Long orderId){//1、根据id获取订单信息Order order = orderService.getById(orderId);//判断订单状态,只有待支付才修改if (order == null || order.getStatus() != 1){//条件错误直接结束return;}orderService.markOrderPaySuccess(orderId);}
}

小结,小小面试题

延迟消息

死信交换机

消息的消费者代码:

创建普通交换机和信息队列,将消息队列的死信绑定死信交换机

java">dleConfigtasion.classpackage com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class dleConfigrasion {//交给Bean注解来进行处理//创建交换机@Beanpublic DirectExchange ttlExchange(){//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)return ExchangeBuilder.directExchange("ttl.direct").build();}//创建消息队列@Beanpublic Queue ttltQueue(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列//指定该队列的死信交换机return QueueBuilder.durable("ttl.queue").deadLetterExchange("dlt.direct").build();}// 绑定队列和交换机@Beanpublic Binding bindingfanoutQueue1red(Queue ttltQueue, DirectExchange ttlExchange){return BindingBuilder.bind(ttltQueue).to(ttlExchange).with("hi");}}

死信交换机和消息队列:

java">leatinMq.class   @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlt.queue"),exchange = @Exchange(name = "dlt.direct",type = ExchangeTypes.DIRECT),key = {"hi"}))public void Directlisten1dlt(String msg){System.err.println("消费者接收到队列dlt.direct的消息:"+msg+"_"+ LocalTime.now());}

消息发送方:需要指定消息的存活时间:

java">    @Testpublic void testTopic(){//修改路由键为bluerabbitTemplate.convertAndSend("ttl.direct", "hi", "这是一个大新闻啊",new MessagePostProcessor() {//可以对生成的message进行处理@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间为10秒钟message.getMessageProperties().setExpiration("1000");return message;}});}

代码结束。

延时插件:延时插件下载地址

插件官方文档:延时插件官方文档

详细操作文档: 黑马文档

取消超时订单


http://www.ppmy.cn/news/1548078.html

相关文章

机器学习—Additional Layer Types

到目前为止&#xff0c;我们使用的所有神经网络都是密集型的&#xff0c;一层中的每个神经元&#xff0c;上一层的所有激活&#xff0c;事实证明&#xff0c;仅仅使用密集层类型&#xff0c;可以建立一些非常强大的学习算法&#xff0c;并帮助你建立关于神经网络能做什么的进一…

css:修改盒子样式

圆角边框 在css3中新增了圆角边框样式&#xff0c;这样我们的盒子就可以长得奇形怪状了 像csdn上的发布就是圆角边框 还有这些 .x,.y {background-color: cornflowerblue;width: 200px;height: 200px;margin: 0 auto;text-align: center;border-radius: 10px;} 10px是什么意思…

基于K8S1.28.2实验rook部署ceph

基于K8S1.28.2实验rook部署ceph 原文链接&#xff1a; 基于K8S1.28.2实验rook部署ceph | 严千屹博客 Rook 支持 Kubernetes v1.22 或更高版本。 rook版本1.12.8 K8S版本1.28.2 部署出来ceph版本是quincy版本 主机名ip1(NAT)系统新硬盘磁盘内存master1192.168.48.101Centos7.910…

【从零开始的算法学习日记✨优选算法篇✨】第一章:双指针技巧

✨放在开头&#xff1a; 【从零开始的算法学习日记】系列将包含四个篇章&#xff0c;分别是【优选算法篇】、【递归/搜索/回溯算法篇】、【动态规划篇】以及【贪心算法篇】&#xff0c;每一篇章下面都会有若干章节以及数十道例题详解&#xff0c;博主计划在明年开春前陆续出完这…

# JAVA中的Stream学习

JAVA中的Stream 1、Stream是什么? Stream 是 Java 8 引入的一个新的抽象层&#xff0c;用于处理数据集合。它可以让你以声明式的方式处理数据&#xff0c;类似于 SQL 语句的查询方式。 2、Stream能够做什么? 过滤&#xff1a;通过条件筛选数据。映射&#xff1a;转换数据…

30 秒!用通义灵码画 SpaceX 星链发射流程图

不想读前人“骨灰级”代码&#xff0c; 不想当“牛马”程序员&#xff0c; 想像看图片一样快速读复杂代码和架构&#xff1f; 来了&#xff0c;灵码又加新 buff&#xff01;&#xff01; 通义灵码支持代码逻辑可视化&#xff0c; 可以把你的每段代码画成流程图。 你可以把…

Java基础-组件及事件处理(中)

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 BorderLayout布局管理器 说明&#xff1a; 示例&#xff1a; FlowLayout布局管理器 说明&#xff1a; …

Vue 与 React 前端框架差异对比及案例分析

一、设计理念 1.Vue&#xff1a; Vue 被设计为渐进式框架&#xff0c;能够自底向上逐层应用。这意味着可以将其灵活地应用于现有项目的一部分&#xff0c;无需对整个项目进行大规模重构。强调数据驱动视图&#xff0c;通过响应式数据绑定&#xff0c;当数据发生变化时&#x…