0%

Promise控制并发

问题:当我们的应用瞬间发出很多请求,比如几十万http请求时,或者堆积了无数调用栈导致内存溢出,这个时候需要我们对http的连接数做限制

思路:

初始化一个pool数组作为并发池,然后先循环把并发池塞满,不断调用addTask,通过自定义请求函数request,每个任务task是一个Promise对象包装,执行完就pop出连接池,然后将新任务添加进并发池pool

方法一:不通过Promise.race:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
let urls = ["1", "2", "3", "4", "5", "6", "7"];
let pool = []; //并发池
let max = 3; //最大并发数量
//自定义请求函数
function request(url) {
return new Promise((resolve) => {
setTimeout(() => {
resolve(url);
console.log(`任务${url}完成`);
}, 1000);
}).then((res) => {
console.log("外部逻辑", res);
});
}
function addtask(url){
let task = request(url);
pool.push(task);
task.then(res=>{
//任务完成,从并发池中删除
pool.splice(pool.indexof(task),1);
cosole.log(`${url}完成,当前并发数为${pool.length}`)
//每当并发池结束一个任务,就再塞入一个任务
url=url.shift();
if(url!==undefined){
addTask(url);
}
})
}
//先把并发池塞满
while(pool.length<max){
let url=urls.shift();
addTask(url);
}

Promise.race()实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
let urls = ["1", "2", "3", "4", "5", "6", "7"];
let pool = []; //并发池
let max = 3; //最大并发数量
//自定义请求函数
function request(url) {
return new Promise((resolve) => {
setTimeout(() => {
resolve(url);
console.log(`任务${url}完成`);
}, 1000);
}).then((res) => {
console.log("外部逻辑", res);
});
}
function addTask(url) {
let task = request(url);
pool.push(task);
task.then((res) => {
//请求结束将Promise任务从pool中移除
pool.splice(pool.indexOf(task), 1);
console.log(`${url}结束,当前并发数:${pool.length}`);
});
}
function run(task) {
task.then((res) => {
let url = urls.shift();
if (url !== undefined) {
addTask(url);
run(Promise.race(pool));
}
});
}
while (pool.length < max) {
let url = urls.shift();
addTask(url);
}
let race = Promise.race(pool);
run(race);

Promise.race+async…await实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
let urls = ["1", "2", "3", "4", "5", "6", "7"];
let pool = []; //并发池
let max = 3; //最大并发数量
//自定义请求函数
function request(url) {
return new Promise((resolve) => {
setTimeout(() => {
resolve(url);
console.log(`任务${url}完成`);
}, 1000);
}).then((res) => {
console.log("外部逻辑", res);
});
}
async function fn() {
for (let i = 0; i < urls.length; i++) {
let task = request(urls[i]);
//请求结束将Promise任务从pool中移除
task.then((res) => {
pool.splice(pool.indexOf(task), 1);
console.log(`${urls[i]}结束,当前并发数:${pool.length}`);
});
pool.push(task);
//并发池塞满后需要等待一个task完成才可以继续往里面塞任务
if (pool.length === max) {
await Promise.race(pool);
}
}
}
fn();