-
Notifications
You must be signed in to change notification settings - Fork 30.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement the map method on readable stream. This starts the alignment with the tc39-iterator-helpers proposal and adds a `.map` method to every Node.js readable stream. Co-Authored-By: Robert Nagy <ronag@icloud.com> PR-URL: #40815 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
- Loading branch information
1 parent
2f12bb6
commit 32afd27
Showing
6 changed files
with
317 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
'use strict'; | ||
|
||
const { AbortController } = require('internal/abort_controller'); | ||
const { | ||
codes: { | ||
ERR_INVALID_ARG_TYPE, | ||
}, | ||
AbortError, | ||
} = require('internal/errors'); | ||
const { validateInteger } = require('internal/validators'); | ||
|
||
const { | ||
MathFloor, | ||
Promise, | ||
PromiseReject, | ||
PromisePrototypeCatch, | ||
Symbol, | ||
} = primordials; | ||
|
||
const kEmpty = Symbol('kEmpty'); | ||
const kEof = Symbol('kEof'); | ||
|
||
async function * map(fn, options) { | ||
if (typeof fn !== 'function') { | ||
throw new ERR_INVALID_ARG_TYPE( | ||
'fn', ['Function', 'AsyncFunction'], this); | ||
} | ||
|
||
if (options != null && typeof options !== 'object') { | ||
throw new ERR_INVALID_ARG_TYPE('options', ['Object']); | ||
} | ||
|
||
let concurrency = 1; | ||
if (options?.concurrency != null) { | ||
concurrency = MathFloor(options.concurrency); | ||
} | ||
|
||
validateInteger(concurrency, 'concurrency', 1); | ||
|
||
const ac = new AbortController(); | ||
const stream = this; | ||
const queue = []; | ||
const signal = ac.signal; | ||
const signalOpt = { signal }; | ||
|
||
const abort = () => ac.abort(); | ||
options?.signal?.addEventListener('abort', abort); | ||
|
||
let next; | ||
let resume; | ||
let done = false; | ||
|
||
function onDone() { | ||
done = true; | ||
} | ||
|
||
async function pump() { | ||
try { | ||
for await (let val of stream) { | ||
if (done) { | ||
return; | ||
} | ||
|
||
if (signal.aborted) { | ||
throw new AbortError(); | ||
} | ||
|
||
try { | ||
val = fn(val, signalOpt); | ||
} catch (err) { | ||
val = PromiseReject(err); | ||
} | ||
|
||
if (val === kEmpty) { | ||
continue; | ||
} | ||
|
||
if (typeof val?.catch === 'function') { | ||
val.catch(onDone); | ||
} | ||
|
||
queue.push(val); | ||
if (next) { | ||
next(); | ||
next = null; | ||
} | ||
|
||
if (!done && queue.length && queue.length >= concurrency) { | ||
await new Promise((resolve) => { | ||
resume = resolve; | ||
}); | ||
} | ||
} | ||
queue.push(kEof); | ||
} catch (err) { | ||
const val = PromiseReject(err); | ||
PromisePrototypeCatch(val, onDone); | ||
queue.push(val); | ||
} finally { | ||
done = true; | ||
if (next) { | ||
next(); | ||
next = null; | ||
} | ||
options?.signal?.removeEventListener('abort', abort); | ||
} | ||
} | ||
|
||
pump(); | ||
|
||
try { | ||
while (true) { | ||
while (queue.length > 0) { | ||
const val = await queue[0]; | ||
|
||
if (val === kEof) { | ||
return; | ||
} | ||
|
||
if (signal.aborted) { | ||
throw new AbortError(); | ||
} | ||
|
||
if (val !== kEmpty) { | ||
yield val; | ||
} | ||
|
||
queue.shift(); | ||
if (resume) { | ||
resume(); | ||
resume = null; | ||
} | ||
} | ||
|
||
await new Promise((resolve) => { | ||
next = resolve; | ||
}); | ||
} | ||
} finally { | ||
ac.abort(); | ||
|
||
done = true; | ||
if (resume) { | ||
resume(); | ||
resume = null; | ||
} | ||
} | ||
} | ||
|
||
module.exports = { | ||
map, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const { | ||
Readable, | ||
} = require('stream'); | ||
const assert = require('assert'); | ||
const { setTimeout } = require('timers/promises'); | ||
|
||
{ | ||
// Map works on synchronous streams with a synchronous mapper | ||
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); | ||
const result = [2, 4, 6, 8, 10]; | ||
(async () => { | ||
for await (const item of stream) { | ||
assert.strictEqual(item, result.shift()); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Map works on synchronous streams with an asynchronous mapper | ||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { | ||
await Promise.resolve(); | ||
return x + x; | ||
}); | ||
const result = [2, 4, 6, 8, 10]; | ||
(async () => { | ||
for await (const item of stream) { | ||
assert.strictEqual(item, result.shift()); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Map works on asynchronous streams with a asynchronous mapper | ||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { | ||
return x + x; | ||
}).map((x) => x + x); | ||
const result = [4, 8, 12, 16, 20]; | ||
(async () => { | ||
for await (const item of stream) { | ||
assert.strictEqual(item, result.shift()); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Concurrency + AbortSignal | ||
const ac = new AbortController(); | ||
let calls = 0; | ||
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => { | ||
calls++; | ||
await setTimeout(100, { signal }); | ||
}, { signal: ac.signal, concurrency: 2 }); | ||
// pump | ||
assert.rejects(async () => { | ||
for await (const item of stream) { | ||
// nope | ||
console.log(item); | ||
} | ||
}, { | ||
name: 'AbortError', | ||
}).then(common.mustCall()); | ||
|
||
setImmediate(() => { | ||
ac.abort(); | ||
assert.strictEqual(calls, 2); | ||
}); | ||
} | ||
|
||
{ | ||
// Concurrency result order | ||
const stream = Readable.from([1, 2]).map(async (item, { signal }) => { | ||
await setTimeout(10 - item, { signal }); | ||
return item; | ||
}, { concurrency: 2 }); | ||
|
||
(async () => { | ||
const expected = [1, 2]; | ||
for await (const item of stream) { | ||
assert.strictEqual(item, expected.shift()); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Error cases | ||
assert.rejects(async () => { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const unused of Readable.from([1]).map(1)); | ||
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); | ||
assert.rejects(async () => { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const _ of Readable.from([1]).map((x) => x, { | ||
concurrency: 'Foo' | ||
})); | ||
}, /ERR_OUT_OF_RANGE/).then(common.mustCall()); | ||
assert.rejects(async () => { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const _ of Readable.from([1]).map((x) => x, 1)); | ||
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); | ||
} | ||
{ | ||
// Test result is a Readable | ||
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x); | ||
assert.strictEqual(stream.readable, true); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters