Nodejs-异步并发控制

server/2024/10/18 19:22:20/

异步并发控制

在 node 中可以利用异步发起并行调用。但是如果并发量过大,就会导致下层服务器吃不消。

bagpipe 解决方案

解决方案

  • 通过一个队列来控制并发量
  • 如果当前活跃的异步调用小于限定值,从队列中取出执行
  • 如果活跃调用达到限定值,调用暂时放到队列中
  • 每个异步调用结束的时候,从队列中取出新的异步调用执行
javascript">var Bagpipe = require("bagpipe");
var bagpipe = new Bagpipe(10);
for (var i = 0; i < 100; i++) {bagpipe.push(async, function () {// 异步回调执行});
}
bagpipe.on("full", function (lenght) {console.warn("底层系统不能及时完成,队列拥堵");
});

核心实现

javascript">Bagpipe.prototype.push = function (method) {var args = [].slice.call(arguments, 1);var callback = args[args.length - 1];if (typeof callback !== "function") {args.push(function () {});}if (this.options.disabled || this.limit < 1) {method.apply(null, args);return this;}if (this.queue.length < this.queueLength || !this.options.refuse) {this.queue.push({method: method,args: args,});} else {var err = new Error("too much async call in queue");err.name = "TooMuchAsyncCallError";callback(err);}if (this.queue.length > 1) {this.emit("full", this.queue.lenght);}this.next();return this;
};

next()方法主要是用来判断活跃调用的数量,如果正常,使用内部方法 run 来执行真正的调用。

javascript">Bagpipe.prototype.next = function () {var that = this;if (that.active < that.limit && this.queue.length) {var req = this.queue.shift();this.run(req.method, req.args);}
};
javascript">Bagpine.prototype.run = function (method, args) {var that = this;that.active++;var callback = args[args.length - 1];var timer = null;var called = false;args[args.length - 1] = function (err) {if (timer) {clearTimeout(timer);timer = null;}if (!called) {this._next();callback.apply(null, arguments);} else {if (err) {that.emit("outdated", err);}}};var timeout = that.options.timeout;if (timeout) {timer = setTimeout(function () {called = true;that._next();// pass the exceptionvar err = new Error(timeout + "ms timeout");err.name = "BagpipeTimeoutError";err.data = {name: method.name,method: method.toString(),args: args.slice(0, -1),};callback(err);}, timeout);}method.apply(null, args);
};
  • 拒绝模式
    对于大量的异步调用,会分场景进行区分。设计到并发控制,会造成部分等待,如果调用由实时方面的需求,需要快速返回。这种情境下需要快速失败,让调用方竟早返回。
  • 超时控制
    超时控制是为异步调用设置一个时间阈值,如果异步调用没有在规定的时间内完成,先执行用户传入的回调函数。

async解决方案

paralleLimit()和parallel类似,多了一个限制并发数量的参数,使得任务只能同时并发一定数量,不是无限制并发。

javascript">async.parallelLimit([function(callback) {fs.readFile('file1.txt','utf-8',callback)},function(callback) {fs.readFile('file2.txt','utf-8',callback)}
], 1, function(err, results) {// todo
})

paralleLimit()方法不能动态的增加并行任务。async提供了queue()方法来满足需求。

javascript">var q = async.queue(function(file,callback) {fs.readFile(file, 'utf-8', callback);
},2)
q.drain = function () {};
fs.readdirSync('.').forEach(function (file) {q.push(file, function (err, data) {})
})

queue实现了动态添加并行任务,但是想不paralleLimit(),queue是固定的。丢失了paralleLimit()的多样性。


http://www.ppmy.cn/server/33027.html

相关文章

C#知识|泛型集合List相关方法

哈喽&#xff0c;你好&#xff0c;我是雷工&#xff01; 以下为泛型集合List相关方法的学习笔记。 01 集合定义 集合定义的时候&#xff0c;无需规定元素的个数。 02 泛型说明 泛型表示一种程序特性&#xff0c;也就是在定义的时候&#xff0c;无需指定特定的类型&#xff…

SQL 基础 | AVG 函数的用法

在SQL中&#xff0c;AVG()是一个聚合函数&#xff0c;用来计算某个列中所有值的平均值。 它通常与GROUP BY子句一起使用&#xff0c;以便对分组后的数据进行平均值计算。 AVG()函数在需要了解数据集中某个数值列的中心趋势时非常有用。 以下是AVG()函数的一些常见用法&#xff…

vue路由懒加载是什么

Vue路由懒加载是一种优化技术&#xff0c;旨在减少应用程序的初始加载时间并提高性能。具体来说&#xff0c;它允许我们在用户实际需要访问某个路由时&#xff0c;才加载对应的组件代码&#xff0c;而不是在应用程序启动时一次性加载所有组件。 举个例子来说明Vue路由懒加载的…

CMakeLists.txt语法规则:部分常用命令说明一

一. 简介 前一篇文章简单介绍了CMakeLists.txt 简单的语法。文章如下&#xff1a; CMakeLists.txt 简单的语法介绍-CSDN博客 接下来对 CMakeLists.txt语法规则进行具体的学习。本文具体学习 CMakeLists.txt语法规则中常用的命令。 二. CMakeLists.txt语法规则&#xff1a;…

DDD:根据maven的脚手架archetype生成ddd多模块项目目录结构

随着领域驱动的兴起&#xff0c;很多人都想学习如何进行ddd的项目开发&#xff0c;那ddd的项目结构是怎么样的&#xff1f;又是如何结合SpringBoot呢&#xff1f;那么针对这个问题&#xff0c;笔者使用maven的archetype封装一个相对通用的ddd的项目目录&#xff0c;方便一键生成…

抖音直播间小风车怎么挂?直播间小风车跳转微信怎么开通!

抖音直播已经成为了一个非常受欢迎的直播平台&#xff0c;而在直播间引流也是用户非常关注的一个话题。而针对这个问题&#xff0c;抖音也提供了一种非常好用的小工具——小风车&#xff0c;可以帮助用户在直播间进行引流。那么&#xff0c;抖音直播间小风车怎么挂&#xff1f;…

【开发技巧】青龙面板cookie过期

用青龙面板挂JD时&#xff0c;很多人要求的是通过手机上的特殊的浏览器&#xff08;开放Cookie获取功能的浏览器&#xff09;&#xff0c;例如Alook。但是我使用的是ios系统&#xff0c;就想直接利用电脑上浏览器获取cookie。&#xff08;获取教程有很多&#xff0c;在此笔者就…

【docker问题记录】虚拟机ubuntu22.04使用docker-compose出现容器不能ping通宿主机所在局域网的情况

直接说结论 原因可能是因为这个版本的ubuntu启动后有时会丢失网络图标此时宿主机相当于没有联网&#xff0c;但是docker相关进程已经启动&#xff0c;使用的是无效的网络配置&#xff0c;哪怕此时再用sudo nmcli networking on启动了网络依然docker-compose容器依然无法联网&a…