rabbitmq消息投递失败

news/2024/8/27 3:47:24/ 标签: rabbitmq, 分布式

在 RabbitMQ 中,消息投递失败可能会发生在多个阶段,比如从生产者到交换机、从交换机到队列、从队列到消费者等。处理消息投递失败需要采取适当的措施来确保消息的可靠性和系统的健壮性。以下是处理不同阶段消息投递失败的方法:

1. 从生产者到交换机的投递失败

当生产者发送消息到交换机时,如果交换机不存在或者消息被交换机拒绝(比如 mandatory 参数设置为 true 而没有合适的队列),可以通过以下方式处理:

使用 Confirm 模式

生产者可以使用 RabbitMQ 的 Confirm 模式来确保消息成功发送到交换机。

import com.rabbitmq.client.*;public class Producer {private final static String EXCHANGE_NAME = "example_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.confirmSelect(); // Enable publisher confirmationsString message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "routing_key", null, message.getBytes("UTF-8"));if (channel.waitForConfirms()) {System.out.println(" [x] Message sent successfully");} else {System.out.println(" [x] Message delivery failed");}}}
}

2. 从交换机到队列的投递失败

如果消息发送到交换机后没有合适的队列绑定,可以使用 mandatory 参数和 ReturnListener 来处理未路由的消息。

import com.rabbitmq.client.*;public class Producer {private final static String EXCHANGE_NAME = "example_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {String message = new String(body, "UTF-8");System.out.println(" [x] Message returned: " + message);// 处理未路由的消息});String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "invalid_routing_key", true, null, message.getBytes("UTF-8"));}}
}

3. 从队列到消费者的投递失败

在消息从队列投递到消费者时,如果消费者无法处理消息,消费者可以使用 NackReject 来处理失败的消息。

使用手动消息确认

消费者可以手动确认消息,如果处理失败,可以选择 Nack 消息并重新入队,或者 Reject 消息并将其投递到死信队列(Dead Letter Queue)。

import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1); // Fair dispatchDeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 处理消息processMessage(message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,重新入队或投递到死信队列channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}};boolean autoAck = false; // Explicitly acknowledge messagechannel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}}private static void processMessage(String message) throws Exception {// 消息处理逻辑if (message.contains("error")) {throw new Exception("Processing error");}System.out.println(" [x] Processed '" + message + "'");}
}

4. 使用死信队列(DLQ)

配置死信队列,当消息在队列中被拒绝、过期或者达到最大重试次数时,将其投递到死信队列以便后续处理。

配置示例
import com.rabbitmq.client.*;import java.util.HashMap;
import java.util.Map;public class DLQExample {private static final String MAIN_QUEUE = "main_queue";private static final String DLQ_QUEUE = "dlq_queue";private static final String EXCHANGE_NAME = "exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// Declare DLQchannel.queueDeclare(DLQ_QUEUE, true, false, false, null);// Declare main queue with DLQ settingsMap<String, Object> argsMap = new HashMap<>();argsMap.put("x-dead-letter-exchange", "");argsMap.put("x-dead-letter-routing-key", DLQ_QUEUE);channel.queueDeclare(MAIN_QUEUE, true, false, false, argsMap);// Declare exchange and bind main queuechannel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueBind(MAIN_QUEUE, EXCHANGE_NAME, "routing_key");String message = "Test Message";channel.basicPublish(EXCHANGE_NAME, "routing_key", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

总结

为了处理 RabbitMQ 中的消息投递失败,应该综合使用以下策略:

  1. 使用 Confirm 模式和 ReturnListener 确保消息从生产者正确发送到交换机和队列。
  2. 使用手动消息确认机制处理消费者无法处理的消息。
  3. 配置死信队列处理无法处理或过期的消息。
  4. 确保消息处理逻辑具有幂等性,以防止重复处理导致的数据不一致。

通过这些方法,可以提高消息系统的可靠性和健壮性,确保消息不会丢失或重复处理。


http://www.ppmy.cn/news/1475812.html

相关文章

阿里云通义千

**阿里云通义千问是阿里云自主研发的一款超大规模语言模型**&#xff0c;专门用于深入理解和分析用户输入的自然语言&#xff0c;以便在不同领域和任务中为用户提供智能服务与协助。以下是对阿里云通义千问模型的相关介绍&#xff1a; 1. **模型能力** - **单轮对话能力**&…

整数或小数点后补0操作

效果展示&#xff1a; 整数情况&#xff1a; 小数情况&#xff1a; 小编这里是以微信小程序举例&#xff0c;代码通用可兼容vue等。 1.在utils文件下创建工具util.js文本 util.js页面&#xff1a; // 格式…

通讯录-C/C++

问题描述 设计一个通讯录管理程序&#xff0c;要求程序采用模块化设计方法&#xff0c;程序应采用由主控程序调用各模块实现各个功能的方式。程序应具有如下功能&#xff1a;输入记录、显示记录、查找记录、插入记录、记录排序、删除记录等。数据存储采用外存存储形式&#xff…

防火墙双机热备(接上一个NAT实验)

一、实验拓扑 二、实验需求 1、对现有网络进行改造升级&#xff0c;将当个防火墙组网改成双机热备的组网形式&#xff0c;做负载分担模式&#xff0c;游客区和DMZ区走FW3&#xff0c;生产区和办公区的流量走FW1 2、办公区上网用户限制流量不超过100M&#xff0c;其中销售部人员…

weblogic中间件运维常见问题

背景&#xff1a; 工作需要经常使用到weblogic中间件产品&#xff0c;在维护过程中有遇见的一些常见故障问题&#xff0c;这里分享给大家。 问题一&#xff1a;密码文件报错 问题描述&#xff1a; weblogic应用在启动过程中出现如下的报错内容&#xff1a; # tail -f nohup.ou…

vscode使用及调试方式和技巧

常用快捷键 ctrl ~ 显示隐藏终端面板 Ctrl\ 快速拆分文件编辑 Alt ↑↓ 移动当前代码行的位置 CtrlD 选中当前匹配项 CtrlB 切换侧边栏 alt 单机左键 或 长按鼠标滚轮鼠标左键下拉 添加多处光标 Ctrlp 快捷键设置 vscode调试 2022年了&#xff0c;该学会用VSC…

React组件间通信的几种方式

一、Props向下传递&#xff08;Top-Down Propagation&#xff09; 父组件通过props将其状态或数据传递给子组件。 父组件&#xff1a; class ParentComponent extends React.Component {state { message: Hello World };render() {return <ChildComponent message{this.…

SpringBootWeb 篇-入门了解 Swagger 的具体使用

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 Swagger 介绍 1.1 Swagger 和 Yapi 的使用场景 2.0 Swagger 的使用方式 2.1 导入 knife4j 的 maven 坐标 2.2 在配置类中加入 knife4j 相关配置 2.3 设置静态资源…

WPF实现一个带旋转动画的菜单栏

WPF实现一个带旋转动画的菜单栏 一、创建WPF项目及文件1、创建项目2、创建文件夹及文件3、添加引用 二、代码实现2.ControlAttachProperty类 一、创建WPF项目及文件 1、创建项目 打开VS2022,创建一个WPF项目&#xff0c;如下所示 2、创建文件夹及文件 创建资源文件夹&…

小程序创建与项目初始化(构建 npm + 集成 Sass)

一、打开微信开发者工具 确认 左侧导航栏是否选中的 小程序点击 【】创建小程序 二、创建小程序 三、初始化 清空 app.wxss、app.js 去掉 rendererOptions 和 componentFramework 不需要最新的搜索引擎 留下以下文件 四、自定义构建 npm 集成 Sass 首先 先把小程序源…

JMeter进行HTTP接口测试的技术要点

参数化 用户定义的变量 用的时候 ${名字} 用户参数 在参数列表中传递 并且也是${} csv数据文件设置 false 不忽略首行 要首行 从第一行读取 true 忽略首行 从第二行开始 请求时的参数设置&#xff1a; 这里的名称是看其接口需要的请求参数的名称 这里的变量名称就是为csv里面…

QLoRa使用教程

一、定义 定义案例1 二、实现 定义 QLoRa: 量化LoRa. 网址&#xff1a;https://huggingface.co/docs/peft/main/en/developer_guides/quantization案例1 1. 4bit 量化LoRa import torch from transformers import BitsAndBytesConfigconfig BitsAndBytesConfig(load_in_4b…

各地户外分散视频监控点位,如何实现远程集中实时监看?

公司业务涉及视频监控项目承包搭建&#xff0c;此前某个项目需求是为某林业公司提供视频监控解决方案&#xff0c;需要实现各地视频摄像头的集中实时监看&#xff0c;以防止国家储备林的盗砍、盗伐行为。 公司原计划采用运营商专线连接各个视频监控点位&#xff0c;实现远程视…

计算机图形学入门28:相机、透镜和光场

1.前言 相机(Cameras)、透镜(Lenses)和光场(Light Fields)都是图形学中重要的组成部分。在之前的学习中&#xff0c;都是默认它们的存在&#xff0c;所以现在也需要单独拿出来学习下。 2.成像方法 计算机图形学有两种成像方法&#xff0c;即合成(Synthesis)和捕捉(Capture)。前…

HBase 在统一内容平台业务的优化实践

作者&#xff1a;来自 vivo 互联网服务器团队-Leng Jianyu、Huang Haitao HBase是一款开源高可靠性、扩展性、高性能和灵活性的分布式非关系型数据库&#xff0c;本文围绕数据库选型以及使用HBase的痛点展开&#xff0c;从四个方面对HBase的使用进行优化&#xff0c;取得了一些…

使用Redis实现签到功能:Java示例解析

使用Redis实现签到功能&#xff1a;Java示例解析 在本博客中&#xff0c;我们将讨论一个使用Redis实现的签到功能的Java示例。该示例包括两个主要方法&#xff1a;sign()和signCount()&#xff0c;分别用于用户签到和计算用户当月的签到次数。 1. 签到方法&#xff1a;sign()…

3d为什么删掉模型不能返回?---模大狮模型网

在展览3D模型设计行业中&#xff0c;设计师们经常面临一个关键问题&#xff1a;一旦删除了模型的某些部分&#xff0c;为什么很难或者无法恢复原始状态?这不仅是技术上的挑战&#xff0c;更是设计过程中需要深思熟虑的重要考量。本文将探讨这一问题的原因及其在实际工作中的影…

【图解大数据技术】Spark

【图解大数据技术】Spark Spark简介RDDSpark示例Spark运行原理整体流程DAG 与 stage 为什么Spark比MapReduce快&#xff1f; Spark简介 Spark与MapReduce一样&#xff0c;也是大数据计算框架。Spark相比MapReduce拥有更快的执行速度和更低的编程复杂度。 Spark包括以下几个模…

AI算法17-贝叶斯岭回归算法Bayesian Ridge Regression | BRR

贝叶斯岭回归算法简介 贝叶斯岭回归&#xff08;Bayesian Ridge Regression&#xff09;是一种回归分析方法&#xff0c;它结合了岭回归&#xff08;Ridge Regression&#xff09;的正则化特性和贝叶斯统计的推断能力。这种方法在处理具有大量特征的数据集时特别有用&#xff…

搜索引擎中的相关性模型

一、什么是相关性模型&#xff1f; 相关性模型主要关注的是query和doc的相关性。例如给定query&#xff0c;和1000个doc&#xff0c;找到哪个doc是好query最相关的。 二、为什么需要相关性模型&#xff1f; 熟悉es的应该都熟悉BM25相关性算法。它是一个很简单的相关性算法。我…