在nodejs中使用RabbitMQ(五)死信队列,延迟队列

news/2025/2/19 17:53:32/

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种机制,用于处理无法成功消费或不能按预期处理的消息。简单来说,死信队列用于存储那些不能被正常消费或处理的消息,以便后续审查或重新处理。

死信队列用于处理以下几种情况的消息:

1、消息过期:如果消息的 TTL(存活时间)到期,消息会被认为是死信,自动转发到死信队列。

2、消息被拒绝(nack:消费者拒绝消息时,如果消息被标记为不重新排队(requeue: false),消息会被送往死信队列。

3、队列满:如果队列已经达到最大长度,且还有新消息进入,RabbitMQ 会丢弃老的消息并将其送到死信队列。

4、消息无法路由:如果消息没有匹配到任何队列的路由规则,它会被送往死信队列。

配置死信队列

RabbitMQ 提供了两种主要机制来设置死信队列:

1、死信交换机(Dead Letter Exchange,DLX): 

  • 设置一个专门的交换机(Dead Letter Exchange,DLX),并将死信消息转发到该交换机。
  • 可以为队列设置 x-dead-letter-exchange 属性来指定死信交换机。

    2、死信路由键(Dead Letter Routing Key): 

    • 可以设置死信消息的路由键,以便将消息路由到适当的死信队列。
    • 通过 x-dead-letter-routing-key 属性来配置。

    3、 消息过期时间(TTL):

    • 你可以设置队列的消息存活时间(TTL)。消息在过期后会变为死信并转发到死信队列。
    • 通过 x-message-ttl 属性来设置消息的生存时间。

    死信队列创建

    1、一个生产者1,一个exchange交换机1,一个消费者1(主队列)。

    2、一个exchange交换机2,一个消费者2(死信队列)。

    3、消费者1要配置队列参数'x-dead-letter-exchange','x-dead-letter-routing-key', 'x-message-ttl'。

    生产者1通过exchange1发送消息给消费者1,如果消息不能正常处理,会通过exchange2转发给消费者2. 

    示例

    一个生产者,将消息发送给两个消费者,消费者消息如果处理失败,会将消息转发给死信队列。

    producer.ts

    import RabbitMQ from 'amqplib';async function start() {try {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");let channel = null;try {channel = await conn.createChannel();} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}const exchangeName = 'exchange8';await channel.assertExchange(exchangeName,'direct',{durable: true,},);const deadExchangeName = 'exchange8_dead_letter';await channel.assertExchange(deadExchangeName,'direct',{durable: true,},);let routeKey = 'route.key';for (let i = 0; i < 4; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));let num = Math.ceil(Math.random() * 100000);console.log('消息发送是否成功', num, routeKey, channel.publish(exchangeName,routeKey,Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),{persistent: true,},));}setTimeout(() => {conn.close();process.exit(0);}, 1000);} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}
    }start();
    

    dead_letter.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const deadExchangeName = 'exchange8_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 死信队列const deadQueueName = 'queue8';channel.assertQueue(deadQueueName, {durable: true,  // 保证死信队列持久化});channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);channel.consume(deadQueueName, (msg) => {console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());if (msg) {channel.ack(msg);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

    consumer.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange8';const routeKey = 'route.key';const deadExchangeName = 'exchange8_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 主队列const queueName = 'queue7';channel.assertQueue(queueName, {durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在arguments: {// 对应的死信交换机(空字符串表示默认交换机)'x-dead-letter-exchange': deadExchangeName,// 死信队列的路由键'x-dead-letter-routing-key': deadRouteKey,// 队列消息过期时间1分钟'x-message-ttl': 60000,},});channel.bindQueue(queueName,exchangeName,routeKey,{},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);channel.consume(queueName, function (msg) {if (Math.ceil(Math.random() * 100) > 50 && msg) {console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());channel.ack(msg);} else if (msg) {console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);} else {}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

    consumer2.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange8';const routeKey = 'route.key';const deadExchangeName = 'exchange8_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 主队列const queueName = 'queue9';channel.assertQueue(queueName, {durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在arguments: {// 对应的死信交换机(空字符串表示默认交换机)'x-dead-letter-exchange': deadExchangeName,// 死信队列的路由键'x-dead-letter-routing-key': deadRouteKey,// 队列消息过期时间1分钟'x-message-ttl': 60000,},});channel.bindQueue(queueName,exchangeName,routeKey,{},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);channel.consume(queueName, function (msg) {if (Math.ceil(Math.random() * 100) > 50 && msg) {console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());channel.ack(msg);} else if (msg) {console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);} else {}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

     延迟队列

    方法一

    rabbitmq本身没有直接提供延迟队列,可以通过安装插件实现(注:目前支持到4.0.2)。

    rabbitmq_delayed_message_exchange

     文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

    下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

     方法二

     通过死信队列实现消息的延迟处理。主队列并不会被直接消费,而是通过设置 x-message-ttl(即消息的过期时间)来控制消息的存活时间。消息在主队列中的 TTL 到期后,它会被转发到死信队列(Dead Letter Queue)中,在死信队列中处理消息。

    相比于上述死信队列的实现,在consumer.ts中删除了channel.consume消息接收相关代码。

     producer.ts

    import RabbitMQ from 'amqplib';async function start() {try {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");let channel = null;try {channel = await conn.createChannel();} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}const exchangeName = 'exchange9';await channel.assertExchange(exchangeName,'direct',{durable: true,},);const deadExchangeName = 'exchange9_dead_letter';await channel.assertExchange(deadExchangeName,'direct',{durable: true,},);let routeKey = 'route.key';for (let i = 0; i < 4; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));let num = Math.ceil(Math.random() * 100000);console.log('消息发送是否成功', num, routeKey, channel.publish(exchangeName,routeKey,Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),{persistent: true,},));}setTimeout(() => {conn.close();process.exit(0);}, 1000);} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}
    }start();
    

    consumer.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange9';const routeKey = 'route.key';const deadExchangeName = 'exchange9_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 主队列const queueName = 'queue10';channel.assertQueue(queueName, {durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在arguments: {// 对应的死信交换机(空字符串表示默认交换机)'x-dead-letter-exchange': deadExchangeName,// 死信队列的路由键'x-dead-letter-routing-key': deadRouteKey,// 队列消息过期时间10s'x-message-ttl': 10000,},});channel.bindQueue(queueName,exchangeName,routeKey,{},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);// channel.consume(queueName, function (msg) {//   if (Math.ceil(Math.random() * 100) > 50 && msg) {//     console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());//     channel.ack(msg);//   } else if (msg) {//     console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());//     // 第二个参数,false拒绝当前消息//     // 第二个参数,true拒绝小于等于当前消息//     // 第三个参数,3false从队列中清除//     // 第三个参数,4true从新在队列中排队//     channel.nack(msg, false, false);//   } else {//   }// }, {//   // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);//   noAck: false,//   arguments: {}// }, (err: any, ok: Replies.Empty) => {//   console.log(err, ok);// });});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

    dead_letter.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const deadExchangeName = 'exchange9_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 死信队列const deadQueueName = 'queue11';channel.assertQueue(deadQueueName, {durable: true,  // 保证死信队列持久化});channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);channel.consume(deadQueueName, (msg) => {console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());if (msg) {channel.ack(msg);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

     

     


    http://www.ppmy.cn/news/1572461.html

    相关文章

    学习京东写测试用例

    以下是京东等大型电商平台测试用例编写的通用方法论及示例&#xff0c;结合功能模块、测试类型和实际场景进行设计&#xff1a; 一、测试用例设计原则 覆盖全面&#xff1a;覆盖核心业务流程、异常场景、边界条件、用户角色差异。优先级分层&#xff1a; P0&#xff08;最高&a…

    【AIDevops】Deepseek驱动无界面自动化运维与分布式脚本系统,初探运维革命之路

    声明&#xff1a;笔者当前文章内容仍在构想阶段&#xff0c;仅部分实现 目录 引言 第一部分&#xff1a;基于DeepSeek大模型的单机GPT实现 1. DeepSeek大模型简介 2. 功能概述 3. 项目优势&#xff0c;实现技术栈及实现功能 4. 示例展示 5.腾讯云AI代码助手助力 第二部…

    <论文>DeepSeek-R1:通过强化学习激励大语言模型的推理能力(深度思考)

    一、摘要 本文跟大家来一起阅读DeepSeek团队发表于2025年1月的一篇论文《DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning | Papers With Code》&#xff0c;新鲜的DeepSeek-R1推理模型&#xff0c;作者规模属实庞大。如果你正在使用Deep…

    k8s配置GPU感知:k8s-device-plugin的使用(已踩完坑)

    1&#xff0c;定义 Kubernetes 的 NVIDIA 设备插件是一个 Daemonset&#xff0c;它允许自动&#xff1a; 暴露集群中每个节点上的 GPU 数量跟踪 GPU 的运行状况在 Kubernetes 集群中运行支持 GPU 的容器 2&#xff0c;需要满足的前置条件 NVIDIA drivers ~ 384.81nvidia-do…

    嵌入式硬件篇---原码、补码、反码

    文章目录 前言简介八进制原码、反码、补码1. 原码规则示例问题 2. 反码规则示例问题 3. 补码规则示例优点 4. 补码的运算5. 总结 十六进制原码、反码、补码1. 十六进制的基本概念2. 十六进制的原码规则示例 3. 十六进制的反码规则示例 4. 十六进制的补码规则示例 5. 十六进制补…

    ORB-SLAM3的源码学习: Settings.cc:settings构造函数

    前言 配置文件的相关的构造函数 1.函数声明 settings的构造函数 Settings::Settings(const std::string &configFile, const int &sensor) : bNeedToUndistort_(false), bNeedToRectify_(false), bNeedToResize1_(false), bNeedToResize2_(false) 这个构造函数接…

    spring 中 AspectJ 基于 XML 的实现分析

    前面的文章介绍了 spring 引入 AspectJ 之后&#xff0c;基于注解实现 AOP 的过程分析&#xff0c;今天我们来看下AspectJ 基于 XML 的 AOP 实现逻辑。 XML 的实现示例可以参考 AspectJ 对于 AOP 的实现。 aop:config 标签解析 先去 spring-aop 模块下&#xff0c;META-INF/…

    哈希:LeetCode49. 字母异位词分组 128.最长连续序列

    49. 字母异位词分组 给你一个字符串数组&#xff0c;请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。 字母异位词 是由重新排列源单词的所有字母得到的一个新单词。 示例 1: 输入: strs ["eat", "tea", "tan", "ate",…