diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index ab6db00a125a0b..f2d8dc603760ee 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -34,21 +34,49 @@ function from(Readable, iterable, opts) { objectMode: true, ...opts }); + // Reading boolean to protect against _read // being called before last iteration completion. let reading = false; + + // needToClose boolean if iterator needs to be explicitly closed + let needToClose = false; + readable._read = function() { if (!reading) { reading = true; next(); } }; + + readable._destroy = function(error, cb) { + close().then( + () => cb(error), + (e) => cb(error || e), + ); + }; + + async function close() { + if (needToClose) { + needToClose = false; + if (typeof iterator.return === 'function') { + const { value } = await iterator.return(); + await value; + } + } + } + async function next() { try { + needToClose = false; const { value, done } = await iterator.next(); + needToClose = !done; + const resolved = await value; if (done) { readable.push(null); - } else if (readable.push(await value)) { + } else if (readable.destroyed) { + await close(); + } else if (readable.push(resolved)) { next(); } else { reading = false; diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js new file mode 100644 index 00000000000000..f08feeb8abd3b1 --- /dev/null +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -0,0 +1,163 @@ +'use strict'; + +const { mustCall, mustNotCall } = require('../common'); +const { Readable } = require('stream'); +const { strictEqual } = require('assert'); + +async function asyncSupport() { + const finallyMustCall = mustCall(); + const bodyMustCall = mustCall(); + + async function* infiniteGenerate() { + try { + while (true) yield 'a'; + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncSupport() { + const finallyMustCall = mustCall(); + const bodyMustCall = mustCall(); + + function* infiniteGenerate() { + try { + while (true) yield 'a'; + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncPromiseSupport() { + const returnMustBeAwaited = mustCall(); + const bodyMustCall = mustCall(); + + function* infiniteGenerate() { + try { + while (true) yield Promise.resolve('a'); + } finally { + // eslint-disable-next-line no-unsafe-finally + return { then(cb) { + returnMustBeAwaited(); + cb(); + } }; + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncRejectedSupport() { + const returnMustBeAwaited = mustCall(); + const bodyMustNotCall = mustNotCall(); + const catchMustCall = mustCall(); + const secondNextMustNotCall = mustNotCall(); + + function* generate() { + try { + yield Promise.reject('a'); + secondNextMustNotCall(); + } finally { + // eslint-disable-next-line no-unsafe-finally + return { then(cb) { + returnMustBeAwaited(); + cb(); + } }; + } + } + + const stream = Readable.from(generate()); + + try { + for await (const chunk of stream) { + bodyMustNotCall(chunk); + } + } catch { + catchMustCall(); + } +} + +async function noReturnAfterThrow() { + const returnMustNotCall = mustNotCall(); + const bodyMustNotCall = mustNotCall(); + const catchMustCall = mustCall(); + const nextMustCall = mustCall(); + + const stream = Readable.from({ + [Symbol.asyncIterator]() { return this; }, + async next() { + nextMustCall(); + throw new Error('a'); + }, + async return() { + returnMustNotCall(); + return { done: true }; + }, + }); + + try { + for await (const chunk of stream) { + bodyMustNotCall(chunk); + } + } catch { + catchMustCall(); + } +} + +async function closeStreamWhileNextIsPending() { + const finallyMustCall = mustCall(); + let resolveDestroy; + const destroyed = new Promise((resolve) => { resolveDestroy = resolve; }); + + async function* infiniteGenerate() { + try { + while (true) { + yield 'a'; + await destroyed; + } + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + stream.on('data', (data) => { + strictEqual(data, 'a'); + stream.destroy(); + resolveDestroy(); + }); +} + +Promise.all([ + asyncSupport(), + syncSupport(), + syncPromiseSupport(), + syncRejectedSupport(), + noReturnAfterThrow(), + closeStreamWhileNextIsPending() +]).then(mustCall());