Skip to content

Commit

Permalink
stream: close iterator in Readable.from
Browse files Browse the repository at this point in the history
Call iterator.return() if not all of its values are consumed.

Fixes: nodejs#32842
  • Loading branch information
vadzim committed Apr 15, 2020
1 parent 3f5142d commit 2fa5848
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 1 deletion.
30 changes: 29 additions & 1 deletion lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,49 @@ function from(Readable, iterable, opts) {
objectMode: true,
...opts
});

// Reading boolean to protect against _read
// being called before last iteration completion.
let reading = false;

// needToClose boolean if iterator needs to be explicitly closed
let needToClose = false;

readable._read = function() {
if (!reading) {
reading = true;
next();
}
};

readable._destroy = function(error, cb) {
close().then(
() => cb(error),
(e) => cb(error || e),
);
};

async function close() {
if (needToClose) {
needToClose = false;
if (typeof iterator.return === 'function') {
const { value } = await iterator.return();
await value;
}
}
}

async function next() {
try {
needToClose = false;
const { value, done } = await iterator.next();
needToClose = !done;
const resolved = await value;
if (done) {
readable.push(null);
} else if (readable.push(await value)) {
} else if (readable.destroyed) {
await close();
} else if (readable.push(resolved)) {
next();
} else {
reading = false;
Expand Down
163 changes: 163 additions & 0 deletions test/parallel/test-readable-from-iterator-closing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
'use strict';

const { mustCall, mustNotCall } = require('../common');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function asyncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();

async function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();

function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncPromiseSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustCall = mustCall();

function* infiniteGenerate() {
try {
while (true) yield Promise.resolve('a');
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncRejectedSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const secondNextMustNotCall = mustNotCall();

function* generate() {
try {
yield Promise.reject('a');
secondNextMustNotCall();
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}

const stream = Readable.from(generate());

try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}

async function noReturnAfterThrow() {
const returnMustNotCall = mustNotCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const nextMustCall = mustCall();

const stream = Readable.from({
[Symbol.asyncIterator]() { return this; },
async next() {
nextMustCall();
throw new Error('a');
},
async return() {
returnMustNotCall();
return { done: true };
},
});

try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}

async function closeStreamWhileNextIsPending() {
const finallyMustCall = mustCall();
let resolveDestroy;
const destroyed = new Promise((resolve) => { resolveDestroy = resolve; });

async function* infiniteGenerate() {
try {
while (true) {
yield 'a';
await destroyed;
}
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

stream.on('data', (data) => {
strictEqual(data, 'a');
stream.destroy();
resolveDestroy();
});
}

Promise.all([
asyncSupport(),
syncSupport(),
syncPromiseSupport(),
syncRejectedSupport(),
noReturnAfterThrow(),
closeStreamWhileNextIsPending()
]).then(mustCall());

0 comments on commit 2fa5848

Please sign in to comment.