Skip to content

Commit

Permalink
stream: simpler and faster Readable async iterator
Browse files Browse the repository at this point in the history
Reimplement as an async generator instead of a custom
iterator class.

Backport-PR-URL: nodejs#34887
PR-URL: nodejs#34035
Refs: nodejs#34680
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag authored and richardlau committed Sep 7, 2020
1 parent ffae5f3 commit 4bb4007
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 269 deletions.
38 changes: 38 additions & 0 deletions benchmark/streams/readable-async-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';

const common = require('../common');
const Readable = require('stream').Readable;

const bench = common.createBenchmark(main, {
n: [1e5],
sync: ['yes', 'no'],
});

async function main({ n, sync }) {
sync = sync === 'yes';

const s = new Readable({
objectMode: true,
read() {
if (sync) {
this.push(1);
} else {
process.nextTick(() => {
this.push(1);
});
}
}
});

bench.start();

let x = 0;
for await (const chunk of s) {
x += chunk;
if (x > n) {
break;
}
}

bench.end(n);
}
66 changes: 61 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
NumberIsNaN,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
Set,
SymbolAsyncIterator,
Symbol
Expand Down Expand Up @@ -59,11 +60,11 @@ const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let createReadableStreamAsyncIterator;
let from;

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
function nop() {}

const { errorOrDestroy } = destroyImpl;

Expand Down Expand Up @@ -1075,13 +1076,68 @@ Readable.prototype.wrap = function(stream) {
};

Readable.prototype[SymbolAsyncIterator] = function() {
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
let stream = this;

if (typeof stream.read !== 'function') {
// v1 stream
const src = stream;
stream = new Readable({
objectMode: true,
destroy(err, callback) {
destroyImpl.destroyer(src, err);
callback();
}
}).wrap(src);
}
return createReadableStreamAsyncIterator(this);

const iter = createAsyncIterator(stream);
iter.stream = stream;
return iter;
};

async function* createAsyncIterator(stream) {
let callback = nop;

function next(resolve) {
if (this === stream) {
callback();
callback = nop;
} else {
callback = resolve;
}
}

stream
.on('readable', next)
.on('error', next)
.on('end', next)
.on('close', next);

try {
const state = stream._readableState;
while (true) {
const chunk = stream.read();
if (chunk !== null) {
yield chunk;
} else if (state.errored) {
throw state.errored;
} else if (state.ended) {
break;
} else if (state.closed) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
break;
} else {
await new Promise(next);
}
}
} catch (err) {
destroyImpl.destroyer(stream, err);
throw err;
} finally {
destroyImpl.destroyer(stream, null);
}
}

// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
Expand Down
221 changes: 0 additions & 221 deletions lib/internal/streams/async_iterator.js

This file was deleted.

10 changes: 5 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const {

let EE;
let PassThrough;
let createReadableStreamAsyncIterator;
let Readable;

function destroyer(stream, reading, writing, callback) {
callback = once(callback);
Expand Down Expand Up @@ -113,11 +113,11 @@ function makeAsyncIterable(val) {
}

async function* fromReadable(val) {
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
if (!Readable) {
Readable = require('_stream_readable');
}
yield* createReadableStreamAsyncIterator(val);

yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish) {
Expand Down
1 change: 0 additions & 1 deletion node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@
'lib/internal/worker/js_transferable.js',
'lib/internal/watchdog.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-readline-async-iterators-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ async function testMutualDestroy() {
break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
break;
}

assert.deepStrictEqual(iteratedLines, expectedLines);
Expand Down
Loading

0 comments on commit 4bb4007

Please sign in to comment.