diff --git a/index.js b/index.js index c7c9fba..11cf5f5 100644 --- a/index.js +++ b/index.js @@ -23,7 +23,7 @@ export default async function pMap( const result = []; const errors = []; - const skippedIndexes = []; + const skippedIndexesMap = new Map(); let isRejected = false; let isResolved = false; let isIterableDone = false; @@ -59,15 +59,28 @@ export default async function pMap( if (resolvingCount === 0 && !isResolved) { if (!stopOnError && errors.length > 0) { reject(new AggregateError(errors)); - } else { - isResolved = true; + return; + } - for (const skippedIndex of skippedIndexes) { - result.splice(skippedIndex, 1); - } + isResolved = true; + if (!skippedIndexesMap.size) { resolve(result); + return; } + + const pureResult = []; + + // Support multiple `pMapSkip`'s. + for (const [index, value] of result.entries()) { + if (skippedIndexesMap.get(index) === pMapSkip) { + continue; + } + + pureResult.push(value); + } + + resolve(pureResult); } return; @@ -86,12 +99,13 @@ export default async function pMap( const value = await mapper(element, index); + // Use Map to stage the index of the element. if (value === pMapSkip) { - skippedIndexes.push(index); - } else { - result[index] = value; + skippedIndexesMap.set(index, value); } + result[index] = value; + resolvingCount--; await next(); } catch (error) { diff --git a/test-multiple-pmapskips-performance.js b/test-multiple-pmapskips-performance.js new file mode 100644 index 0000000..204b768 --- /dev/null +++ b/test-multiple-pmapskips-performance.js @@ -0,0 +1,37 @@ +import test from 'ava'; +import inRange from 'in-range'; +import timeSpan from 'time-span'; +import pMap, {pMapSkip} from './index.js'; + +function generateSkipPerformanceData(length) { + const data = []; + for (let index = 0; index < length; index++) { + data.push(pMapSkip); + } + + return data; +} + +test('multiple pMapSkips - algorithmic complexity', async t => { + const testData = [generateSkipPerformanceData(1000), generateSkipPerformanceData(10000), generateSkipPerformanceData(100000)]; + const testDurationsMS = []; + + for (const data of testData) { + const end = timeSpan(); + // eslint-disable-next-line no-await-in-loop + await pMap(data, async value => value); + testDurationsMS.push(end()); + } + + for (let index = 0; index < testDurationsMS.length - 1; index++) { + // Time for 10x more items should take between 9x and 11x more time. + const smallerDuration = testDurationsMS[index]; + const longerDuration = testDurationsMS[index + 1]; + + // The longer test needs to be a little longer and also not 10x more than the + // shorter test. This is not perfect... there is some fluctuation. + // The idea here is to catch a regression that makes `pMapSkip` handling O(n^2) + // on the number of `pMapSkip` items in the input. + t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration})); + } +}); diff --git a/test.js b/test.js index 2120258..02a98d8 100644 --- a/test.js +++ b/test.js @@ -155,6 +155,28 @@ test('pMapSkip', async t => { ], async value => value), [1, 2]); }); +test('multiple pMapSkips', async t => { + t.deepEqual(await pMap([ + 1, + pMapSkip, + 2, + pMapSkip, + 3, + pMapSkip, + pMapSkip, + 4 + ], async value => value), [1, 2, 3, 4]); +}); + +test('all pMapSkips', async t => { + t.deepEqual(await pMap([ + pMapSkip, + pMapSkip, + pMapSkip, + pMapSkip + ], async value => value), []); +}); + test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { const input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = []; @@ -269,6 +291,28 @@ test('asyncIterator - pMapSkip', async t => { ]), async value => value), [1, 2]); }); +test('asyncIterator - multiple pMapSkips', async t => { + t.deepEqual(await pMap(new AsyncTestData([ + 1, + pMapSkip, + 2, + pMapSkip, + 3, + pMapSkip, + pMapSkip, + 4 + ]), async value => value), [1, 2, 3, 4]); +}); + +test('asyncIterator - all pMapSkips', async t => { + t.deepEqual(await pMap(new AsyncTestData([ + pMapSkip, + pMapSkip, + pMapSkip, + pMapSkip + ]), async value => value), []); +}); + test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { const input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = [];