Skip to content

Commit

Permalink
stream: add iterator helper find
Browse files Browse the repository at this point in the history
Continue iterator-helpers work by adding `find` to readable streams.

PR-URL: nodejs/node#41849
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
Linkgoron authored and guangwong committed Oct 10, 2022
1 parent 4186b8b commit f2a96b5
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 153 deletions.
89 changes: 70 additions & 19 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,8 @@ added: v16.14.0

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
* `fn` {Function|AsyncFunction} a function to map over every chunk in the
stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
Expand All @@ -1709,16 +1710,16 @@ added: v16.14.0
* Returns: {Readable} a stream mapped with the function `fn`.

This method allows mapping over the stream. The `fn` function will be called
for every item in the stream. If the `fn` function returns a promise - that
for every chunk in the stream. If the `fn` function returns a promise - that
promise will be `await`ed before being passed to the result stream.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(item); // 2, 4, 6, 8
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
Expand All @@ -1740,7 +1741,7 @@ added: v16.14.0

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to filter items from stream.
* `fn` {Function|AsyncFunction} a function to filter chunks from the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
Expand All @@ -1752,8 +1753,8 @@ added: v16.14.0
aborted.
* Returns: {Readable} a stream filtered with the predicate `fn`.

This method allows filtering the stream. For each item in the stream the `fn`
function will be called and if it returns a truthy value, the item will be
This method allows filtering the stream. For each chunk in the stream the `fn`
function will be called and if it returns a truthy value, the chunk will be
passed to the result stream. If the `fn` function returns a promise - that
promise will be `await`ed.

Expand All @@ -1762,8 +1763,8 @@ import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous predicate.
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(item); // 3, 4
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
Expand All @@ -1789,7 +1790,7 @@ added: v16.15.0

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
Expand All @@ -1801,12 +1802,12 @@ added: v16.15.0
aborted.
* Returns: {Promise} a promise for when the stream has finished.

This method allows iterating a stream. For each item in the stream the
This method allows iterating a stream. For each chunk in the stream the
`fn` function will be called. If the `fn` function returns a promise - that
promise will be `await`ed.

This method is different from `for await...of` loops in that it can optionally
process items concurrently. In addition, a `forEach` iteration can only be
process chunks concurrently. In addition, a `forEach` iteration can only be
stopped by having passed a `signal` option and aborting the related
`AbortController` while `for await...of` can be stopped with `break` or
`return`. In either case the stream will be destroyed.
Expand All @@ -1820,8 +1821,8 @@ import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous predicate.
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(item); // 3, 4
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
Expand Down Expand Up @@ -1886,7 +1887,7 @@ added: v16.15.0

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
Expand Down Expand Up @@ -1927,6 +1928,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
```

##### `readable.find(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise evaluating to the first chunk for which `fn`
evaluated with a truthy value, or `undefined` if no element was found.

This method is similar to `Array.prototype.find` and calls `fn` on each chunk
in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's
awaited return value is truthy, the stream is destroyed and the promise is
fulfilled with value for which `fn` returned a truthy value. If all of the
`fn` calls on the chunks return a falsy value, the promise is fulfilled with
`undefined`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined

// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stat.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
```

##### `readable.every(fn[, options])`

<!-- YAML
Expand All @@ -1935,7 +1986,7 @@ added: v16.15.0

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
Expand Down Expand Up @@ -1985,7 +2036,7 @@ added: v16.15.0
> Stability: 1 - Experimental
* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
every item in the stream.
every chunk in the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
Expand All @@ -2009,8 +2060,8 @@ import { Readable } from 'stream';
import { createReadStream } from 'fs';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
Expand Down
42 changes: 14 additions & 28 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,31 +186,9 @@ function asIndexedPairs(options = undefined) {
}

async function some(fn, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
if (options?.signal) {
if (options.signal.aborted) {
ac.abort();
}
options.signal.addEventListener('abort', () => ac.abort(), {
[kWeakHandler]: this,
once: true,
});
}
const mapped = this.map(fn, { ...options, signal: ac.signal });
for await (const result of mapped) {
if (result) {
ac.abort();
return true;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of filter.call(this, fn, options)) {
return true;
}
return false;
}
Expand All @@ -226,6 +204,13 @@ async function every(fn, options) {
}, options));
}

async function find(fn, options) {
for await (const result of filter.call(this, fn, options)) {
return result;
}
return undefined;
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand All @@ -236,7 +221,7 @@ async function forEach(fn, options) {
return kEmpty;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of this.map(forEachFn, options));
for await (const unused of map.call(this, forEachFn, options));
}

function filter(fn, options) {
Expand All @@ -250,7 +235,7 @@ function filter(fn, options) {
}
return kEmpty;
}
return this.map(filterFn, options);
return map.call(this, filterFn, options);
}

// Specific to provide better error to reduce since the argument is only
Expand Down Expand Up @@ -329,7 +314,7 @@ async function toArray(options) {
}

function flatMap(fn, options) {
const values = this.map(fn, options);
const values = map.call(this, fn, options);
return async function* flatMap() {
for await (const val of values) {
yield* val;
Expand Down Expand Up @@ -415,4 +400,5 @@ module.exports.promiseReturningOperators = {
reduce,
toArray,
some,
find,
};
8 changes: 8 additions & 0 deletions test/parallel/test-stream-filter.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,11 @@ const { setTimeout } = require('timers/promises');
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
assert.strictEqual(stream.readable, true);
}
{
const stream = Readable.from([1, 2, 3, 4, 5]);
Object.defineProperty(stream, 'map', {
value: common.mustNotCall(() => {}),
});
// Check that map isn't getting called.
stream.filter(() => true);
}
8 changes: 8 additions & 0 deletions test/parallel/test-stream-flatMap.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,11 @@ function oneTo5() {
const stream = oneTo5().flatMap((x) => x);
assert.strictEqual(stream.readable, true);
}
{
const stream = oneTo5();
Object.defineProperty(stream, 'map', {
value: common.mustNotCall(() => {}),
});
// Check that map isn't getting called.
stream.flatMap(() => true);
}
8 changes: 8 additions & 0 deletions test/parallel/test-stream-forEach.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,11 @@ const { once } = require('events');
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
assert.strictEqual(typeof stream.then, 'function');
}
{
const stream = Readable.from([1, 2, 3, 4, 5]);
Object.defineProperty(stream, 'map', {
value: common.mustNotCall(() => {}),
});
// Check that map isn't getting called.
stream.forEach(() => true);
}
Loading

0 comments on commit f2a96b5

Please sign in to comment.