前端开发中大并发量如何控制并发数

2024年 4月 30日 35.3k 0

写在前面

最近在进行移动端h5开发,首页需要加载的资源很多,一个lottie动效需要请求70多张图片,但是遇到安卓webview限制请求并发数,导致部分图片请求失败破图。当然图片资源可以做闲时加载和预加载,可以减轻播放动效时资源未加载的问题。

同样的,业务开发也会遇到需要异步请求几十个接口,如果同时并发请求浏览器会进行限制请求数,也会给后端造成请求压力。

场景说明

现在有个场景:

请你实现一个并发请求函数concurrencyRequest(urls, maxNum),要求如下:

  • 要求最大并发数 maxNum。
  • 每当有一个请求返回,就留下一个空位,可以增加新的请求。
  • 所有请求完成后,结果按照 urls 里面的顺序依次打出(发送请求的函数可以直接使用fetch即可)。

初始实现:

const preloadManger = (urls, maxCount = 5) => {
  let count = 0; // 计数 -- 用于控制并发数
  const createTask = () => {
    if (count  {
    count--;
    createTask();
  };

  createTask();
};

// 进行异步请求
const loader = async (url) => {
  const res = await fetch(url).then(res=>res.json());
  console.log("res",res);
  return res
}

const urls = [];
for (let i = 1; i urls的长度时,请求数为urls的长度。
  • 需要定义计数器count去判断是否全部请求完毕。
  • 无论请求成功与否,都应该将结果存在结果数组results中。
  • 结果数组results和urls数组的顺序保持一致,方便存取。
  • 代码实现

    在前面的初始实现的代码中,虽然都能满足基本需求,但是并没有考虑一些边界条件,对此需要根据上面设计思路重新实现得到:

    // 并发请求函数
    const concurrencyRequest = (urls, maxNum) => {
        return new Promise((resolve) => {
            if (urls.length === 0) {
                resolve([]);
                return;
            }
            const results = [];
            let index = 0; // 下一个请求的下标
            let count = 0; // 当前请求完成的数量
    
            // 发送请求
            async function request() {
                if (index === urls.length) return;
                const i = index; // 保存序号,使result和urls相对应
                const url = urls[index];
                index++;
                console.log(url);
                try {
                    const resp = await fetch(url);
                    // resp 加入到results
                    results[i] = resp;
                } catch (err) {
                    // err 加入到results
                    results[i] = err;
                } finally {
                    count++;
                    // 判断是否所有的请求都已完成
                    if (count === urls.length) {
                        console.log('完成了');
                        resolve(results);
                    }
                    request();
                }
            }
    
            // maxNum和urls.length取最小进行调用
            const times = Math.min(maxNum, urls.length);
            for(let i = 0; i < times; i++) {
                request();
            }
        })
    }

    测试代码:

    const urls = [];
    for (let i = 1; i  {
        console.log(res);
    })

    请求结果:

    上面代码基本实现了前端并发请求的需求,也基本满足需求,在生产中其实有很多已经封装好的库可以直接使用。比如:p-limit【https://github.com/sindresorhus/p-limit】

    阅读p-limit源码

    import Queue from 'yocto-queue';
    import {AsyncResource} from '#async_hooks';
    
    export default function pLimit(concurrency) {
     // 判断这个参数是否是一个大于0的整数,如果不是就抛出一个错误
     if (
      !((Number.isInteger(concurrency)
      || concurrency === Number.POSITIVE_INFINITY)
      && concurrency > 0)
     ) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
     }
    
     // 创建队列 -- 用于存取请求
     const queue = new Queue();
     // 计数
     let activeCount = 0;
    
     // 用来处理并发数的函数
     const next = () => {
      activeCount--;
    
      if (queue.size > 0) {
       // queue.dequeue()可以理解为[].shift(),取出队列中的第一个任务,由于确定里面是一个函数,所以直接执行就可以了;
       queue.dequeue()();
      }
     };
    
     // run函数就是用来执行异步并发任务
     const run = async (function_, resolve, arguments_) => {
      // activeCount加1,表示当前并发数加1
      activeCount++;
    
      // 执行传入的异步函数,将结果赋值给result,注意:现在的result是一个处在pending状态的Promise
      const result = (async () => function_(...arguments_))();
    
      // resolve函数就是enqueue函数中返回的Promise的resolve函数
      resolve(result);
    
      // 等待result的状态发生改变,这里使用了try...catch,因为result可能会出现异常,所以需要捕获异常;
      try {
       await result;
      } catch {}
    
      next();
     };
    
     // 将run函数添加到请求队列中
     const enqueue = (function_, resolve, arguments_) => {
      queue.enqueue(
       // 将run函数绑定到AsyncResource上,不需要立即执行,对此添加了一个bind方法
       AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)),
      );
    
      // 立即执行一个异步函数,等待下一个微任务(注意:因为activeCount是异步更新的,所以需要等待下一个微任务执行才能获取新的值)
      (async () => {
       // This function needs to wait until the next microtask before comparing
       // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
       // when the run function is dequeued and called. The comparison in the if-statement
       // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
       await Promise.resolve();
    
       // 判断activeCount是否小于concurrency,并且队列中有任务,如果满足条件就会将队列中的任务取出来执行
       if (activeCount  0) {
        // 注意:queue.dequeue()()执行的是run函数
        queue.dequeue()();
       }
      })();
     };
    
     // 接收一个函数fn和参数args,然后返回一个Promise,执行出队操作
     const generator = (function_, ...arguments_) => new Promise(resolve => {
      enqueue(function_, resolve, arguments_);
     });
    
     // 向外暴露当前的并发数和队列中的任务数,并且手动清空队列
     Object.defineProperties(generator, {
      // 当前并发数
      activeCount: {
       get: () => activeCount,
      },
      // 队列中的任务数
      pendingCount: {
       get: () => queue.size,
      },
      // 清空队列
      clearQueue: {
       value() {
        queue.clear();
       },
      },
     });
    
     return generator;
    }

    整个库只有短短71行代码,在代码中导入了yocto-queue库,它是一个微型的队列数据结构。

    手写源码

    在进行手撕源码时,可以借助数组进行简易的实现:

    class PLimit {
        constructor(concurrency) {
            this.concurrency = concurrency;
            this.activeCount = 0;
            this.queue = [];
            
            return (fn, ...args) => {
                return new Promise(resolve => {
                   this.enqueue(fn, resolve, args);
                });
            }
        }
        
        enqueue(fn, resolve, args) {
            this.queue.push(this.run.bind(this, fn, resolve, args));
    
            (async () => {
                await Promise.resolve();
                if (this.activeCount  0) {
                    this.queue.shift()();
                }
            })();
        }
        
        async run(fn, resolve, args) {
            this.activeCount++;
    
            const result = (async () => fn(...args))();
    
            resolve(result);
    
            try {
                await result;
            } catch {
            }
    
            this.next();
        }
        
        next() {
            this.activeCount--;
    
            if (this.queue.length > 0) {
                this.queue.shift()();
            }
        }
    }

    小结

    在这篇文章中,简要介绍了为什么要进行并发请求,阐述了使用请求池队列实现并发请求的设计思路,简要实现代码。

    此外,还阅读分析了p-limit的源码,并使用数组进行简要的源码编写,以实现要求。

    参考文章

    • 【源码共读】大并发量如何控制并发数https://juejin.cn/post/7179220832575717435?searchId=20240430092814392DC2208C545E691A26
    • 前端实现并发控制网络请求https://mp.weixin.qq.com/s/9uq2SqkcMSSWjks0x7RQJg。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论