From 2b77b94c05d22b945644984dfe84aa66f59ba63a Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sun, 23 Sep 2018 15:10:12 -0500 Subject: [PATCH] streams: refactor ReadableStream asyncIterator creation and a few fixes Closes: https://github.com/nodejs/node/issues/23041 - Rewrite `ReadableAsyncIterator` class into `ReadableStreamAsyncIteratorPrototype` which contains no constructor and inherits from `%AsyncIteratorPrototype%`. - Rewrite `AsyncIteratorRecord` into dumb function. PR-URL: https://github.com/nodejs/node/pull/23042 Fixes: https://github.com/nodejs/node/issues/23041 Reviewed-By: James M Snell Reviewed-By: Anatoli Papirovski Reviewed-By: Matteo Collina Reviewed-By: Ruben Bridgewater --- lib/_stream_readable.js | 10 +- lib/internal/streams/async_iterator.js | 94 ++++++++++--------- .../test-stream-readable-async-iterators.js | 22 +++++ 3 files changed, 78 insertions(+), 48 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 2b9600f0fd6647..488d10a10b5bbd 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -42,7 +42,7 @@ const { emitExperimentalWarning } = require('internal/util'); // Lazy loaded to improve the startup performance. let StringDecoder; -let ReadableAsyncIterator; +let createReadableStreamAsyncIterator; util.inherits(Readable, Stream); @@ -990,9 +990,11 @@ Readable.prototype.wrap = function(stream) { Readable.prototype[Symbol.asyncIterator] = function() { emitExperimentalWarning('Readable[Symbol.asyncIterator]'); - if (ReadableAsyncIterator === undefined) - ReadableAsyncIterator = require('internal/streams/async_iterator'); - return new ReadableAsyncIterator(this); + if (createReadableStreamAsyncIterator === undefined) { + createReadableStreamAsyncIterator = + require('internal/streams/async_iterator'); + } + return createReadableStreamAsyncIterator(this); }; Object.defineProperty(Readable.prototype, 'readableHighWaterMark', { diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 0e34573d877aee..91c473ee9d29c5 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -8,12 +8,9 @@ const kLastPromise = Symbol('lastPromise'); const kHandlePromise = Symbol('handlePromise'); const kStream = Symbol('stream'); -const AsyncIteratorRecord = class AsyncIteratorRecord { - constructor(value, done) { - this.done = done; - this.value = value; - } -}; +function createIterResult(value, done) { + return { value, done }; +} function readAndResolve(iter) { const resolve = iter[kLastResolve]; @@ -26,7 +23,7 @@ function readAndResolve(iter) { iter[kLastPromise] = null; iter[kLastResolve] = null; iter[kLastReject] = null; - resolve(new AsyncIteratorRecord(data, false)); + resolve(createIterResult(data, false)); } } } @@ -43,7 +40,7 @@ function onEnd(iter) { iter[kLastPromise] = null; iter[kLastResolve] = null; iter[kLastReject] = null; - resolve(new AsyncIteratorRecord(null, true)); + resolve(createIterResult(null, true)); } iter[kEnded] = true; } @@ -69,39 +66,13 @@ function wrapForNext(lastPromise, iter) { }; } -const ReadableAsyncIterator = class ReadableAsyncIterator { - constructor(stream) { - this[kStream] = stream; - this[kLastResolve] = null; - this[kLastReject] = null; - this[kError] = null; - this[kEnded] = false; - this[kLastPromise] = null; - - stream.on('readable', onReadable.bind(null, this)); - stream.on('end', onEnd.bind(null, this)); - stream.on('error', onError.bind(null, this)); - - // the function passed to new Promise - // is cached so we avoid allocating a new - // closure at every run - this[kHandlePromise] = (resolve, reject) => { - const data = this[kStream].read(); - if (data) { - this[kLastPromise] = null; - this[kLastResolve] = null; - this[kLastReject] = null; - resolve(new AsyncIteratorRecord(data, false)); - } else { - this[kLastResolve] = resolve; - this[kLastReject] = reject; - } - }; - } +const AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(async function* () {}).prototype); +const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ get stream() { return this[kStream]; - } + }, next() { // if we have detected an error in the meanwhile @@ -112,7 +83,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { } if (this[kEnded]) { - return Promise.resolve(new AsyncIteratorRecord(null, true)); + return Promise.resolve(createIterResult(null, true)); } // if we have multiple next() calls @@ -129,7 +100,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { // without triggering the next() queue const data = this[kStream].read(); if (data !== null) { - return Promise.resolve(new AsyncIteratorRecord(data, false)); + return Promise.resolve(createIterResult(data, false)); } promise = new Promise(this[kHandlePromise]); @@ -138,7 +109,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { this[kLastPromise] = promise; return promise; - } + }, return() { // destroy(err, cb) is a private API @@ -150,10 +121,45 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { reject(err); return; } - resolve(new AsyncIteratorRecord(null, true)); + resolve(createIterResult(null, true)); }); }); - } + }, +}, AsyncIteratorPrototype); + +const createReadableStreamAsyncIterator = (stream) => { + const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, { + [kStream]: { value: stream, writable: true }, + [kLastResolve]: { value: null, writable: true }, + [kLastReject]: { value: null, writable: true }, + [kError]: { value: null, writable: true }, + [kEnded]: { value: false, writable: true }, + [kLastPromise]: { value: null, writable: true }, + // the function passed to new Promise + // is cached so we avoid allocating a new + // closure at every run + [kHandlePromise]: { + value: (resolve, reject) => { + const data = iterator[kStream].read(); + if (data) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + resolve(createIterResult(data, false)); + } else { + iterator[kLastResolve] = resolve; + iterator[kLastReject] = reject; + } + }, + writable: true, + }, + }); + + stream.on('readable', onReadable.bind(null, iterator)); + stream.on('end', onEnd.bind(null, iterator)); + stream.on('error', onError.bind(null, iterator)); + + return iterator; }; -module.exports = ReadableAsyncIterator; +module.exports = createReadableStreamAsyncIterator; diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index d8eb83a58506d1..fb3c55846c4450 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -5,6 +5,28 @@ const { Readable } = require('stream'); const assert = require('assert'); async function tests() { + { + const AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(async function* () {}).prototype); + const rs = new Readable({}); + assert.strictEqual( + Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())), + AsyncIteratorPrototype); + } + + await (async function() { + const readable = new Readable({ objectMode: true, read() {} }); + readable.push(0); + readable.push(1); + readable.push(null); + + const iter = readable[Symbol.asyncIterator](); + assert.strictEqual((await iter.next()).value, 0); + for await (const d of iter) { + assert.strictEqual(d, 1); + } + })(); + await (async function() { console.log('read without for..await'); const max = 5;