在nodejs中使用RabbitMQ(七)实现生产者确认

server/2025/2/21 2:23:29/
  • 生产者:批量发送消息(每批10条),每条消息附带唯一 correlationId,并监听确认队列(ackQueue)。

  • 消费者:处理消息后,通过 ackQueue 返回确认消息(携带原 correlationId)。

  • 超时重试:若某批消息在指定时间内未全部确认,未确认的消息会重新加入待发送队列。

producer.ts

import amqp from 'amqplib';async function start() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });async function produce(limit: number, data: string[], timeout: number = 10000) {let message = [...data];if (message.length > limit) {message = message.slice(0, limit);} else if (message.length < limit) {limit = message.length;}// 消息确认let cache: Array<{correlationId: string,message: string,isDelete: boolean,}> = new Array(limit).fill(null).map((_, index) => {return {correlationId: Math.random().toString().slice(2, -1),message: message[index],isDelete: false,};});for (let i = 0; i < limit; ++i) {channel.sendToQueue(queue, Buffer.from(cache[i].message), {correlationId: cache[i].correlationId,replyTo: ackQueue});}const consume = await channel.consume(ackQueue, (message) => {if (!message) {console.error('message is null', message);return;}let index = cache.findIndex((item) => item.correlationId === message.properties.correlationId);if (index !== -1) {cache[index].isDelete = true;console.log('confirmed success:', `"${message.content.toString()}"`, cache.every(item => item.isDelete));} else {console.log('confirmed fail:', `"${message.content.toString()}"`, cache, cache.every(item => item.isDelete), message.properties.correlationId);}channel.ack(message);});const sleep = (time: number) => {return new Promise<void>(resolve => setTimeout(() => resolve(), time));}let stop = false;const interval = async () => {await sleep(0);if (cache.every(item => item.isDelete) || stop) {return;} else {await interval();}}await Promise.race([interval(), // 监听本批次消息是否已经处理完成sleep(timeout), // 本批次消息最长处理时间]);stop = true;await channel.cancel(consume.consumerTag);// 没有收到确认的消息返回下一批处理继续处理return cache.filter(item => !item.isDelete).map(item => item.message);}// 发送1000条数据,分100批,每批10个let msg = new Array(100).fill(null).map((_, index) => `${index} message ${Math.random().toString().slice(2, -1)}`);while (msg.length) {let res = await produce(10, msg.slice(0, 10), 6000);msg = [...res, ...msg.slice(10, msg.length)];console.log('完成一批:', msg.length, '发送结果:', res.length, res);}
}start();

consumer.ts

import amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });channel.consume(queue, (message) => {if (message) {console.log(message?.content.toString(), message?.properties?.replyTo, message?.properties?.correlationId);// 消息处理完后,向 ackQueue 发送确认消息channel.sendToQueue(ackQueue, message?.content, {// 使用相同的 correlationId 来标识确认消息correlationId: message?.properties?.correlationId,// 将原 replyTo 信息传递回来// replyTo: queue,});// 确认 queue11 中的消息channel.ack(message);} else {console.error('message is null', message);}}, { noAck: false });
}produce();

 


http://www.ppmy.cn/server/169427.html

相关文章

SpringBean生命周期的执行流程

Spring Bean 的生命周期&#xff0c;就是一个 Bean 对象从诞生到消亡所经历的一系列过程&#xff0c;咱们可以把它想象成一个人从出生到去世的一生&#xff0c;下面详细给你讲讲&#xff1a; 1. 出生&#xff08;实例化&#xff09; 这就好比一个新生命呱呱坠地。Spring 容器就…

ib网络状态探测

在 InfiniBand 网络中&#xff0c;Host Channel Adapter&#xff08;HCA&#xff09;是关键组件&#xff0c;了解其状态和配置对于网络管理和故障排查至关重要。以下是一些常用的命令&#xff0c;用于查询和管理 HCA 的状态和配置。 常用命令 ibstat 功能&#xff1a;显示 HCA…

webpack和vite打包原理及比较

Webpack 和 Vite 是前端领域两种主流的构建工具&#xff0c;它们在设计理念、打包机制和适用场景上有显著差异。以下是它们的详细原理及对比分析&#xff1a; 一、Webpack 的打包原理 1. 核心机制 模块化与依赖解析 Webpack 将所有文件&#xff08;JS、CSS、图片等&#xff0…

NVIDIA 开发者社区第十一届Sky Hackathon训练营实验手册---AWS Sagemaker AI部分

NVIDIA 开发者社区第十一届Sky Hackathon训练营实验手册 第一部分 Sagemaker实验手册 在这部分实验中&#xff0c;我们将利用AWS Sagemaker下载并部署NIM。 以下是实验步骤&#xff1a; 1. 登录实验平台 利用浏览器&#xff0c;访问下面的地址&#xff0c;打开AWS的控制台网…

在Kubernetes上部署DeepSeek-R1进行高效AI推理

在本篇文章中&#xff0c;我们将介绍如何使用亚马逊云科技的Kubernetes服务Amazon EKS Auto Mode&#xff0c;在亚马逊云科技上部署DeepSeek模型。Amazon EKS Auto Mode提供了更强的灵活性和可扩展性&#xff0c;同时无需管理Kubernetes控制节点、计算、存储和网络组件&#xf…

全功能Python测试框架:pytest

python通用测试框架大多数人用的是unittestHTMLTestRunner&#xff0c;这段时间看到了pytest文档&#xff0c;发现这个框架和丰富的plugins很好用&#xff0c;所以来学习下pytest. pytest是一个非常成熟的全功能的Python测试框架&#xff0c;主要有以下几个特点&#xff1a; …

23种设计模式 - 适配器模式

模式定义 适配器模式&#xff08;Adapter Pattern&#xff09;是一种结构型设计模式&#xff0c;用于解决接口不兼容问题。它通过将一个类的接口转换为客户端期望的接口&#xff0c;使原本因接口不匹配而无法协同工作的类能够协同工作。适配器模式分为类适配器&#xff08;通过…

el与data的2种写法

el的2种写法 1.el: #root, <div id"root"> </div><script type"text/javascript">const x new Vue({el: #root,data: {name: 伏尔加}})</script> 2. x.$mount(#root) <div id"root"> </div><script …