Skip to content

Commit

Permalink
stream: added experimental support for for-await
Browse files Browse the repository at this point in the history
Adds support for Symbol.asyncIterator into the Readable class.
The stream is destroyed when the loop terminates with break or throw.

Fixes: #15709

PR-URL: #17755
Fixes: #15709
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Vse Mozhet Byt <vsemozhetbyt@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
  • Loading branch information
mcollina committed Jan 11, 2018
1 parent 4d96c17 commit 61b4d60
Show file tree
Hide file tree
Showing 5 changed files with 492 additions and 0 deletions.
26 changes: 26 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,31 @@ readable stream will release any internal resources.
Implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].

##### readable[@@asyncIterator]
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
Returns an [AsyncIterator][async-iterator] to fully consume the stream.

```js
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const k of readable) {
data += k;
}
console.log(data);
}

print(fs.createReadStream('file')).catch(console.log);
```

If the loop terminates with a `break` or a `throw`, the stream will be destroyed.
In other terms, iterating over a stream will consume the stream fully.

### Duplex and Transform Streams

#### Class: stream.Duplex
Expand Down Expand Up @@ -2328,3 +2353,4 @@ contain multi-byte characters.
[readable-destroy]: #stream_readable_destroy_error
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[async-iterator]: https://github.com/tc39/proposal-async-iteration
8 changes: 8 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const debug = util.debuglog('stream');
const BufferList = require('internal/streams/BufferList');
const destroyImpl = require('internal/streams/destroy');
const errors = require('internal/errors');
const ReadableAsyncIterator = require('internal/streams/async_iterator');
const { emitExperimentalWarning } = require('internal/util');
var StringDecoder;

util.inherits(Readable, Stream);
Expand Down Expand Up @@ -922,6 +924,12 @@ Readable.prototype.wrap = function(stream) {
return this;
};

Readable.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('Readable[Symbol.asyncIterator]');

return new ReadableAsyncIterator(this);
};

Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
159 changes: 159 additions & 0 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
'use strict';

const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

const AsyncIteratorRecord = class AsyncIteratorRecord {
constructor(value, done) {
this.done = done;
this.value = value;
}
};

function readAndResolve(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
const data = iter[kStream].read();
// we defer if data is null
// we can be expecting either 'end' or
// 'error'
if (data !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
}
}
}

function onReadable(iter) {
// we wait for the next tick, because it might
// emit an error with process.nextTick
process.nextTick(readAndResolve, iter);
}

function onEnd(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(null, true));
}
iter[kEnded] = true;
}

function onError(iter, err) {
const reject = iter[kLastReject];
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
reject(err);
}
iter.error = err;

This comment has been minimized.

Copy link
@julien-f

julien-f Apr 26, 2018

Contributor

Shoudn't it be iter[kError]?

This comment has been minimized.

Copy link
@mcollina

mcollina Apr 26, 2018

Author Member

Very likely! Would you like to send a PR?

This comment has been minimized.

Copy link
@julien-f

julien-f Apr 26, 2018

Contributor

No problem :)

}

function wrapForNext(lastPromise, iter) {
return function(resolve, reject) {
lastPromise.then(function() {
iter[kHandlePromise](resolve, reject);
}, reject);
};
}

const ReadableAsyncIterator = class ReadableAsyncIterator {
constructor(stream) {
this[kStream] = stream;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;

stream.on('readable', onReadable.bind(null, this));
stream.on('end', onEnd.bind(null, this));
stream.on('error', onError.bind(null, this));

// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kStream].read();
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
}
};
}

get stream() {
return this[kStream];
}

next() {
// if we have detected an error in the meanwhile
// reject straight away
const error = this[kError];
if (error !== null) {
return Promise.reject(error);
}

if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
}

// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
// where next() is only called once at a time
const lastPromise = this[kLastPromise];
let promise;

if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// fast path needed to support multiple this.push()
// without triggering the next() queue
const data = this[kStream].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
}

promise = new Promise(this[kHandlePromise]);
}

this[kLastPromise] = promise;

return promise;
}

return() {
// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise((resolve, reject) => {
this[kStream].destroy(null, (err) => {
if (err) {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
});
});
}
};

module.exports = ReadableAsyncIterator;
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
'lib/internal/v8_prof_polyfill.js',
'lib/internal/v8_prof_processor.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/BufferList.js',
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
Expand Down
Loading

0 comments on commit 61b4d60

Please sign in to comment.