stream: add a non-destroying iterator to Readable #38526

Closed
58 changes: 56 additions & 2 deletions doc/api/stream.md
@@ -1506,13 +1506,67 @@ async function print(readable) {
print(fs.createReadStream('file')).catch(console.error);
```

-If the loop terminates with a `break` or a `throw`, the stream will be
-destroyed. In other terms, iterating over a stream will consume the stream
+If the loop terminates with a `break`, `return` or a `throw`, the stream will
+be destroyed. In other terms, iterating over a stream will consume the stream
fully. The stream will be read in chunks of size equal to the `highWaterMark`
option. In the code example above, data will be in a single chunk if the file
has less than 64KB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable.iterator([options])`
Contributor

Just a thought: does this have to be in a new method rather than adding a parameter to the existing Symbol.asyncIterator one?

Member

I'm -0 on naming. I prefer a new method as the Symbol.asyncIterator one has a predefined signature by the standard.

Member

It doesn't feel right to me if the user has to call [Symbol.asyncIterator]() themselves.

Contributor

> the Symbol.asyncIterator one has a predefined signature by the standard.

Not really, all the standard says is that this method is called with no arguments (https://tc39.es/ecma262/#sec-getiterator). The standard gives a clear rule for the returned object (https://tc39.es/ecma262/#sec-asynciterable-interface), but not for the function signature. I personally don't feel strongly either way.
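
As a quick check of the point above (an editorial sketch, not part of the PR), the snippet below builds a hypothetical async iterable whose `[Symbol.asyncIterator]` method records how many arguments it receives. `createProbe` and `main` are made-up names. `for await...of` obtains the iterator itself and passes nothing, so options can only reach this method through a direct call.

```javascript
// Hypothetical probe, for illustration only: records how many arguments
// its [Symbol.asyncIterator]() method is called with.
function createProbe() {
  const argCounts = [];
  const probe = {
    [Symbol.asyncIterator](...args) {
      argCounts.push(args.length);
      return (async function* () {
        yield 'chunk';
      })();
    },
  };
  return { probe, argCounts };
}

async function main() {
  const { probe, argCounts } = createProbe();

  // for await...of looks up and calls the method with no arguments.
  for await (const chunk of probe) {
    void chunk;
  }

  // A direct call can pass options, but the caller must spell out the symbol.
  const iterator = probe[Symbol.asyncIterator]({ destroyOnReturn: false });
  await iterator.return();

  return argCounts;
}

main().then((argCounts) => console.log(argCounts)); // [ 0, 1 ]
```

This is the ergonomic cost raised earlier in the thread: a parameter on `[Symbol.asyncIterator]` would be unreachable from the loop syntax.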

Member

I'd prefer the separate method. The key issue with extending standard-defined APIs is that it makes reasoning about the portability of code far more complicated. A separate method makes it clear. That said, the behavior of the two can be identical such that [Symbol.asyncIterator]() could just defer to readable.iterator() with default arguments.

Member Author

> That said, the behavior of the two can be identical such that [Symbol.asyncIterator]() could just defer to readable.iterator() with default arguments.

Yeah, the reason that one doesn't call the other is legacy streams and `this`. I'd need to use ReflectApply to bind `this`, and I preferred to have a regular method and pass `this` as the first parameter instead of using primordials.
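
The delegation idea discussed in this thread can be sketched roughly as follows. This is an editorial illustration under stated assumptions, not the PR's code: `MiniReadable` is a hypothetical stand-in for a stream backed by an array, and it does not cover the legacy-stream and `this`-binding concerns mentioned above.

```javascript
// Hypothetical stand-in for a stream; this is not Node's Readable.
class MiniReadable {
  constructor(items) {
    this.items = [...items];
    this.destroyed = false;
  }

  destroy() {
    this.destroyed = true;
  }

  // Separate method that accepts options.
  iterator(options = {}) {
    const { destroyOnReturn = true } = options;
    const source = this;
    return (async function* () {
      try {
        while (source.items.length > 0 && !source.destroyed) {
          yield source.items.shift();
        }
      } finally {
        // Runs on normal completion, break, return and throw.
        if (destroyOnReturn || source.items.length === 0) source.destroy();
      }
    })();
  }

  // Standard entry point: takes no arguments and defers to iterator().
  [Symbol.asyncIterator]() {
    return this.iterator();
  }
}

async function demo() {
  const kept = new MiniReadable([1, 2, 3]);
  for await (const item of kept.iterator({ destroyOnReturn: false })) {
    if (item === 1) break;
  }

  const dropped = new MiniReadable([1, 2, 3]);
  for await (const item of dropped) {
    if (item === 1) break;
  }

  return { kept: kept.destroyed, dropped: dropped.destroyed };
}

demo().then((result) => console.log(result)); // { kept: false, dropped: true }
```

Keeping the options on a named method leaves the standard entry point with its usual zero-argument shape, which is the portability argument made above.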

<!-- YAML
added: REPLACEME
-->

* `options` {Object}
  * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
    async iterator, or exiting a `for await...of` iteration using a `break`,
    `return` or `throw` will not destroy the stream. **Default:** `true`.
  * `destroyOnError` {boolean} When set to `false`, if the stream emits an
    error while it's being iterated, the iterator will not destroy the stream.
    **Default:** `true`.
* Returns: {AsyncIterator} to consume the stream.

The iterator created by this method lets users choose whether the stream is
destroyed when the `for await...of` loop is exited by `return`, `break` or
`throw`, and whether it is destroyed when the stream emits an error during
iteration.

```js
const { Readable } = require('stream');

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // Will print 2 and then 3
  }

  console.log(readable.destroyed); // True, stream was totally consumed
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]));
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}

showBoth();
```
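
Editorial note, not part of the PR: one use the documented example suggests is reading a leading chunk and then handing the same stream to a second consumer. The sketch below assumes a Node.js version that ships `readable.iterator()` with the `destroyOnReturn` option as described in this diff. `readHeaderThenRest` is a hypothetical helper name.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: reads one leading chunk, then lets a second loop
// consume the remaining chunks from the same stream.
async function readHeaderThenRest(readable) {
  let header;
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    header = chunk;
    break; // The stream stays open because destroyOnReturn is false.
  }

  const rest = [];
  // Default iteration: the stream is destroyed once this loop finishes.
  for await (const chunk of readable) {
    rest.push(chunk);
  }

  return { header, rest, destroyed: readable.destroyed };
}

readHeaderThenRest(Readable.from(['header', 'a', 'b']))
  .then((result) => console.log(result));
// { header: 'header', rest: [ 'a', 'b' ], destroyed: true }
```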

### Duplex and transform streams

#### Class: `stream.Duplex`
38 changes: 30 additions & 8 deletions lib/internal/streams/readable.js
@@ -62,6 +62,7 @@ const {
   ERR_METHOD_NOT_IMPLEMENTED,
   ERR_STREAM_UNSHIFT_AFTER_END_EVENT
 } = require('internal/errors').codes;
+const { validateObject } = require('internal/validators');

 const kPaused = Symbol('kPaused');

@@ -1062,8 +1063,17 @@ Readable.prototype.wrap = function(stream) {
 };

 Readable.prototype[SymbolAsyncIterator] = function() {
-  let stream = this;
+  return streamToAsyncIterator(this);
+};

+Readable.prototype.iterator = function(options) {
+  if (options !== undefined) {
+    validateObject(options, 'options');
+  }
+  return streamToAsyncIterator(this, options);
+};
+
+function streamToAsyncIterator(stream, options) {
   if (typeof stream.read !== 'function') {
     // v1 stream
     const src = stream;
@@ -1076,14 +1086,20 @@ Readable.prototype[SymbolAsyncIterator] = function() {
     }).wrap(src);
   }

-  const iter = createAsyncIterator(stream);
+  const iter = createAsyncIterator(stream, options);
   iter.stream = stream;
   return iter;
-};
+}

-async function* createAsyncIterator(stream) {
+async function* createAsyncIterator(stream, options) {
   let callback = nop;

+  const opts = {
+    destroyOnReturn: true,
+    destroyOnError: true,
+    ...options,
+  };
+
   function next(resolve) {
     if (this === stream) {
       callback();
@@ -1116,6 +1132,7 @@ async function* createAsyncIterator(stream) {
       next.call(this);
     });

+  let errorThrown = false;
   try {
     while (true) {
       const chunk = stream.destroyed ? null : stream.read();
@@ -1132,12 +1149,17 @@ async function* createAsyncIterator(stream) {
       }
     }
   } catch (err) {
-    destroyImpl.destroyer(stream, err);
+    if (opts.destroyOnError) {
+      destroyImpl.destroyer(stream, err);
+    }
+    errorThrown = true;
     throw err;
   } finally {
-    if (state.autoDestroy || !endEmitted) {
-      // TODO(ronag): ERR_PREMATURE_CLOSE?
-      destroyImpl.destroyer(stream, null);
+    if (!errorThrown && opts.destroyOnReturn) {
+      if (state.autoDestroy || !endEmitted) {
+        // TODO(ronag): ERR_PREMATURE_CLOSE?
+        destroyImpl.destroyer(stream, null);
+      }
     }
   }
 }
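
Editorial sketch, not part of the PR: the control flow added to `createAsyncIterator` can be isolated from stream internals as below. It assumes a generic async iterable `source` and a hypothetical `destroy` callback standing in for `destroyImpl.destroyer()`, and it leaves out the `state.autoDestroy || !endEmitted` check. `iterateWithOptions`, `failing` and `run` are made-up names.

```javascript
// Illustration only: `source` is any async iterable and `destroy` is a
// hypothetical callback standing in for destroyImpl.destroyer().
async function* iterateWithOptions(source, destroy, options) {
  const opts = {
    destroyOnReturn: true,
    destroyOnError: true,
    ...options, // Spreading undefined adds nothing, so the defaults survive.
  };

  let errorThrown = false;
  try {
    yield* source;
  } catch (err) {
    if (opts.destroyOnError) destroy(err);
    errorThrown = true;
    throw err;
  } finally {
    // Skipped after an error so that destroyOnError alone decides that case.
    if (!errorThrown && opts.destroyOnReturn) destroy(null);
  }
}

async function* failing() {
  yield 1;
  throw new Error('inner');
}

async function run(options) {
  const calls = [];
  const destroy = (err) => calls.push(err === null ? 'return' : 'error');
  try {
    for await (const value of iterateWithOptions(failing(), destroy, options)) {
      void value;
    }
  } catch {
    // The error still reaches the consumer in every configuration.
    calls.push('caught');
  }
  return calls;
}

run().then((calls) => console.log(calls)); // [ 'error', 'caught' ]
run({ destroyOnError: false }).then((calls) => console.log(calls)); // [ 'caught' ]
```

The `errorThrown` flag is what lets `{ destroyOnError: false, destroyOnReturn: true }` leave the stream alive after an error, which is the case the tests below check.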
116 changes: 116 additions & 0 deletions test/parallel/test-stream-readable-async-iterators.js
@@ -693,6 +693,122 @@ async function tests() {
     });
   }

+  // AsyncIterator non-destroying iterator
+  {
+    function createReadable() {
+      return Readable.from((async function* () {
+        await Promise.resolve();
+        yield 5;
+        await Promise.resolve();
+        yield 7;
+        await Promise.resolve();
+      })());
+    }
+
+    function createErrorReadable() {
+      const opts = { read() { throw new Error('inner'); } };
+      return new Readable(opts);
+    }
+
+    // Check default destroys on return
+    (async function() {
+      const readable = createReadable();
+      for await (const chunk of readable.iterator()) {
+        assert.strictEqual(chunk, 5);
+        break;
+      }
+
+      assert.ok(readable.destroyed);
+    })().then(common.mustCall());
+
+    // Check explicit destroying on return
+    (async function() {
+      const readable = createReadable();
+      for await (const chunk of readable.iterator({ destroyOnReturn: true })) {
+        assert.strictEqual(chunk, 5);
+        break;
+      }
+
+      assert.ok(readable.destroyed);
+    })().then(common.mustCall());
+
+    // Check default destroys on error
+    (async function() {
+      const readable = createErrorReadable();
+      try {
+        // eslint-disable-next-line no-unused-vars
+        for await (const chunk of readable) { }
+        assert.fail('should have thrown');
+      } catch (err) {
+        assert.strictEqual(err.message, 'inner');
+      }
+
+      assert.ok(readable.destroyed);
+    })().then(common.mustCall());
+
+    // Check explicit destroys on error
+    (async function() {
+      const readable = createErrorReadable();
+      const opts = { destroyOnError: true, destroyOnReturn: false };
+      try {
+        // eslint-disable-next-line no-unused-vars
+        for await (const chunk of readable.iterator(opts)) { }
+        assert.fail('should have thrown');
+      } catch (err) {
+        assert.strictEqual(err.message, 'inner');
+      }
+
+      assert.ok(readable.destroyed);
+    })().then(common.mustCall());
+
+    // Check explicit non-destroy with return true
+    (async function() {
+      const readable = createErrorReadable();
+      const opts = { destroyOnError: false, destroyOnReturn: true };
+      try {
+        // eslint-disable-next-line no-unused-vars
+        for await (const chunk of readable.iterator(opts)) { }
+        assert.fail('should have thrown');
+      } catch (err) {
+        assert.strictEqual(err.message, 'inner');
+      }
+
+      assert.ok(!readable.destroyed);
+    })().then(common.mustCall());
+
+    // Check explicit non-destroy on return
+    (async function() {
+      const readable = createReadable();
+      const opts = { destroyOnReturn: false };
+      for await (const chunk of readable.iterator(opts)) {
+        assert.strictEqual(chunk, 5);
+        break;
+      }
+
+      assert.ok(!readable.destroyed);
+
+      for await (const chunk of readable.iterator(opts)) {
+        assert.strictEqual(chunk, 7);
+      }
+
+      assert.ok(readable.destroyed);
+    })().then(common.mustCall());
+
+    // Check non-object options.
+    {
+      const readable = createReadable();
+      assert.throws(
+        () => readable.iterator(42),
+        {
+          code: 'ERR_INVALID_ARG_TYPE',
+          name: 'TypeError',
+          message: 'The "options" argument must be of type object. Received ' +
+                   'type number (42)',
+        }
+      );
+    }
+  }
+
   {
     let _req;
     const server = http.createServer((request, response) => {