如何限制大量请求并发

news/2025/1/11 1:55:33/

前言:

1、主流浏览器在 HTTP/1.1 下对同一域名的最大并发请求数通常是 6~8 个。超过限制的请求会进入队列,等待空闲的连接。
2、可以利用Promise模拟任务队列,控制并发请求数量,以避免对服务器造成过大的压力。(先进先出)

代码

一个简单的请求队列调度器,用于控制并发请求的最大数量

import axios from 'axios'export const handQueue = (reqs // 请求总数
) => {reqs = reqs || []const requestQueue = (concurrency) => {concurrency = concurrency || 6 // 最大并发数const queue = [] // 请求池let current = 0const dequeue = () => {while (current < concurrency && queue.length) {current++;const requestPromiseFactory = queue.shift() // 出列requestPromiseFactory().then(() => { // 成功的请求逻辑}).catch(error => { // 失败console.log(error)}).finally(() => {current--dequeue()});}}return (requestPromiseFactory) => {queue.push(requestPromiseFactory) // 入队dequeue()}}const enqueue = requestQueue(6)for (let i = 0; i < reqs.length; i++) {enqueue(() => axios.get('/api/test' + i))}
}

功能拆解

  1. handQueue 函数:
    参数 reqs: 是一个数组,包含需要发送的请求。
    函数的主要目的是对这些请求进行队列管理,确保并发请求的数量不会超过设定的上限。

  2. requestQueue 函数:
    concurrency:最大并发数
    enqueue:用于将新的请求添加到队列并处理它们
    queue: 请求池,用于存储待处理的请求。
    current: 当前正在执行的请求数。

  3. dequeue 函数:

    1、从请求池中取出请求并发送。它在一个循环中运行,直到当前并发请求数current达到最大并发数concurrency或请求池queue为空。
    2、对于每个出队的请求,它首先增加current的值,然后调用请求函数requestPromiseFactory来发送请求。
    3、当请求完成(无论成功还是失败)后,它会减少current的值并再次调用dequeue,以便处理下一个请求。
    
    const dequeue = () => {  while (current < concurrency && queue.length) {  current++;  const requestPromiseFactory = queue.shift() // 出列  requestPromiseFactory()  .then(() => { // 成功的请求逻辑  })  .catch(error => { // 失败  console.log(error)  })  .finally(() => {  current--  dequeue()  });  }  
    }
    
    // 定义返回请求入队函数
    return (requestPromiseFactory) => {  queue.push(requestPromiseFactory) // 入队  dequeue()  
    }
    // 函数返回一个函数,这个函数接受一个参数requestPromiseFactory,表示一个返回Promise的请求工厂函数。
    // 这个返回的函数将请求工厂函数加入请求池queue,并调用dequeue来尝试发送新的请求,
    // 当然也可以自定义axios,利用Promise.all统一处理返回后的结果。
    

    使用一个 while 循环,当当前请求数 current 小于最大并发数 concurrency 并且队列 queue 中有待处理请求时:
    1、从队列中取出第一个请求工厂 requestPromiseFactory。
    2、调用 requestPromiseFactory() 开始请求。
    3、使用 then 和 catch 处理请求成功和失败的逻辑。
    4、在 finally 中,减少当前请求数 current,并递归调用 dequeue,以确保队列持续运行。

  4. 分发逻辑:
    for 循环遍历传入的请求数组 reqs。
    对于每个请求,通过 enqueue 将其加入队列。
    每个请求通过工厂函数 () => axios.get(‘/api/test’ + i) 包装,保证每次调用都返回新的请求 Promise。

优化

  1. 超时管理:
    如果某个请求卡住,会阻塞队列,可为请求设置超时,例如:

    const timeout = (promise, ms) =>Promise.race([promise,new Promise((_, reject) => setTimeout(() => reject('Timeout'), ms)),]);
    
  2. 错误处理:
    重试或告警机制:

     const retry = (fn, retries = 3) =>fn().catch(err => (retries > 0 ? retry(fn, retries - 1) : Promise.reject(err)));
    
  3. 队列清空检测:
    在所有请求处理完成后,触发回调或事件通知:

    if (queue.length === 0 && current === 0) {console.log('All requests completed');
    }
    
  4. 优化后的代码

    import axios from "axios";
    /*** 处理并发请求队列* @param {Array<Function>} reqs 请求总数* @param {Object} options 配置选项* @param {number} options.concurrency 最大并发数* @param {number} options.timeout 请求超时时间 (ms)* @param {Function} options.onQueueEmpty 队列清空后的回调*/
    export const handQueue = (reqs, options = {}) => {const { concurrency = 6, timeout = 10000, onQueueEmpty = () => {} } = options;// 确保请求队列为数组reqs = reqs || [];const queue = [];let current = 0;// 超时管理器const timeoutRequest = (requestPromise, timeout) =>new Promise((resolve, reject) => {const timer = setTimeout(() => reject(new Error("Request timeout")), timeout);requestPromise.then((res) => {clearTimeout(timer);resolve(res);}).catch((err) => {clearTimeout(timer);reject(err);});});// 出列处理const dequeue = async () => {while (current < concurrency && queue.length) {current++;const requestPromiseFactory = queue.shift(); // 出列try {await timeoutRequest(requestPromiseFactory(), timeout); // 包装超时处理console.log("Request success");} catch (error) {console.error("Request failed:", error.message); // 错误处理} finally {current--;dequeue();// 如果队列为空且没有正在处理的请求,调用队列清空回调if (queue.length === 0 && current === 0) {onQueueEmpty();}}}};// 入队函数const enqueue = (requestPromiseFactory) => {queue.push(requestPromiseFactory);dequeue();};// 将请求添加到队列for (let i = 0; i < reqs.length; i++) {enqueue(() => axios.get(`/api/test${i}`));}
    };
    
  5. 使用

    const reqs = Array.from({ length: 20 }, (_, i) => () => axios.get(`/api/test${i}`));
    handQueue(reqs, {concurrency: 4, // 最大并发数timeout: 5000,  // 超时时间 5 秒onQueueEmpty: () => {console.log("All requests have been processed!");},
    });
    

http://www.ppmy.cn/news/1562122.html

相关文章

Win11+WSL2+Ubuntu24.04安装Genesis并实现正常运行

什么是Genesis Genesis是一款由卡内基梅隆大学、马里兰大学、斯坦福大学、麻省理工学院等全球顶尖研究机构联合开发的开源生成式物理引擎。于2024年12月19日正式开源。它专为机器人、嵌入式 AI 和物理 AI 应用设计&#xff0c;能够模拟各种物理现象&#xff08;如刚体、液体、…

识日 1.2.8 | 完全免费的日语学习软件,小清新风格

识日是一款免费的日语学习软件&#xff0c;界面简洁美观&#xff0c;支持主题色设置。它提供了日语50音学习、单词查询、日语词典等功能&#xff0c;并有详细的发音和笔画展示。用户可以通过多种模式背单词&#xff0c;如快速模式、洗脑模式&#xff08;重复朗读单词&#xff0…

REVERSE-COMPETITION-CCSSSC-2025

REVERSE-COMPETITION-CCSSSC-2025 donntyouseeHappyLockkernel_traffic donntyousee elf64&#xff0c;ida反编译不太行&#xff0c;有花指令&#xff0c;直接调汇编 读输入 读输入前有条打印”plz input your flag”&#xff0c;肯定是在.init_array&#xff0c;确实有很多 …

vulhub-deathnote靶场实战

扫描靶机地址 访问网址访问不到&#xff0c;扫描端口 namp -sS -sV -T5 -A -p- 192.168.249.128 目录扫描&#xff1a; dirsearch -u 192.168.8.6 -i 200 gobuster gobuster dir -u http://192.168.8.6 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -x …

AI赋能金融服务:效率与安全的新高度

引言 金融服务行业正经历着前所未有的变革。日益增长的客户需求、日趋复杂的金融产品以及日益严峻的监管环境&#xff0c;都对金融机构提出了更高的要求。它们面临着提升效率、有效管控安全风险以及优化用户体验等诸多挑战。传统的人工操作模式已难以满足快速发展的市场需求。…

『SQLite』SELECT语句查询数据

通过 SQLite 的 SELECT 语法从指定数据库表中查询数据。 查询数据 SELECT 语句查询数据。 查询案例 sqlite_master 表简介 该表是 SQLite 数据库用于存储创建表、索引等的一张表。 注意 上述内容详讲见文章&#xff1a;SQLite 的 SELECT 操作&#xff08;内含案例&#xff…

GAN的应用

5、GAN的应用 ​ GANs是一个强大的生成模型&#xff0c;它可以使用随机向量生成逼真的样本。我们既不需要知道明确的真实数据分布&#xff0c;也不需要任何数学假设。这些优点使得GANs被广泛应用于图像处理、计算机视觉、序列数据等领域。上图是基于GANs的实际应用场景对不同G…

golang中的 break label, goto label,continue label,break如何跳出多层循环

goto label goto 可以无条件的跳转执行的位置&#xff0c;但是不能跨函数。 label 不是代码块&#xff0c;可以理解为它是一个 position&#xff0c;直接跳到这个 position 然后接着向下运行。 func worker4() {fmt.Println(1)goto labelfmt.Println(2) label:fmt.Println(3…