Skip to content

Commit

Permalink
stream: close iterator in Readable.from
Browse files Browse the repository at this point in the history
Call iterator.return() if not all of its values are consumed.

Fixes: nodejs#32842

PR-URL: nodejs#32844
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Zeyu Yang <himself65@outlook.com>
  • Loading branch information
vadzim authored and ronag committed Apr 18, 2020
1 parent fd10be4 commit 8a3fa32
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 1 deletion.
32 changes: 31 additions & 1 deletion lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,51 @@ function from(Readable, iterable, opts) {
objectMode: true,
...opts
});

// Reading boolean to protect against _read
// being called before last iteration completion.
let reading = false;

// needToClose boolean if iterator needs to be explicitly closed
let needToClose = false;

readable._read = function() {
if (!reading) {
reading = true;
next();
}
};

readable._destroy = function(error, cb) {
if (needToClose) {
needToClose = false;
close().then(
() => process.nextTick(cb, error),
(e) => process.nextTick(cb, error || e),
);
} else {
cb(error);
}
};

async function close() {
if (typeof iterator.return === 'function') {
const { value } = await iterator.return();
await value;
}
}

async function next() {
try {
needToClose = false;
const { value, done } = await iterator.next();
needToClose = !done;
const resolved = await value;
if (done) {
readable.push(null);
} else if (readable.push(await value)) {
} else if (readable.destroyed) {
await close();
} else if (readable.push(resolved)) {
next();
} else {
reading = false;
Expand Down
198 changes: 198 additions & 0 deletions test/parallel/test-readable-from-iterator-closing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
'use strict';

const { mustCall, mustNotCall } = require('../common');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function asyncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();

async function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();

function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncPromiseSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustCall = mustCall();

function* infiniteGenerate() {
try {
while (true) yield Promise.resolve('a');
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncRejectedSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const secondNextMustNotCall = mustNotCall();

function* generate() {
try {
yield Promise.reject('a');
secondNextMustNotCall();
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}

const stream = Readable.from(generate());

try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}

async function noReturnAfterThrow() {
const returnMustNotCall = mustNotCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const nextMustCall = mustCall();

const stream = Readable.from({
[Symbol.asyncIterator]() { return this; },
async next() {
nextMustCall();
throw new Error('a');
},
async return() {
returnMustNotCall();
return { done: true };
},
});

try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}

async function closeStreamWhileNextIsPending() {
const finallyMustCall = mustCall();
const dataMustCall = mustCall();

let resolveDestroy;
const destroyed =
new Promise((resolve) => { resolveDestroy = mustCall(resolve); });
let resolveYielded;
const yielded =
new Promise((resolve) => { resolveYielded = mustCall(resolve); });

async function* infiniteGenerate() {
try {
while (true) {
yield 'a';
resolveYielded();
await destroyed;
}
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

stream.on('data', (data) => {
dataMustCall();
strictEqual(data, 'a');
});

yielded.then(() => {
stream.destroy();
resolveDestroy();
});
}

async function closeAfterNullYielded() {
const finallyMustCall = mustCall();
const dataMustCall = mustCall(3);

function* infiniteGenerate() {
try {
yield 'a';
yield 'a';
yield 'a';
while (true) yield null;
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

stream.on('data', (chunk) => {
dataMustCall();
strictEqual(chunk, 'a');
});
}

Promise.all([
asyncSupport(),
syncSupport(),
syncPromiseSupport(),
syncRejectedSupport(),
noReturnAfterThrow(),
closeStreamWhileNextIsPending(),
closeAfterNullYielded(),
]).then(mustCall());

0 comments on commit 8a3fa32

Please sign in to comment.