Skip to content

Commit

Permalink
Version 2.0. Use hooks instead of killer function.
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigogs committed Mar 24, 2021
1 parent 3203835 commit f401ec1
Show file tree
Hide file tree
Showing 4 changed files with 2,020 additions and 1,302 deletions.
32 changes: 26 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,31 @@
* If a `false` value is strictly returned from the processor function, its very `thread` dies.
* The poller is able to resolve promises, so it can be an async function.
* @param {Number} options.concurrency Number of parallel processors running simultaneously.
* @param {Function} [options.killer=undefined] An optional killer function to interrupt the poller
* from an outside scope.
* This function will be called in every iteration and once it returns `true` the `threads`
* will begin to stop after finishing the current processor. This can be an async function.
* @param {Object} [options.hooks] Flow hooks object.
* @param {Function} [options.hooks.beforeProcessor] An optional beforeProcessor hook function
* to intercept the poller from an outside scope.
* This function will be called in every iteration before executing the processor function,
* and it receives the current generator value as argument.
* Once it returns explicitly `false` the `threads` will begin to stop after finishing the current iteration.
* This can be an async function.
* @param {Function} [options.hooks.afterProcessor] An optional afterProcessor hook function
* to intercept the poller from an outside scope.
* This function will be called in every iteration after executing the processor function,
* and it receives the current generator value and the result value from the processor.
* Once it returns explicitly `false` the `threads` will begin to stop after finishing the current iteration.
* This can be an async function.
*
* @returns {Promise<void>}
*/
module.exports = async ({ generator, processor, concurrency, killer }) => {
module.exports = async ({
generator,
processor,
concurrency,
hooks: {
beforeProcessor = () => {},
afterProcessor = () => {},
} = {},
}) => {
const queue = Array(concurrency).fill(null);

let stop = false;
Expand All @@ -47,8 +65,10 @@ module.exports = async ({ generator, processor, concurrency, killer }) => {
do {
({ value, done } = await generator.next());
if (done) return;
if (await beforeProcessor(value) === false) { stop = true; continue; }
result = await processor(value);
if (result === false || killer && await killer()) stop = true;
if (result === false) { stop = true; continue; }
if (await afterProcessor(value, result) === false) { stop = true; }
} while (!stop);
};

Expand Down
113 changes: 109 additions & 4 deletions index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('promisePool', () => {
expect(values).toEqual(expect.arrayContaining(expected));
}, 10000);

it('Should kill the poller using the killer function', async () => {
it('Should receive current generator value from the beforeProcessor hook', async () => {
const values = [];
const expected = Array(1000).fill(1);

Expand All @@ -70,20 +70,125 @@ describe('promisePool', () => {
values.push(value);
}

let index = 0;
await promisePool({
generator: generatorFn(10, 100),
processor,
concurrency: 10,
killer: () => {
if (values.length > 500) {
return true;
hooks: {
beforeProcessor(value) {
expect(value).toEqual(values[index])
index++;
}
},
});

expect(values).not.toEqual(expect.arrayContaining(expected));
}, 10000);

it('Should receive processor result from the afterProcessor hook', async () => {
const values = [];
const expected = Array(1000).fill(1);

async function asyncCall(value) {
return new Promise(resolve => setTimeout(resolve, (10 + value)));
}

async function* generatorFn(startNumber, endNumber) {
for (let number = startNumber; number <= endNumber; number += 1) {
yield await asyncCall(number);
}
}

function processor(value) {
values.push(value);
}

let index = 0;
await promisePool({
generator: generatorFn(10, 100),
processor,
concurrency: 10,
hooks: {
afterProcessor(value, result) {
expect(value).toEqual(values[index])
expect(result).toEqual(undefined)
index++;
}
},
});

expect(values).not.toEqual(expect.arrayContaining(expected));
}, 10000);

it('Should halt the poller returning a `false` value from the beforeProcessor hook', async () => {
const values = [];
const expected = Array(11).fill(undefined);

async function asyncCall(value) {
return new Promise(resolve => setTimeout(resolve, (1 + value)));
}

async function* generatorFn(startNumber, endNumber) {
for (let number = startNumber; number <= endNumber; number += 1) {
yield await asyncCall(number);
}
}

function processor(value) {
values.push(value);
}

await promisePool({
generator: generatorFn(10, 100),
processor,
concurrency: 10,
hooks: {
beforeProcessor() {
if (values.length > 10) {
return false;
}
}
},
});

expect(values).toEqual(expect.arrayContaining(expected));
}, 10000);

it('Should halt the poller returning a `false` value from the afterProcessor hook', async () => {
const values = [];
const expected = Array(11).fill(undefined);

async function asyncCall(value) {
return new Promise(resolve => setTimeout(resolve, (10 + value)));
}

async function* generatorFn(startNumber, endNumber) {
for (let number = startNumber; number <= endNumber; number += 1) {
yield await asyncCall(number);
}
}

function processor(value) {
values.push(value);
}

await promisePool({
generator: generatorFn(10, 100),
processor,
concurrency: 10,
hooks: {
afterProcessor() {
if (values.length > 10) {
return false;
}
}
},
});

expect(values).toEqual(expect.arrayContaining(expected));
}, 10000);

it('Should terminate the threads by returning false in the processor', async () => {
const values = [];
const expected = Array(500).fill(0).map((v, i) => (v + i));
Expand Down
Loading

0 comments on commit f401ec1

Please sign in to comment.