Skip to content

Commit 164471f

Browse files
committed
stream: support flatMap
1 parent ebe253a commit 164471f

File tree

3 files changed

+172
-0
lines changed

3 files changed

+172
-0
lines changed

doc/api/stream.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2025,6 +2025,55 @@ console.log(anyBigFile); // `true` if all files in the list are bigger than 1MiB
20252025
console.log('done'); // Stream has finished
20262026
```
20272027

2028+
### `readable.flatMap(fn[, options])`
2029+
2030+
<!-- YAML
2031+
added: REPLACEME
2032+
-->
2033+
2034+
> Stability: 1 - Experimental
2035+
2036+
* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
2037+
every item in the stream.
2038+
* `data` {any} a chunk of data from the stream.
2039+
* `options` {Object}
2040+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2041+
abort the `fn` call early.
2042+
* `options` {Object}
2043+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
2044+
on the stream at once. **Default:** `1`.
2045+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2046+
aborted.
2047+
* Returns: {Readable} a stream flat-mapped with the function `fn`.
2048+
2049+
This method returns a new stream by applying the given callback to each
2050+
chunk of the stream and then flattening the result.
2051+
2052+
It is possible to return a stream or another iterable or async iterable from
2053+
`fn` and the result streams will be merged (flattened) into the returned
2054+
stream.
2055+
2056+
```mjs
2057+
import { Readable } from 'stream';
2058+
import { createReadStream } from 'fs';
2059+
2060+
// With a synchronous mapper.
2061+
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2062+
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
2063+
}
2064+
// With an asynchronous mapper, combine the contents of 4 files
2065+
const concatResult = Readable.from([
2066+
'./1.mjs',
2067+
'./2.mjs',
2068+
'./3.mjs',
2069+
'./4.mjs',
2070+
]).flatMap((fileName) => createReadStream(fileName));
2071+
for await (const result of allFileContents) {
2072+
// This will contain the contents (all chunks) of all 4 files
2073+
console.log(result);
2074+
}
2075+
```
2076+
20282077
### Duplex and transform streams
20292078

20302079
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,16 @@ async function toArray(options) {
222222
}
223223
return result;
224224
}
225+
226+
async function* flatMap(fn, options) {
227+
for await (const val of this.map(fn, options)) {
228+
yield* val;
229+
}
230+
}
231+
225232
module.exports.streamReturningOperators = {
226233
filter,
234+
flatMap,
227235
map,
228236
};
229237

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const fixtures = require('../common/fixtures');
5+
const {
6+
Readable,
7+
} = require('stream');
8+
const assert = require('assert');
9+
const { setTimeout } = require('timers/promises');
10+
const { createReadStream } = require('fs');
11+
12+
function oneTo5() {
13+
return Readable.from([1, 2, 3, 4, 5]);
14+
}
15+
16+
{
17+
// flatMap works on synchronous streams with a synchronous mapper
18+
(async () => {
19+
assert.deepStrictEqual(
20+
await oneTo5().flatMap((x) => [x + x]).toArray(),
21+
[2, 4, 6, 8, 10]
22+
);
23+
assert.deepStrictEqual(
24+
await oneTo5().flatMap(() => []).toArray(),
25+
[]
26+
);
27+
assert.deepStrictEqual(
28+
await oneTo5().flatMap((x) => [x, x]).toArray(),
29+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
30+
);
31+
})().then(common.mustCall());
32+
}
33+
34+
35+
{
36+
// flatMap works on sync/async streams with an asynchronous mapper
37+
(async () => {
38+
assert.deepStrictEqual(
39+
await oneTo5().flatMap(async (x) => [x, x]).toArray(),
40+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
41+
);
42+
const asyncOneTo5 = oneTo5().map(async (x) => x);
43+
assert.deepStrictEqual(
44+
await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(),
45+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
46+
);
47+
})().then(common.mustCall());
48+
}
49+
{
50+
// flatMap works on a stream where mapping returns a stream
51+
(async () => {
52+
const result = await oneTo5().flatMap(async (x) => {
53+
return Readable.from([x, x]);
54+
}).toArray();
55+
assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
56+
})().then(common.mustCall());
57+
// flatMap works on an objectMode stream where mappign returns a stream
58+
(async () => {
59+
const result = await oneTo5().flatMap(() => {
60+
return createReadStream(fixtures.path('x.txt'));
61+
}).toArray();
62+
// The resultant stream is in object mode so toArray shouldn't flatten
63+
assert.strictEqual(result.length, 5);
64+
assert.deepStrictEqual(
65+
Buffer.concat(result).toString(),
66+
'xyz\n'.repeat(5)
67+
);
68+
69+
})().then(common.mustCall());
70+
71+
}
72+
73+
{
74+
// Concurrency + AbortSignal
75+
const ac = new AbortController();
76+
const stream = oneTo5().flatMap(common.mustCall(async (_, { signal }) => {
77+
await setTimeout(100, { signal });
78+
}, 2), { signal: ac.signal, concurrency: 2 });
79+
// pump
80+
assert.rejects(async () => {
81+
for await (const item of stream) {
82+
// nope
83+
console.log(item);
84+
}
85+
}, {
86+
name: 'AbortError',
87+
}).then(common.mustCall());
88+
89+
setImmediate(() => {
90+
ac.abort();
91+
});
92+
}
93+
94+
{
95+
// Error cases
96+
assert.rejects(async () => {
97+
// eslint-disable-next-line no-unused-vars
98+
for await (const unused of Readable.from([1]).flatMap(1));
99+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
100+
assert.rejects(async () => {
101+
// eslint-disable-next-line no-unused-vars
102+
for await (const _ of Readable.from([1]).flatMap((x) => x, {
103+
concurrency: 'Foo'
104+
}));
105+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
106+
assert.rejects(async () => {
107+
// eslint-disable-next-line no-unused-vars
108+
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
109+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
110+
}
111+
{
112+
// Test result is a Readable
113+
const stream = oneTo5().flatMap((x) => x);
114+
assert.strictEqual(stream.readable, true);
115+
}

0 commit comments

Comments
 (0)