RabbitMQ 消息顺序性保证

ops/2025/2/12 5:17:22/

方式一:Consumer设置exclusive

在这里插入图片描述

注意条件

  • 作用于basic.consume
  • 不支持quorum queue
    在这里插入图片描述
    当同时有A、B两个消费者调用basic.consume方法消费,并将exclusive设置为true时,第二个消费者会抛出异常:
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'test' in vhost '/' in exclusive use, class-id=60, method-id=20)at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)at com.dms.rabbitmq.TopicSender.lambda$main$2(TopicSender.java:63)at java.base/java.lang.Thread.run(Thread.java:840)

Spring AMQP 如何通过exclusive实现顺序消费:

在这里插入图片描述
核心逻辑

while (!DirectMessageListenerContainer.this.started && isRunning()) {this.cancellationLock.reset();try {for (String queue : queueNames) {consumeFromQueue(queue);}}catch (AmqpConnectException | AmqpIOException e) {long nextBackOff = backOffExecution.nextBackOff();if (nextBackOff < 0 || e.getCause() instanceof AmqpApplicationContextClosedException) {DirectMessageListenerContainer.this.aborted = true;shutdown();this.logger.error("Failed to start container - fatal error or backOffs exhausted",e);this.taskScheduler.schedule(this::stop, Instant.now());break;}this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);doShutdown();try {Thread.sleep(nextBackOff); // NOSONAR}catch (InterruptedException e1) {Thread.currentThread().interrupt();}continue; // initialization failed; try again having rested for backOff-interval}DirectMessageListenerContainer.this.started = true;DirectMessageListenerContainer.this.startedLatch.countDown();
}
  1. 抛出异常后,会重试
  2. 重试间隔、次数受recoveryInterval(默认无限)、recoveryBackOff控制

方式二:single active consumer

在这里插入图片描述

原理:

在这里插入图片描述

代码示例

Channel ch = ...;
Map<String, Object> arguments = newHashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);

在这里插入图片描述
参考资料:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams


http://www.ppmy.cn/ops/157151.html

相关文章

OpenCV:图像修复

目录 简述 1. 原理说明 1.1 Navier-Stokes方法&#xff08;INPAINT_NS&#xff09; 1.2 快速行进方法&#xff08;INPAINT_TELEA&#xff09; 2. 实现步骤 2.1 输入图像和掩膜&#xff08;Mask&#xff09; 2.2 调用cv2.inpaint()函数 2.3 完整代码示例 2.4 运行结果 …

血压计OCR文字检测数据集VOC+YOLO格式2147张11类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;2147 标注数量(xml文件个数)&#xff1a;2147 标注数量(txt文件个数)&#xff1a;2147 …

亚远景-从SPICE到ASPICE:汽车软件开发的标准化演进

一、SPICE标准的起源与背景 SPICE&#xff0c;全称“Software Process Improvement and Capability dEtermination”&#xff0c;即“软件流程改进和能力测定”&#xff0c;是由国际标准化组织ISO、国际电工委员会IEC、信息技术委员会JTC1联合发起制定的ISO 15504标准。该标准旨…

word数学模式公式显示不全

1.调整段落间距&#xff0c;更换行距 2.调整单个公式的内部位置 右键公式---字体-----

百度的冰桶算法

百度的冰桶算法&#xff08;Ice Bucket Algorithm&#xff09;是百度搜索引擎用于打击低质量内容的一种算法。该算法主要针对那些通过大量堆砌关键词、内容质量低下、用户体验差的网页进行惩罚&#xff0c;从而提升搜索结果的质量。 冰桶算法的核心目标&#xff1a; 打击低质…

kafka服务端之控制器

文章目录 概述控制器的选举与故障恢复控制器的选举故障恢复 优雅关闭分区leader的选举 概述 在Kafka集群中会有一个或多个broker&#xff0c;其中有一个broker会被选举为控制器&#xff08;Kafka Controler&#xff09;&#xff0c;它负责管理整个集群中所有分区和副本的状态。…

nuxt3中使用useFetch请求刷新不返回数据或返回html结构问题解决-完整nuxt3useFetchtch请求封装

前言 如果使用nuxt3写项目&#xff0c;可以查看nuxt3实战&#xff1a;完整的 nuxt3 vue3 项目创建与useFetch请求封装&#xff0c;此篇内容有详细步骤 但在此篇内容中useFetch请求在页面有多个请求的情况下&#xff0c;或者放在客户端渲染情境下是失败的&#xff0c;所以在此篇…

Spring Boot篇

为什么要用Spring Boot Spring Boot 优点非常多&#xff0c;如&#xff1a; 独立运行 Spring Boot 而且内嵌了各种 servlet 容器&#xff0c;Tomcat、Jetty 等&#xff0c;现在不再需要打成 war 包部署到 容器 中&#xff0c;Spring Boot 只要打成一个可执行的 jar 包就能独…