
stream: add iterator helper find #41849

Merged 2 commits on Feb 7, 2022
89 changes: 70 additions & 19 deletions doc/api/stream.md
@@ -1745,7 +1745,8 @@ added: v17.4.0

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
* `fn` {Function|AsyncFunction} a function to map over every chunk in the
  stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1758,16 +1759,16 @@ added: v17.4.0
* Returns: {Readable} a stream mapped with the function `fn`.

This method allows mapping over the stream. The `fn` function will be called
for every item in the stream. If the `fn` function returns a promise - that
for every chunk in the stream. If the `fn` function returns a promise - that
promise will be `await`ed before being passed to the result stream.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(item); // 2, 4, 6, 8
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
@@ -1789,7 +1790,7 @@ added: v17.4.0

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to filter items from stream.
* `fn` {Function|AsyncFunction} a function to filter chunks from the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1801,8 +1802,8 @@ added: v17.4.0
aborted.
* Returns: {Readable} a stream filtered with the predicate `fn`.

This method allows filtering the stream. For each item in the stream the `fn`
function will be called and if it returns a truthy value, the item will be
This method allows filtering the stream. For each chunk in the stream the `fn`
function will be called and if it returns a truthy value, the chunk will be
passed to the result stream. If the `fn` function returns a promise - that
promise will be `await`ed.

@@ -1811,8 +1812,8 @@ import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous predicate.
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(item); // 3, 4
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
@@ -1838,7 +1839,7 @@ added: REPLACEME

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1850,12 +1851,12 @@ added: REPLACEME
aborted.
* Returns: {Promise} a promise for when the stream has finished.

This method allows iterating a stream. For each item in the stream the
This method allows iterating a stream. For each chunk in the stream the
`fn` function will be called. If the `fn` function returns a promise - that
promise will be `await`ed.

This method is different from `for await...of` loops in that it can optionally
process items concurrently. In addition, a `forEach` iteration can only be
process chunks concurrently. In addition, a `forEach` iteration can only be
stopped by having passed a `signal` option and aborting the related
`AbortController` while `for await...of` can be stopped with `break` or
`return`. In either case the stream will be destroyed.
@@ -1869,8 +1870,8 @@ import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous function.
await Readable.from([1, 2, 3, 4]).forEach((chunk) => {
  console.log(chunk); // 1, 2, 3, 4
});
// With an asynchronous function, making at most 2 queries at a time.
const resolver = new Resolver();
@@ -1935,7 +1936,7 @@ added: REPLACEME

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1976,6 +1977,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
```

### `readable.find(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
      abort the `fn` call early.
* `options` {Object}
  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
    on the stream at once. **Default:** `1`.
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Promise} a promise evaluating to the first chunk for which `fn`
  evaluated with a truthy value, or `undefined` if no chunk was found.

> **Review thread on this line**
>
> **Contributor:** Is it "chunk" or "item"? Let's choose one and use that
> throughout the documentation.
>
> **Linkgoron (Member, Author), Feb 4, 2022:** Do you want me to change it
> for every/some as well?
>
> **Contributor:** Might as well! Fits in this PR because it already touches
> those.
>
> **Linkgoron:** Changed item to chunk in all places except in one unrelated
> place.

This method is similar to `Array.prototype.find` and calls `fn` on each chunk
in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's
awaited return value is truthy, the stream is destroyed and the promise is
fulfilled with the value for which `fn` returned a truthy value. If all of the
`fn` calls on the chunks return a falsy value, the promise is fulfilled with
`undefined`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined

// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
```

### `readable.every(fn[, options])`

<!-- YAML
@@ -1984,7 +2035,7 @@ added: REPLACEME

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -2034,7 +2085,7 @@ added: REPLACEME
> Stability: 1 - Experimental

* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
  every item in the stream.
  every chunk in the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -2058,8 +2109,8 @@ import { Readable } from 'stream';
import { createReadStream } from 'fs';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
42 changes: 14 additions & 28 deletions lib/internal/streams/operators.js
@@ -186,31 +186,9 @@ function asIndexedPairs(options = undefined) {
}

async function some(fn, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
if (options?.signal) {
if (options.signal.aborted) {
ac.abort();
}
options.signal.addEventListener('abort', () => ac.abort(), {
[kWeakHandler]: this,
once: true,
});
}
const mapped = this.map(fn, { ...options, signal: ac.signal });
for await (const result of mapped) {
if (result) {
ac.abort();
return true;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of filter.call(this, fn, options)) {
return true;
}
return false;
}
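The refactor above, where `some` (and the new `find`) reduce to a single `for await` over a filtered iterable, can be sketched in isolation. The following is a minimal standalone model using plain async-iterable helpers, not Node's internal operators; short-circuiting falls out of simply returning from the loop.

```javascript
'use strict';

// Minimal model of the filter-based short-circuiting used above: yield
// only the chunks for which the (possibly async) predicate is truthy.
async function* filter(iterable, fn) {
  for await (const chunk of iterable) {
    if (await fn(chunk)) yield chunk;
  }
}

// `some` only cares whether at least one chunk survives the filter.
async function some(iterable, fn) {
  // eslint-disable-next-line no-unused-vars
  for await (const unused of filter(iterable, fn)) {
    return true;
  }
  return false;
}

// `find` is the same loop, but resolves to the surviving chunk itself.
async function find(iterable, fn) {
  for await (const found of filter(iterable, fn)) {
    return found;
  }
  return undefined;
}
```

Returning out of a `for await...of` body invokes the iterator's `return()`, which is what closes the source; in the real operators that is what destroys the underlying stream once a match is found.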
@@ -226,6 +204,13 @@ async function every(fn, options) {
}, options));
}

async function find(fn, options) {
for await (const result of filter.call(this, fn, options)) {
return result;
}
return undefined;
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
@@ -236,7 +221,7 @@ return kEmpty;
return kEmpty;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of this.map(forEachFn, options));
for await (const unused of map.call(this, forEachFn, options));
}

function filter(fn, options) {
@@ -250,7 +235,7 @@ function filter(fn, options) {
}
return kEmpty;
}
return this.map(filterFn, options);
return map.call(this, filterFn, options);
}

// Specific to provide better error to reduce since the argument is only
@@ -329,7 +314,7 @@ async function toArray(options) {
}

function flatMap(fn, options) {
const values = this.map(fn, options);
const values = map.call(this, fn, options);
return async function* flatMap() {
for await (const val of values) {
yield* val;
@@ -415,4 +400,5 @@ module.exports.promiseReturningOperators = {
reduce,
toArray,
some,
find,
};
8 changes: 8 additions & 0 deletions test/parallel/test-stream-filter.js
@@ -98,3 +98,11 @@ const { setTimeout } = require('timers/promises');
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
assert.strictEqual(stream.readable, true);
}
{
const stream = Readable.from([1, 2, 3, 4, 5]);
Object.defineProperty(stream, 'map', {
value: common.mustNotCall(() => {}),
});
// Check that map isn't getting called.
stream.filter(() => true);
}
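This test guards the switch from `this.map(...)` to `map.call(this, ...)` inside the operators: with `Function.prototype.call`, a user-defined `map` property on the stream instance can no longer hijack `filter`'s internals. A generic sketch of that hazard follows, using illustrative names rather than Node internals.

```javascript
'use strict';

// Module-local helper, analogous to the internal `map` operator.
function map(fn) {
  return this.items.map(fn);
}

function makeCollection() {
  return {
    items: [1, 2, 3],
    map,
    // Fragile: dispatches through `this`, so an instance-level
    // `map` override is picked up.
    doubleNaive() { return this.map((x) => x * 2); },
    // Hardened: always invokes the module-local helper directly.
    doubleSafe() { return map.call(this, (x) => x * 2); },
  };
}

const collection = makeCollection();
// Simulate a hostile or accidental override, playing the role of the
// test's common.mustNotCall() sentinel.
Object.defineProperty(collection, 'map', {
  value: () => { throw new Error('instance map should not be called'); },
});
```

With the override in place, `doubleSafe()` still works while `doubleNaive()` throws, which is exactly the property the `mustNotCall` tests pin down.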
8 changes: 8 additions & 0 deletions test/parallel/test-stream-flatMap.js
@@ -121,3 +121,11 @@ function oneTo5() {
const stream = oneTo5().flatMap((x) => x);
assert.strictEqual(stream.readable, true);
}
{
const stream = oneTo5();
Object.defineProperty(stream, 'map', {
value: common.mustNotCall(() => {}),
});
// Check that map isn't getting called.
stream.flatMap(() => true);
}
8 changes: 8 additions & 0 deletions test/parallel/test-stream-forEach.js
@@ -84,3 +84,11 @@ const { setTimeout } = require('timers/promises');
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
assert.strictEqual(typeof stream.then, 'function');
}
{
const stream = Readable.from([1, 2, 3, 4, 5]);
Object.defineProperty(stream, 'map', {
value: common.mustNotCall(() => {}),
});
// Check that map isn't getting called.
stream.forEach(() => true);
}