Nodejs-异步并发控制

devtools/2024/9/22 14:38:29/

异步并发控制

在 node 中可以利用异步发起并行调用。但是如果并发量过大,就会导致下层服务器吃不消。

bagpipe 解决方案

解决方案

  • 通过一个队列来控制并发量
  • 如果当前活跃的异步调用小于限定值,从队列中取出执行
  • 如果活跃调用达到限定值,调用暂时放到队列中
  • 每个异步调用结束的时候,从队列中取出新的异步调用执行
javascript">var Bagpipe = require("bagpipe");
var bagpipe = new Bagpipe(10);
for (var i = 0; i < 100; i++) {bagpipe.push(async, function () {// 异步回调执行});
}
bagpipe.on("full", function (lenght) {console.warn("底层系统不能及时完成,队列拥堵");
});

核心实现

javascript">Bagpipe.prototype.push = function (method) {var args = [].slice.call(arguments, 1);var callback = args[args.length - 1];if (typeof callback !== "function") {args.push(function () {});}if (this.options.disabled || this.limit < 1) {method.apply(null, args);return this;}if (this.queue.length < this.queueLength || !this.options.refuse) {this.queue.push({method: method,args: args,});} else {var err = new Error("too much async call in queue");err.name = "TooMuchAsyncCallError";callback(err);}if (this.queue.length > 1) {this.emit("full", this.queue.lenght);}this.next();return this;
};

next()方法主要是用来判断活跃调用的数量,如果正常,使用内部方法 run 来执行真正的调用。

javascript">Bagpipe.prototype.next = function () {var that = this;if (that.active < that.limit && this.queue.length) {var req = this.queue.shift();this.run(req.method, req.args);}
};
javascript">Bagpine.prototype.run = function (method, args) {var that = this;that.active++;var callback = args[args.length - 1];var timer = null;var called = false;args[args.length - 1] = function (err) {if (timer) {clearTimeout(timer);timer = null;}if (!called) {this._next();callback.apply(null, arguments);} else {if (err) {that.emit("outdated", err);}}};var timeout = that.options.timeout;if (timeout) {timer = setTimeout(function () {called = true;that._next();// pass the exceptionvar err = new Error(timeout + "ms timeout");err.name = "BagpipeTimeoutError";err.data = {name: method.name,method: method.toString(),args: args.slice(0, -1),};callback(err);}, timeout);}method.apply(null, args);
};
  • 拒绝模式
    对于大量的异步调用,会分场景进行区分。设计到并发控制,会造成部分等待,如果调用由实时方面的需求,需要快速返回。这种情境下需要快速失败,让调用方竟早返回。
  • 超时控制
    超时控制是为异步调用设置一个时间阈值,如果异步调用没有在规定的时间内完成,先执行用户传入的回调函数。

async解决方案

paralleLimit()和parallel类似,多了一个限制并发数量的参数,使得任务只能同时并发一定数量,不是无限制并发。

javascript">async.parallelLimit([function(callback) {fs.readFile('file1.txt','utf-8',callback)},function(callback) {fs.readFile('file2.txt','utf-8',callback)}
], 1, function(err, results) {// todo
})

paralleLimit()方法不能动态的增加并行任务。async提供了queue()方法来满足需求。

javascript">var q = async.queue(function(file,callback) {fs.readFile(file, 'utf-8', callback);
},2)
q.drain = function () {};
fs.readdirSync('.').forEach(function (file) {q.push(file, function (err, data) {})
})

queue实现了动态添加并行任务,但是想不paralleLimit(),queue是固定的。丢失了paralleLimit()的多样性。


http://www.ppmy.cn/devtools/32630.html

相关文章

Docker部署RabbitMQ与简单使用

官网地址&#xff1a; Messaging that just works — RabbitMQ 我的Docker博客:Docker-CSDN博客 1.结构 其中包含几个概念&#xff1a; **publisher**&#xff1a;生产者&#xff0c;也就是发送消息的一方 **consumer**&#xff1a;消费者&#xff0c;也就是消费消息的一方 …

QT+串口调试助手+基本版

一、创建串口调试助手UI界面 1、首先生成串口连接必要参数界面&#xff0c;删除关闭串口控件 2、给参数下拉框添加常见的选项&#xff0c;删除关闭串口控件 3、将串口调试助手参数界面布局整齐&#xff0c;删除关闭串口控件 4、更改控件名字&#xff0c;方便后续编程&#xff…

分拣机器人也卷的飞起来了

导语 大家好&#xff0c;我是智能仓储物流技术研习社的社长&#xff0c;老K。专注分享智能仓储物流技术、智能制造等内容。 新书《智能物流系统构成与技术实践》 智能制造-话题精读 1、西门子、ABB、汇川&#xff1a;2024中国工业数字化自动化50强 2、完整拆解&#xff1a;智能…

CUDA常量内存

常量内存是一种专用内存&#xff0c;用于只读数据和统一访问线程束中线程的数据。常量内存对于内核代码是只读的&#xff0c;但对主机而言是可读写的。 常量内存位于设备的DRAM上&#xff0c;并且有一个专用的片上缓存。从每个SM的常量缓存中读取的延迟&#xff0c;比直接从常…

CCF PTA 2023年5月C++天空之城的树

【问题描述】 拉姆达人在修建天空之城时&#xff0c;主要是依赖巨大的飞行石去维持悬空状态&#xff0c;依赖强壮的大树去作为建筑 物的框架&#xff0c;假设大树是一棵有 n(n≤10 3)个结点的二叉树。给出每个结点的两个子结点编号&#xff08;均不超 过 n&#xff09;&#x…

分布式与一致性协议之一致哈希算法(三)

一致哈希算法 如何使用一致哈希算法实现哈希寻址 我们一起来看一个例子&#xff0c;对于1000万个key的3节点KV存储&#xff0c;如果我们使用一致哈希算法增加1个节点&#xff0c;即3节点集群变为4节点集群&#xff0c;则只需要迁移24.3%的数据,如代码所示 package mainimpor…

Golang | Leetcode Golang题解之第68题文本左右对齐

题目&#xff1a; 题解&#xff1a; // blank 返回长度为 n 的由空格组成的字符串 func blank(n int) string {return strings.Repeat(" ", n) }func fullJustify(words []string, maxWidth int) (ans []string) {right, n : 0, len(words)for {left : right // 当前…

OceanBase 分布式数据库【信创/国产化】- OceanBase 配置项和系统变量概述

本心、输入输出、结果 文章目录 OceanBase 分布式数据库【信创/国产化】- OceanBase 配置项和系统变量概述前言OceanBase 数据更新架构OceanBase 配置项和系统变量概述配置项配置项分类配置项查询系统变量系统变量分类系统变量查询配置项与系统变量的区分OceanBase 分布式数据库…