Skip to content

Commit

Permalink
Allow changing concurrency limit (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuhm1 authored Jul 8, 2024
1 parent 850768f commit 3e4fdd1
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 4 deletions.
5 changes: 5 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ export type LimitFunction = {
*/
readonly pendingCount: number;

/**
Get or set the concurrency limit.
*/
concurrency: number;

/**
Discard pending promises that are waiting to run.
Expand Down
27 changes: 23 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import Queue from 'yocto-queue';

export default function pLimit(concurrency) {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
validateConcurrency(concurrency);

const queue = new Queue();
let activeCount = 0;

const resumeNext = () => {
if (queue.size > 0) {
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
// Since `pendingCount` has been decreased by one, increase `activeCount` by one.
activeCount++;
Expand Down Expand Up @@ -72,7 +70,28 @@ export default function pLimit(concurrency) {
queue.clear();
},
},
concurrency: {
get: () => concurrency,

set(newConcurrency) {
validateConcurrency(newConcurrency);
concurrency = newConcurrency;

queueMicrotask(() => {
// eslint-disable-next-line no-unmodified-loop-condition
while (activeCount < concurrency && queue.size > 0) {
resumeNext();
}
});
},
},
});

return generator;
}

function validateConcurrency(concurrency) {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
}
4 changes: 4 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ This might be useful if you want to teardown the queue at the end of your progra

Note: This does not cancel promises that are already running.

### limit.concurrency

Get or set the concurrency limit.

## FAQ

### How is this different from the [`p-queue`](https://github.com/sindresorhus/p-queue) package?
Expand Down
40 changes: 40 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,43 @@ test('throws on invalid concurrency argument', t => {
pLimit(true);
});
});

test('change concurrency to smaller value', async t => {
const limit = pLimit(4);
let running = 0;
const log = [];
const promises = Array.from({length: 10}).map(() =>
limit(async () => {
++running;
log.push(running);
await delay(50);
--running;
}),
);
await delay(0);
t.is(running, 4);

limit.concurrency = 2;
await Promise.all(promises);
t.deepEqual(log, [1, 2, 3, 4, 2, 2, 2, 2, 2, 2]);
});

test('change concurrency to bigger value', async t => {
const limit = pLimit(2);
let running = 0;
const log = [];
const promises = Array.from({length: 10}).map(() =>
limit(async () => {
++running;
log.push(running);
await delay(50);
--running;
}),
);
await delay(0);
t.is(running, 2);

limit.concurrency = 4;
await Promise.all(promises);
t.deepEqual(log, [1, 2, 3, 4, 4, 4, 4, 4, 4, 4]);
});

0 comments on commit 3e4fdd1

Please sign in to comment.