编程深水区之并发③:Node.js的并发编程

server/2024/9/23 9:26:05/

在Node里耍多线程和多进程,会不会闪到腰?!

一、Node和JS的关系

  1. Node是JS的运行环境。最初JS只在浏览器中运行,它依赖于浏览器的JS引擎(如Chrome的V8、Firefox的SpiderMonkey)。Node从Chrome中获得灵感,并直接套用了V8引擎,让JS可以在服务器端运行,意味着可以使用JS来编写控制台应用、创建API和构建网络服务器。
  2. Node的核心部分使用C++编写,比如用于解析和执行JS的V8引擎(就是Chrome中的V8引擎),用于操作异步任务的libuv库,以及许多核心库(如fs、net、stream、worker_thread、child_process等)。Node将这些使用C++编写的底层功能,包装成JS接口,使我们可以使用JS来编写应用程序。
  3. 我们使用Vue或者React开发前端应用时,都要安装Node。调试运行时,实际上就是使用Node作为开发时服务器,调试时网站运行在本地Node服务器上,编译、打包和发布工作,则由Webpack或Vite来完成。
  4. JS的确是单线程,它处理并发的机制是非阻塞的事件循环机制。但Node的核心功能是C++编写的,自然支持创建多线程或多进程来完成异步操作

二、Node多线程

Node提供worker_threads模块,用于手动创建多线程。之前有介绍用于异步操作的libuv库,它主要作用于Node单线程事件循环机制中的异步操作,主线程碰到异步任务时,会把它扔给libuv,libuv完成后扔到任务队列里。libuv的高效,源于它基于线程池,这有些类似于C#的TPL了,后面章节再细说。
worker_threads并不依赖于libuv,它直接依赖于底层操作系统的线程实现,在每个Worker中,有独立的V8引擎和上下文。这至少意味着:(1)创建线程的代价是比较高的;(2)线程之间相互独立,需要依靠消息机制通讯,通讯开销会比较大。所以,没事别乱开线程。

2.1 worker_thread的常用API

  • Worker类:用来创建和管理Worker线程。一个Worker线程可以执行与主线程并行的任务。
  • isMainThread:一个布尔值,指示代码是否在主线程中运行。
  • parentPort:这是主线程与工作线程之间的通信通道。
  • workerData:这是传递给Worker线程的初始数据。可以在创建Worker时传递,并在Worker中访问。
  • MessageChannel:用于创建两个相互连接的通信端口,允许在不同线程之间进行通信。
  • SharedArrayBuffer/Atomics:允许在多个线程之间共享数据时,并进行安全的原子操作。

2.2 主线程和Worker线程通讯

//main.js
//以下文件包含了主线程和Worker线程的代码,通过isMainThread来判断
//实际开发中,一般将Worker线程的代码放到单独的JS文件中
const { Worker, isMainThread, parentPort, workerData }= require('worker_threads');if (isMainThread) { // 以下代码在主线程中=======================// 创建一个新的Worker线程// __filename指向当前文件路径,表示Worker也在当前文件中//workerData,用于向Worker线程传递数据const worker = new Worker(__filename, {workerData: { start: 1, end: 100 }});//主线程监听Worker线程,message、error和exit是事件名称//监听从Worker线程发来的消息//result为Worker线程中postMessage出来的值worker.on('message', result => {console.log(`Result from worker: ${result}`);});//监听Worker线程中的错误worker.on('error', error => {console.error(`Worker error: ${error}`);});//监听Worker线程的结束状态worker.on('exit', code => {if (code !== 0) {console.error(`Worker stopped with exit code ${code}`);} else {console.log('Worker finished successfully');}});} else { // 以下代码在Worker线程中============================// 访问workerDataconst { start, end } = workerData;// 简单的计算任务:计算[start, end]范围内的和let sum = 0;for (let i = start; i <= end; i++) {sum += i;}// 发送结果给主线程parentPort.postMessage(sum);
}

2.3 Worker线程之间通过MessageChannel通讯

//main.js,主线程================================================
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');// 创建一个新的MessageChannel,解构出两个通讯端口
const { port1, port2 } = new MessageChannel();// 创建第一个Worker
const worker1 = new Worker(path.resolve(__dirname, 'worker1.js'));// 创建第二个Worker
const worker2 = new Worker(path.resolve(__dirname, 'worker2.js'));// 将port1发送给worker1,port2发送给worker2
worker1.postMessage({ port: port1 }, [port1]);
worker2.postMessage({ port: port2 }, [port2]);//worker1.js,Worker1线程=========================================
const { parentPort } = require('worker_threads');//第一个message来自主线程,可获取通讯端口
parentPort.on('message', (message) => {const port = message.port;//第二个message来自其它Worker线程port.on('message', (msg) => {console.log(`Worker1 received: ${msg}`);// 发送响应消息port.postMessage('Hello from Worker1');});
});//worker2.js,Worker2线程=========================================
const { parentPort } = require('worker_threads');parentPort.on('message', (message) => {const port = message.port;port.postMessage('Hello from Worker2');port.on('message', (msg) => {console.log(`Worker2 received: ${msg}`);});
});

2.4 不同线程间共享信息SharedArrayBuffer 和Atomics

//使用SharedArrayBuffer 和Atomics,可以绕过消息通讯,极大提升多线程通讯性能/main.js,主线程===============================================
const { Worker } = require('worker_threads');
const path = require('path');//创建一个共享的ArrayBuffer,并初始化
const sharedBuffer = new SharedArrayBuffer(4); // 4字节的共享内存
const sharedArray = new Int32Array(sharedBuffer);
sharedArray[0] = 0;//创建两个Worker线程
const worker1 = new Worker(path.resolve(__dirname, 'worker.js'), { workerData: { sharedBuffer } 
});
const worker2 = new Worker(path.resolve(__dirname, 'worker.js'), { workerData: { sharedBuffer } 
});//监听Worker的消息
worker1.on('message', (msg) => console.log('From Worker1:', msg));
worker2.on('message', (msg) => console.log('From Worker2:', msg));//等待Workers完成任务
worker1.on('exit', () => console.log('Worker1 exited'));
worker2.on('exit', () => console.log('Worker2 exited'));//worker.js,Worker线程=========================================
const { parentPort, workerData } = require('worker_threads');
const { SharedArrayBuffer, Atomics } = require('atomics');// 获取共享的ArrayBuffer
const sharedArray = new Int32Array(workerData.sharedBuffer);// 使用Atomics,安全的进行原子操作
Atomics.add(sharedArray, 0, 1); // 增加共享数组的第一个元素// 获取当前值
const currentValue = Atomics.load(sharedArray, 0);// 向主线程发送消息
parentPort.postMessage(`Current value: ${currentValue}`);

三、Node多进程

Node不仅可以创建议多线程,还允许创建子进程。每个子进程,都是一个Node实例,有独立的V8引擎、Node运行时和内存资源。相比多线程,自然是更加消耗资源的(10-30M)。
Node提供了child_process和cluster两个模块用于创建和管理子进程。cluster建立在child_process之上,内置了负载均衡和自动重启机制,可以更加高效的利用CPU的多核性能,是专为Node服务器应用设计的。

3.1 Child Process模块

child_process模块提供了spawn、exec、execFile、fork等方法用于创建子进程:

  • **spawn**:用于启动一个新的进程,可以与其进行数据流的交互。
  • **exec**:用于运行一个命令,并将输出(stdout 和 stderr)作为回调函数的参数返回。
  • **execFile**:与exec类似,但更适合直接执行文件而不是命令字符串。
  • **fork**:专门用于创建新的Node.js进程,并且它有专门的通信通道,适用于父进程和子进程之间传递消息。

(1)spawn方法
它是最基本的创建子进程的方法,适用于需要与子进程进行长时间交互的场景。

//在子进程中,执行系统操作命令===================================
const { spawn } = require('child_process');// 示例:执行 ls 命令,[]中为参数
//spawn方法返回ChildProcess对象,具有 stdout、stderr、stdin属性
//分别对应子进程的标准输出、标准错误和标准输入流。
const ls = spawn('ls', ['-lh', '/usr']);// 监听子进程的标准输出
ls.stdout.on('data', (data) => {console.log(`stdout: ${data}`);
});// 监听子进程的标准错误输出
ls.stderr.on('data', (data) => {console.error(`stderr: ${data}`);
});// 监听子进程的退出事件
ls.on('close', (code) => {console.log(`子进程退出,退出码 ${code}`);
});//在子进程中,执行JS脚本=========================================
const { spawn } = require('child_process');// 示例:执行 JavaScript 文件
const child = spawn('node', ['script.js']);child.stdout.on('data', (data) => {console.log(`子进程 stdout:\n${data}`);
});child.stderr.on('data', (data) => {console.error(`子进程 stderr:\n${data}`);
});child.on('close', (code) => {console.log(`子进程退出,退出码 ${code}`);
});

(2)exec方法
相比 spawn() 方法,它将整个命令(包括参数)作为一个字符串传递给底层的 shell执行,适合于简单的命令和短期执行的任务。

//在子进程中,执行系统操作命令===================================
const { exec } = require('child_process');// 示例:执行 ls 命令
//直接在回调中监听
exec('ls -lh /usr', (error, stdout, stderr) => {if (error) {console.error(`执行错误:${error.message}`);return;}if (stderr) {console.error(`stderr: ${stderr}`);return;}console.log(`stdout: ${stdout}`);
});//在子进程中,执行JS脚本========================================
const { exec } = require('child_process');// 示例:执行 JavaScript 文件
exec('node script.js', (error, stdout, stderr) => {if (error) {console.error(`执行错误:${error.message}`);return;}if (stderr) {console.error(`子进程 stderr:\n${stderr}`);return;}console.log(`子进程 stdout:\n${stdout}`);
});

(3)execFile方法
execFile() 方法与 exec() 类似,但需要显式指定可执行文件的路径和参数列表,不会调用系统的 shell

//在子进程中,执行系统操作命令===================================
const { execFile } = require('child_process');// 示例:执行 node 命令
execFile('node', ['--version'], (error, stdout, stderr) => {if (error) {console.error(`执行错误:${error.message}`);return;}console.log(`stdout: ${stdout}`);
});//在子进程中,执行JS脚本========================================
const { execFile } = require('child_process');// 示例:执行 JavaScript 文件
execFile('node', ['script.js'], (error, stdout, stderr) => {if (error) {console.error(`执行错误:${error.message}`);return;}console.log(`子进程 stdout:\n${stdout}`);
});

(4)fork方法
fork()是Node.js特有的,用于创建一个新的Node.js进程,并且会在父进程和子进程之间创建一个通信通道。它非常适合在多进程架构中进行进程间通信(IPC)

//main.js=======================================================
const { fork } = require('child_process');const child = fork('./child.js');
child.on('message', (msg) => {console.log(`Message from child: ${msg}`);
});
child.send('Hello from parent');// child.js=====================================================
process.on('message', (msg) => {console.log(`Message from parent: ${msg}`);process.send('Hello from child');
});

3.2 Cluster模块

cluster模块,基于child_process的fork方法,在此基础上,增加了负载均衡和自动重启等高级功能:

  • Cluster模块允许创建多个工作进程,每个工作进程都是一个独立的 Node.js 实例,可以在不同的 CPU 核心上运行。
  • 每个工作进程都可以处理客户端请求,共享同一个 TCP 连接,从而提高服务器的并发处理能力和吞吐量。
  • Cluster 模块默认使用轮询(Round-Robin)策略将客户端连接分发到各个工作进程,实现负载均衡。可以通过配置自定义的负载均衡策略,如基于请求次数、CPU 负载等来动态分配客户端请求。
  • 主进程(Master)负责管理所有工作进程的生命周期,包括启动、停止和重启等。当一个工作进程崩溃或退出时,主进程可以自动重新启动该工作进程,保持应用程序的稳定性。
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length; //CPU核数if (cluster.isMaster) { //以下代码在主进程中执行console.log(`主进程 ${process.pid} 正在运行`);// 衍生工作进程for (let i = 0; i < numCPUs; i++) {cluster.fork(); //创建多个子进程}// 监听工作进程的退出事件cluster.on('exit', (worker, code, signal) => {console.log(`工作进程 ${worker.process.pid} 已退出`);});} else { //以下代码在子进程中执行// 每个工作进程都可以共享一个 TCP 连接// 这里是一个 HTTP 服务器示例http.createServer((req, res) => {res.writeHead(200);res.end('Hello World\n');}).listen(8000);console.log(`工作进程 ${process.pid} 已启动`);
}

*这是一个系列文章,将全面介绍多线程、协程和单线程事件循环机制,建议收藏、点赞哦!
*你在并发编程过程中碰到了哪些难题?欢迎评论区交流~~~


我是functionMC > function MyClass(){…}
C#/TS/鸿蒙/AI等技术问题,以及如何写Bug、防脱发、送外卖等高深问题,都可以私信提问哦!

Node并发编程.png


http://www.ppmy.cn/server/96272.html

相关文章

2024.8.6 作业

1> 使用消息队列完成两个进程之间相互通信 snd.c #include <myhead.h>struct msgbuf {long mtype;char mtext[1024]; };#define SIZE sizeof(struct msgbuf)-sizeof(long)int main(int argc,const char *argv[]) {pid_t pid fork();if(pid-1){perror("fork er…

DataEase安装和部署(超细教程)

概述: DataEase 是开源的数据可视化分析工具,帮助用户快速分析数据并洞察业务趋势,从而实现业务的改进与优化。DataEase 支持丰富的数据源连接,能够通过拖拉拽方式快速制作图表,并可以方便的与他人分享。 DataEase 的优势: 开源开放:零门槛,线上快速获取和安装,按月…

rsync远程同步+inotify监控

一、概述 1、关于rsync rsync远程同步&#xff1a;是开源的快速备份工具&#xff0c;可以在不同主机之间同步整个目录 在远程同步任务中&#xff0c;负责发起rsync同步操作的客户机称为发起端&#xff08;服务端&#xff09;&#xff0c;而负责响应来自客户机的rsync同步操作…

电脑自动重启是什么原因?重启原因排查和解决办法!

当你的电脑突然毫无预警地自动重启&#xff0c;不仅打断了工作流程&#xff0c;还可能导致未保存的数据丢失&#xff0c;这无疑令人很懊恼&#xff0c;那么&#xff0c;电脑自动重启是什么原因呢&#xff1f;有什么方法可以解决呢&#xff1f;别担心&#xff0c;在大多数情况下…

MATLAB(7)潮汐模型

一、前言 在MATLAB中模拟潮汐通常涉及到使用潮汐的理论模型&#xff0c;如调和常数模型&#xff08;Harmonic Constants Model&#xff09;&#xff0c;它基于多个正弦和余弦函数的叠加来近似潮汐高度随时间的变化。以下是一个简化的MATLAB代码示例&#xff0c;用于模拟一个基于…

C++笔记之编译过程和面向对象

回顾&#xff1a; “abcd”//数据类型 字符串常量 const char *p"abc"; new STU const char *//8 指针的内存空间 int float 指针的内存空间 p 指针指向的内存空间 "abc" 取决于字符串长度 指针变量的内容一级指针 指针变量的地址二级指针 …

LearnOpenGL之摄像机

前序 AndroidLearnOpenGL是本博主自己实现的LearnOpenGL练习集合&#xff1a; Github地址&#xff1a;https://github.com/wangyongyao1989/AndroidLearnOpenGL 系列文章&#xff1a; 1、LearnOpenGL之入门基础 2、LearnOpenGL之3D显示 3、LearnOpenGL之摄像机 4、LearnOpenG…

RabbitMq架构原理剖析及应用

文章目录 RabbitMQ 架构组件1. **Broker** (Broker Server)2. **Exchange**3. **Queue**4. **Producer** (消息生产者)5. **Consumer** (消息消费者)6. **Virtual Hosts** (虚拟主机) 工作流程内部原理1. **队列管理**2. **集群**3. **持久化与内存**4. **性能优化** 高级特性1…