Skip to content

Commit

Permalink
stream: implement Readable.from async iterator utility
Browse files Browse the repository at this point in the history
PR-URL: #27660
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
guybedford authored and BethGriggs committed Oct 17, 2019
1 parent 333963e commit ddb5152
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 3 deletions.
113 changes: 111 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ There are four fundamental stream types within Node.js:
* [`Transform`][] - `Duplex` streams that can modify or transform the data as it
is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions [pipeline][] and
[finished][].
Additionally, this module includes the utility functions [pipeline][],
[finished][] and [Readable.from][].

### Object Mode

Expand Down Expand Up @@ -1445,6 +1445,31 @@ async function run() {
run().catch(console.error);
```

### Readable.from(iterable, [options])

* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
`Symbol.iterator` iterable protocol.
* `options` {Object} Options provided to `new stream.Readable([options])`.
By default, `Readable.from()` will set `options.objectMode` to `true`, unless
this is explicitly opted out by setting `options.objectMode` to `false`.

A utility method for creating Readable Streams out of iterators.

```js
const { Readable } = require('stream');

async function * generate() {
yield 'hello';
yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
console.log(chunk);
});
```

## API for Stream Implementers

<!--type=misc-->
Expand Down Expand Up @@ -2368,6 +2393,89 @@ primarily for examples and testing, but there are some use cases where

<!--type=misc-->

### Streams Compatibility with Async Generators and Async Iterators

With the support of async generators and iterators in JavaScript, async
generators are effectively a first-class language-level stream construct at
this point.

Some common interop cases of using Node.js streams with async generators
and async iterators are provided below.

#### Consuming Readable Streams with Async Iterators

```js
(async function() {
for await (const chunk of readable) {
console.log(chunk);
}
})();
```

#### Creating Readable Streams with Async Generators

We can construct a Node.js Readable Stream from an asynchronous generator
using the `Readable.from` utility method:

```js
const { Readable } = require('stream');

async function * generate() {
yield 'a';
yield 'b';
yield 'c';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
console.log(chunk);
});
```

#### Piping to Writable Streams from Async Iterators

In the scenario of writing to a writeable stream from an async iterator,
it is important to ensure the correct handling of backpressure and errors.

```js
const { once } = require('events');

const writeable = fs.createWriteStream('./file');

(async function() {
for await (const chunk of iterator) {
// Handle backpressure on write
if (!writeable.write(value))
await once(writeable, 'drain');
}
writeable.end();
// Ensure completion without errors
await once(writeable, 'finish');
})();
```

In the above, errors on the write stream would be caught and thrown by the two
`once` listeners, since `once` will also handle `'error'` events.

Alternatively the readable stream could be wrapped with `Readable.from` and
then piped via `.pipe`:

```js
const { once } = require('events');

const writeable = fs.createWriteStream('./file');

(async function() {
const readable = Readable.from(iterator);
readable.pipe(writeable);
// Ensure completion without errors
await once(writeable, 'finish');
})();
```

<!--type=misc-->

### Compatibility with Older Node.js Versions

<!--type=misc-->
Expand Down Expand Up @@ -2504,6 +2612,7 @@ contain multi-byte characters.
[Compatibility]: #stream_compatibility_with_older_node_js_versions
[HTTP requests, on the client]: http.html#http_class_http_clientrequest
[HTTP responses, on the server]: http.html#http_class_http_serverresponse
[Readable.from]: #readable.from
[TCP sockets]: net.html#net_class_net_socket
[child process stdin]: child_process.html#child_process_subprocess_stdin
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout
Expand Down
39 changes: 39 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1154,3 +1154,42 @@ function endReadableNT(state, stream) {
}
}
}

Readable.from = function(iterable, opts) {
let iterator;
if (iterable && iterable[Symbol.asyncIterator])
iterator = iterable[Symbol.asyncIterator]();
else if (iterable && iterable[Symbol.iterator])
iterator = iterable[Symbol.iterator]();
else
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);

const readable = new Readable({
objectMode: true,
...opts
});
// Reading boolean to protect against _read
// being called before last iteration completion.
let reading = false;
readable._read = function() {
if (!reading) {
reading = true;
next();
}
};
async function next() {
try {
const { value, done } = await iterator.next();
if (done) {
readable.push(null);
} else if (readable.push(await value)) {
next();
} else {
reading = false;
}
} catch (err) {
readable.destroy(err);
}
}
return readable;
};
2 changes: 1 addition & 1 deletion test/parallel/test-events-once.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ Promise.all([
catchesErrors(),
stopListeningAfterCatchingError(),
onceError()
]);
]).then(common.mustCall());
163 changes: 163 additions & 0 deletions test/parallel/test-readable-from.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
'use strict';

const { mustCall } = require('../common');
const { once } = require('events');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function toReadableBasicSupport() {
async function * generate() {
yield 'a';
yield 'b';
yield 'c';
}

const stream = Readable.from(generate());

const expected = ['a', 'b', 'c'];

for await (const chunk of stream) {
strictEqual(chunk, expected.shift());
}
}

async function toReadableSyncIterator() {
function * generate() {
yield 'a';
yield 'b';
yield 'c';
}

const stream = Readable.from(generate());

const expected = ['a', 'b', 'c'];

for await (const chunk of stream) {
strictEqual(chunk, expected.shift());
}
}

async function toReadablePromises() {
const promises = [
Promise.resolve('a'),
Promise.resolve('b'),
Promise.resolve('c')
];

const stream = Readable.from(promises);

const expected = ['a', 'b', 'c'];

for await (const chunk of stream) {
strictEqual(chunk, expected.shift());
}
}

async function toReadableString() {
const stream = Readable.from('abc');

const expected = ['a', 'b', 'c'];

for await (const chunk of stream) {
strictEqual(chunk, expected.shift());
}
}

async function toReadableOnData() {
async function * generate() {
yield 'a';
yield 'b';
yield 'c';
}

const stream = Readable.from(generate());

let iterations = 0;
const expected = ['a', 'b', 'c'];

stream.on('data', (chunk) => {
iterations++;
strictEqual(chunk, expected.shift());
});

await once(stream, 'end');

strictEqual(iterations, 3);
}

async function toReadableOnDataNonObject() {
async function * generate() {
yield 'a';
yield 'b';
yield 'c';
}

const stream = Readable.from(generate(), { objectMode: false });

let iterations = 0;
const expected = ['a', 'b', 'c'];

stream.on('data', (chunk) => {
iterations++;
strictEqual(chunk instanceof Buffer, true);
strictEqual(chunk.toString(), expected.shift());
});

await once(stream, 'end');

strictEqual(iterations, 3);
}

async function destroysTheStreamWhenThrowing() {
async function * generate() {
throw new Error('kaboom');
}

const stream = Readable.from(generate());

stream.read();

try {
await once(stream, 'error');
} catch (err) {
strictEqual(err.message, 'kaboom');
strictEqual(stream.destroyed, true);
}
}

async function asTransformStream() {
async function * generate(stream) {
for await (const chunk of stream) {
yield chunk.toUpperCase();
}
}

const source = new Readable({
objectMode: true,
read() {
this.push('a');
this.push('b');
this.push('c');
this.push(null);
}
});

const stream = Readable.from(generate(source));

const expected = ['A', 'B', 'C'];

for await (const chunk of stream) {
strictEqual(chunk, expected.shift());
}
}

Promise.all([
toReadableBasicSupport(),
toReadableSyncIterator(),
toReadablePromises(),
toReadableString(),
toReadableOnData(),
toReadableOnDataNonObject(),
destroysTheStreamWhenThrowing(),
asTransformStream()
]).then(mustCall());

0 comments on commit ddb5152

Please sign in to comment.