RabbitMQ 消息队列 优化发送邮件

server/2025/2/23 0:12:09/

express 发送邮件

  • 最简单的异步发送邮件方法
  • 为何要使用 RabbitMQ?
  • 如何在 Node 项目中集成 RabbitMQ?

一、 不用 await 发送邮件

在实际开发之前,不妨先思考下,我们最终的目的是为了让邮件异步发送。那发送邮件这里有个await,我们干嘛不直接把这个await去掉,这不就完事了吗?这样不就是不等待邮件发送完成,直接提示成功了吗?

我要告诉大家,你想的一点也没错,这样做 100% 可以的。当然这样做,也会造成一些问题。因为不等待异步执行的结果,如果发送邮件出现错误了,会导致catch里无法捕获错误

1、参考案例:
router.post('/sign_up', validateCaptcha, async function (req, res) {try {const body = {email: req.body.email,username: req.body.username,nickname: req.body.nickname,password: req.body.password,sex: 2,role: 0}const user = await User.create(body);delete user.dataValues.password;         // 删除密码// 请求成功,删除验证码,防止重复使用await delKey(req.body.captchaKey);const html = `您好,<span style="color: red">${user.nickname}。</span><br><br>恭喜,您已成功注册会员!<br><br>xw`await sendMail(user.email, '「xw」的注册成功通知', html);success(res, '创建用户成功。', {user}, 201);} catch (error) {failure(res, error);}
});
2、去掉sendMail前的await
router.post('/sign_up', validateCaptcha, async function (req, res) {try {const body = {email: req.body.email,username: req.body.username,nickname: req.body.nickname,password: req.body.password,sex: 2,role: 0}const user = await User.create(body);delete user.dataValues.password;         // 删除密码// 请求成功,删除验证码,防止重复使用await delKey(req.body.captchaKey);const html = `您好,<span style="color: red">${user.nickname}。</span><br><br>恭喜,您已成功注册会员!<br><br>xw`sendMail(user.email, '「xw」的注册成功通知', html);success(res, '创建用户成功。', {user}, 201);} catch (error) {failure(res, error);}
});

结果 :可以注册成功 但是终端崩溃了,再去请求其他任何接口,都是无法访问的了

在这里插入图片描述

在这里插入图片描述

3、原因就是因为,这是一个异步操作,我们没有等待异步操作的结果,所以没法在这里捕获异常。解决方法也很简单,打开utils/mail.js,给发送邮件函数,加上try…catch即可。
/*** 发送邮件* @param email* @param subject* @param html* @returns {Promise<void>}*/
const sendMail = async (email, subject, html) => {try {await transporter.sendMail({from: process.env.MAILER_USER,to: email,subject,html,});} catch (error) {console.log('邮件发送失败:', error);}
};

在这里插入图片描述

最后运行:虽然现在还是报错了,但是程序自身并没有崩溃。你去访问其他接口,也是完全不受影响的。

那么现在,我们就用非常简单的方式,实现了异步发送邮件了。这样是完全可行的

二、 为何要使用 RabbitMQ

既然这么简单就能实现了,那为何还要使用RabbitMQ呢?我觉得主要有以下三点目的

  • 1、解耦:将发送邮件的功能独立出去,这样即使邮件服务出现问题,也不会直接影响主应用的性能和可用性。
  • 2、增加并发:对于高并发的情况,将任务放入队列中可以起到缓冲作用,增加程序的吞吐量,防止后端服务因瞬间请求过多而崩溃。
  • 3、错误处理:如果消息队列中的任务处理失败,可以进行重试,确保任务最终能够成功完成。
    所以,对于非常简单的功能,你确实可以不写await来达到异步的目的。但对于大型程序来说,为了有更好的性能,增加并发处理能力,提高错误处理的可靠性,使用消息队列是一个更好的选择。

所以,对于非常简单的功能,你确实可以不写await来达到异步的目的。但对于大型程序来说,为了有更好的性能,增加并发处理能力,提高错误处理的可靠性,使用消息队列是一个更好的选择。

三. 在 Node 项目中集成 RabbitMQ

3.1. 环境变量

打开项目的.env文件,加入

RABBITMQ_URL=amqp://admin:xw@localhost

这样连接到RabbitMQ的时候,相关的信息,就可以从环境变量读取。

.env.example中加入

RABBITMQ_URL=

README.md中加入对应的说明

RABBITMQ_URL=你的连接- `RABBITMQ_URL`配置为消息队列服务器地址。
3.2.、连接到 RabbitMQ

新建utils/rabbit-mq.js,里面创建一个连接

const amqp = require('amqplib');
const sendMail = require('./mail');// 创建全局的 RabbitMQ 连接和通道
let connection;
let channel;/*** 连接到 RabbitMQ* @returns {Promise<*>}*/
const connectToRabbitMQ = async () => {if (connection && channel) return;  // 如果已经连接,直接返回try {connection = await amqp.connect(process.env.RABBITMQ_URL);channel = await connection.createChannel();await channel.assertQueue('mail_queue', { durable: true });} catch (error) {console.error('RabbitMQ 连接失败:', error);}
};
  • 顶部做了相关的引用。
  • 创建了全局的连接和通道。
  • 如果已经连接了,就直接返回。如果没有连接就连上,并创建通道。
  • 创建了队列,名字叫做:mail_queue。
  • 使用了durable: true,表示队列需要持久化。

3.3、 邮件队列生产者

接着继续建一个方法,来发送邮件

/*** 邮件队列生产者(发送消息)*/
const mailProducer = async (msg) => {try {await connectToRabbitMQ(); // 确保已连接channel.sendToQueue('mail_queue', Buffer.from(JSON.stringify(msg)), { persistent: true });} catch (error) {console.error('邮件队列生产者错误:', error);}
};
  • 这里就直接将消息,发送到队列中。
  • 注意用JSON.stringify转了一下,说明我们传过来的msg将会是对象格式。
  • 使用了persistent: true,表示消息需要持久化。
3.4、 邮件队列消费者
/*** 邮件队列消费者(接收消息)*/
const mailConsumer = async () => {try {await connectToRabbitMQ();channel.consume('mail_queue',async (msg) => {const message = JSON.parse(msg.content.toString());await sendMail(message.to, message.subject, message.html);}, {noAck: true,});} catch (error) {console.error('邮件队列消费者错误:', error);}
};module.exports = {mailProducer,mailConsumer,
};
  • 我们监听了mail_queue队列。
  • 如过收到消息了,就执行发送邮件。
  • 使用了noAck: true,表示自动确认消息。

四. 实际运用

4.1、启用生产者

打开routes/auth.js

// const sendMail = require('../utils/mail');
const { mailProducer } = require('../utils/rabbit-mq');/*** 用户注册* POST /auth/sign_up*/
router.post('/sign_up', validateCaptcha, async function (req, res) {try {// ...// 将邮件发送请求放入队列const msg = {to: user.email,subject: '「xw」的注册成功通知',html: `您好,<span style="color: red">${user.nickname}</span>。<br><br>恭喜,您已成功注册会员!<br><br>xw`,};await mailProducer(msg);success(res, '创建用户成功。', { user }, 201);} catch (error) {failure(res, error);}
});
  • 顶部将发送邮件的方法去掉,并引用一下生产者
  • 将要发送的信息,改为对象格式。
  • 调用生产者方法,就将信息发送到队列中了。
4.2、 启动消费者

根目录的app.js。在里面加上:


require('dotenv').config();// 启动邮件消费者
const { mailConsumer } = require('./utils/rabbit-mq');
(async () => {await mailConsumer();console.log('邮件消费者已启动');
})();

这样只要 Node 项目一启动,消费者就会一直自动运行起来。在真实大型项目里,为了不影响主程序的稳定运行。更好的方式,应该将发送邮件解耦了,可以让消费者在专门的程序中独立启动

解耦封装

rabbitmqjs_292">1、封装生产者、队列/utils/rabbit-mq.js
const amqp = require('amqplib');
const sendMail = require('./mail');// 创建全局的 RabbitMQ 连接和通道
let connection;
let channel;// 封装一个重试连接的函数,增加连接的稳定性
const connectWithRetry = async (url, retries = 5, delay = 5000) => {let attempt = 0;while (attempt < retries) {try {return await amqp.connect(url);} catch (error) {attempt++;console.error(`RabbitMQ 连接尝试 ${attempt} 失败:`, error);if (attempt < retries) {await new Promise(resolve => setTimeout(resolve, delay));}}}throw new Error('无法连接到 RabbitMQ,已达到最大重试次数');
};/*** 连接到 RabbitMQ* @returns {Promise<*>}*/
const connectToRabbitMQ = async () => {if (connection && channel) return;try {connection = await connectWithRetry(process.env.RABBITMQ_URL);channel = await connection.createChannel();// 监听连接关闭事件,方便处理异常connection.on('close', () => {console.warn('RabbitMQ 连接已关闭,尝试重新连接...');connection = null;channel = null;});// 监听连接错误事件,增强错误处理能力connection.on('error', (err) => {console.error('RabbitMQ 连接发生错误:', err);});await channel.assertQueue('mail_queue', { durable: true });} catch (error) {console.error('RabbitMQ 连接失败:', error);throw error;}
};/*** 邮件队列生产者(发送消息)*/
const mailProducer = async (msg) => {try {await connectToRabbitMQ(); // 确保已连接// 消息持久化设置,提高消息可靠性const options = { persistent: true };const sent = channel.sendToQueue('mail_queue', Buffer.from(JSON.stringify(msg)), options);if (!sent) {console.warn('消息未能立即入队,等待下次机会');}} catch (error) {console.error('邮件队列生产者错误:', error);throw error;}
};/*** 邮件队列消费者(接收消息)*/
const mailConsumer = async () => {try {await connectToRabbitMQ();// 消费消息时,手动确认消息,避免消息丢失channel.consume('mail_queue', async (msg) => {if (msg) {try {const message = JSON.parse(msg.content.toString());await sendMail(message.to, message.subject, message.html);channel.ack(msg); // 手动确认消息} catch (error) {console.error('处理邮件消息时出错:', error);channel.nack(msg, false, true); // 消息处理失败,重新入队}}}, { noAck: false }); // 关闭自动确认console.log('邮件队列消费者已开始监听');} catch (error) {console.error('邮件队列消费者错误:', error);throw error;}
};module.exports = {mailProducer,mailConsumer,
};
2、封装消费者/utils/mail-consumer.js
require('dotenv').config();
const { mailConsumer } = require('./rabbit-mq');// 封装启动消费者的函数,方便后续扩展和错误处理
const startMailConsumer = async () => {try {await mailConsumer();console.log('邮件消费者已启动');} catch (error) {console.error('启动邮件消费者时出错:', error);process.exit(1);}
};// 启动消费者
startMailConsumer();// 监听进程信号,优雅关闭消费者
process.on('SIGINT', async () => {console.log('收到 SIGINT 信号,正在优雅关闭邮件消费者...');try {// 这里可以添加关闭连接和通道的逻辑process.exit(0);} catch (error) {console.error('关闭邮件消费者时出错:', error);process.exit(1);}
});

如果在app.js里面使用了消费者 把代码去掉

3、安装pm2
npm i pm2
4、根目录新建cosystem.config.js
module.exports = {apps: [{name: "express-app",script: "./bin/www", // 实际路径watch: process.env.NODE_ENV === 'development', // 根据环境变量决定是否开启监听interpreter: "node",env: {NODE_ENV: "development"},env_production: {NODE_ENV: "production"}},{name: "mail-consumer",script: "./utils/mail-consumer.js", // 实际路径interpreter: "node",env: {NODE_ENV: "development"},env_production: {NODE_ENV: "production"}}]
};
4、本地运行
pm2 start ecosystem.config.js

pm2 简单的方法

命令说明示例
pm2 start <app.js>启动一个 Node.js 应用程序。可指定 JavaScript 文件、JSON 配置文件或其他可执行文件。pm2 start app.js
pm2 list列出所有由 PM2 管理的应用程序,显示应用程序的状态、进程 ID、名称等信息。pm2 list
pm2 stop <app_name_or_id>停止指定名称或 ID 的应用程序,但不会从 PM2 的列表中删除。pm2 stop my_apppm2 stop 0
pm2 stop all停止所有由 PM2 管理的应用程序。pm2 stop all
pm2 restart <app_name_or_id>重启指定名称或 ID 的应用程序。pm2 restart my_apppm2 restart 0
pm2 restart all重启所有由 PM2 管理的应用程序。pm2 restart all
pm2 delete <app_name_or_id>从 PM2 的管理列表中删除指定名称或 ID 的应用程序,同时停止该应用程序。pm2 delete my_apppm2 delete 0
pm2 delete all从 PM2 的管理列表中删除所有应用程序。pm2 delete all
pm2 show <app_name_or_id>显示指定名称或 ID 的应用程序的详细信息,如环境变量、执行模式等。pm2 show my_apppm2 show 0
pm2 logs <app_name_or_id>显示指定名称或 ID 的应用程序的日志信息。若不指定则显示所有应用日志。pm2 logs my_apppm2 logs 0
pm2 monit实时监控由 PM2 管理的所有应用程序的 CPU 和内存使用情况。pm2 monit
pm2 save保存当前 PM2 管理的应用程序列表,以便在系统重启后自动恢复这些应用。pm2 save
pm2 resurrect恢复之前使用 pm2 save 保存的应用程序列表。pm2 resurrect
5、 宝塔中部署

启动选项

 pm2 start ecosystem.config.js --no-daemon

在这里插入图片描述


http://www.ppmy.cn/server/169980.html

相关文章

stm32单片机个人学习笔记16(SPI通信协议)

前言 本篇文章属于stm32单片机&#xff08;以下简称单片机&#xff09;的学习笔记&#xff0c;来源于B站教学视频。下面是这位up主的视频链接。本文为个人学习笔记&#xff0c;只能做参考&#xff0c;细节方面建议观看视频&#xff0c;肯定受益匪浅。 STM32入门教程-2023版 细…

Golang访问Google Sheet

步骤 1、创建Project https://console.cloud.google.com/welcome?hlzh-cn&projectvelvety-being-444310-c1 2、启用Google Sheet API https://console.cloud.google.com/apis/library?hlzh-cn&projectvelvety-being-444310-c1 3、创建服务账号 https://conso…

Maven导入hutool依赖报错-java: 无法访问cn.hutool.core.io.IORuntimeException 解决办法

欢迎大家来到我的博客~欢迎大家对我的博客提出指导&#xff0c;有错误的地方会改进的哦~点击这里了解更多内容 目录 一、报错二、解决办法 一、报错 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-captcha</artifactId> </de…

QT开发技术 [opencv加载onnx模型,dnn推理]

一、导出onnx 模型 yolo export modelxx\xx\best.pt formatonnx 二、qt加载onnx模型&#xff0c;推理显示 std::string fileName QCoreApplication::applicationDirPath().toStdString() "/Model/best.onnx";cv::dnn::Net net cv::dnn::readNetFromONNX(fileNam…

MYSQL的第一次

目录 前情提要 题目解析 连接并使用数据库 创建employees表 创建orders表? 创建invoices表?? ?查看建立的表 前情提要 需要下载mysql并进行配置&#xff0c;建议下载8.0.37&#xff0c;详情可见MySQL超详细安装配置教程(亲测有效)_mysql安装教程-CSDN博客 题目解析…

RabbitMQ介绍以及基本使用

文章目录 一、什么是消息队列&#xff1f; 二、消息队列的作用&#xff08;优点&#xff09; 1、解耦 2、流量削峰 3、异步 4、顺序性 三、RabbitMQ基本结构 四、RabbitMQ队列模式 1、简单队列模式 2、工作队列模式 3、发布/订阅模式 4、路由模式 5、主题模式 6、…

OpenMetadata MySQL数据质量治理实现分析

架构概览 #mermaid-svg-avONKLWf2EfDAaLY {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-avONKLWf2EfDAaLY .error-icon{fill:#552222;}#mermaid-svg-avONKLWf2EfDAaLY .error-text{fill:#552222;stroke:#552222;}#…

【深度学习】Python多线程/多进程在神经网络模型的应用实战

一、Pyhon多线程和多进程的理解和对比分析 1. 基本概念 1.1 多线程 定义&#xff1a;多线程是指一个程序同时运行多个线程&#xff0c;每个线程共享同一进程的内存空间。 特点&#xff1a; 线程之间可以共享全局变量、文件句柄等资源。线程切换开销较小&#xff0c;适合 I/…