From ff80e1ed0c4a8c20a40f8c0ddee008c794d1329c Mon Sep 17 00:00:00 2001 From: tuhm1 Date: Tue, 7 May 2024 01:04:55 +0700 Subject: [PATCH 1/3] allow changing concurrency limit --- index.d.ts | 5 +++++ index.js | 27 +++++++++++++++++++++++---- readme.md | 4 ++++ test.js | 40 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 4 deletions(-) diff --git a/index.d.ts b/index.d.ts index 9351750..08f06fb 100644 --- a/index.d.ts +++ b/index.d.ts @@ -9,6 +9,11 @@ export type LimitFunction = { */ readonly pendingCount: number; + /** + Concurrency limit. + */ + concurrency: number; + /** Discard pending promises that are waiting to run. diff --git a/index.js b/index.js index d2c48b5..484ad3e 100644 --- a/index.js +++ b/index.js @@ -1,15 +1,13 @@ import Queue from 'yocto-queue'; export default function pLimit(concurrency) { - if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { - throw new TypeError('Expected `concurrency` to be a number from 1 and up'); - } + validateConcurrency(concurrency); const queue = new Queue(); let activeCount = 0; const resumeNext = () => { - if (queue.size > 0) { + if (activeCount < concurrency && queue.size > 0) { queue.dequeue()(); // Since `pendingCount` has been decreased by one, increase `activeCount` by one. activeCount++; @@ -72,7 +70,28 @@ export default function pLimit(concurrency) { queue.clear(); }, }, + concurrency: { + get: () => concurrency, + + set(newConcurrency) { + validateConcurrency(newConcurrency); + concurrency = newConcurrency; + + queueMicrotask(() => { + // eslint-disable-next-line no-unmodified-loop-condition + while (activeCount < concurrency && queue.size > 0) { + resumeNext(); + } + }); + }, + }, }); return generator; } + +function validateConcurrency(concurrency) { + if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { + throw new TypeError('Expected `concurrency` to be a number from 1 and up'); + } +} diff --git a/readme.md b/readme.md index 4e890f8..306d29e 100644 --- a/readme.md +++ b/readme.md @@ -74,6 +74,10 @@ This might be useful if you want to teardown the queue at the end of your progra Note: This does not cancel promises that are already running. +### limit.concurrency + +Concurrency limit. + ## FAQ ### How is this different from the [`p-queue`](https://github.com/sindresorhus/p-queue) package? diff --git a/test.js b/test.js index 39cef54..c3ab72e 100644 --- a/test.js +++ b/test.js @@ -177,3 +177,43 @@ test('throws on invalid concurrency argument', t => { pLimit(true); }); }); + +test('change concurrency to smaller value', async t => { + const limit = pLimit(4); + let running = 0; + const log = []; + const promises = Array.from({length: 10}).map(() => + limit(async () => { + ++running; + log.push(running); + await delay(50); + --running; + }), + ); + await Promise.resolve(); + t.is(running, 4); + + limit.concurrency = 2; + await Promise.all(promises); + t.deepEqual(log, [1, 2, 3, 4, 2, 2, 2, 2, 2, 2]); +}); + +test('change concurrency to bigger value', async t => { + const limit = pLimit(2); + let running = 0; + const log = []; + const promises = Array.from({length: 10}).map(() => + limit(async () => { + ++running; + log.push(running); + await delay(50); + --running; + }), + ); + await Promise.resolve(); + t.is(running, 2); + + limit.concurrency = 4; + await Promise.all(promises); + t.deepEqual(log, [1, 2, 3, 4, 4, 4, 4, 4, 4, 4]); +}); From 9d61e10f91614ffc88a5325c1b56abcc0f3dc302 Mon Sep 17 00:00:00 2001 From: tuhm1 Date: Fri, 5 Jul 2024 12:48:01 +0700 Subject: [PATCH 2/3] Improve description --- index.d.ts | 2 +- readme.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/index.d.ts b/index.d.ts index 08f06fb..038ba41 100644 --- a/index.d.ts +++ b/index.d.ts @@ -10,7 +10,7 @@ export type LimitFunction = { readonly pendingCount: number; /** - Concurrency limit. + Get or set the concurrency limit. */ concurrency: number; diff --git a/readme.md b/readme.md index 306d29e..1d73ae1 100644 --- a/readme.md +++ b/readme.md @@ -76,7 +76,7 @@ Note: This does not cancel promises that are already running. ### limit.concurrency -Concurrency limit. +Get or set the concurrency limit. ## FAQ From 49311e50d855f35f412517f53fd73cae97954fb9 Mon Sep 17 00:00:00 2001 From: tuhm1 Date: Fri, 5 Jul 2024 14:34:31 +0700 Subject: [PATCH 3/3] Fix tests After v6 it takes 1 more microtask before starting an execution, so used `delay(0)` to wait instead. --- test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test.js b/test.js index c3ab72e..08bb306 100644 --- a/test.js +++ b/test.js @@ -190,7 +190,7 @@ test('change concurrency to smaller value', async t => { --running; }), ); - await Promise.resolve(); + await delay(0); t.is(running, 4); limit.concurrency = 2; @@ -210,7 +210,7 @@ test('change concurrency to bigger value', async t => { --running; }), ); - await Promise.resolve(); + await delay(0); t.is(running, 2); limit.concurrency = 4;