From f5096096e299ba7ea3852539dc6bbdec3941409b Mon Sep 17 00:00:00 2001 From: Darshan Sen Date: Mon, 21 Jun 2021 21:50:27 +0530 Subject: [PATCH] stream: throw on premature close in Readable[AsyncIterator] Fixes: https://github.com/nodejs/node/issues/39086 Signed-off-by: Darshan Sen --- lib/internal/streams/readable.js | 8 +- .../test-stream-readable-async-iterators.js | 75 +++++++++++++++---- 2 files changed, 64 insertions(+), 19 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 7f6876599cc7fc..6eb07cdd4a9b14 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -58,9 +58,10 @@ const { const { ERR_INVALID_ARG_TYPE, - ERR_STREAM_PUSH_AFTER_EOF, ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT + ERR_STREAM_PREMATURE_CLOSE, + ERR_STREAM_PUSH_AFTER_EOF, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT, } = require('internal/errors').codes; const { validateObject } = require('internal/validators'); @@ -1138,7 +1139,7 @@ async function* createAsyncIterator(stream, options) { } else if (endEmitted) { break; } else if (closeEmitted) { - break; + throw new ERR_STREAM_PREMATURE_CLOSE(); } else { await new Promise(next); } @@ -1152,7 +1153,6 @@ async function* createAsyncIterator(stream, options) { } finally { if (!errorThrown && opts.destroyOnReturn) { if (state.autoDestroy || !endEmitted) { - // TODO(ronag): ERR_PREMATURE_CLOSE? destroyImpl.destroyer(stream, null); } } diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index a497317565fb4c..ec1cb464845e70 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -10,6 +10,7 @@ const { } = require('stream'); const assert = require('assert'); const http = require('http'); +const fs = require('fs'); async function tests() { { @@ -338,11 +339,17 @@ async function tests() { process.nextTick(async () => { readable.on('close', common.mustNotCall()); let received = 0; - for await (const k of readable) { - // Just make linting pass. This should never run. - assert.strictEqual(k, 'hello'); - received++; + let err = null; + try { + for await (const k of readable) { + // Just make linting pass. This should never run. + assert.strictEqual(k, 'hello'); + received++; + } + } catch (_err) { + err = _err; } + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); assert.strictEqual(received, 0); }); } @@ -412,8 +419,13 @@ async function tests() { readable.destroy(); - const { done } = await readable[Symbol.asyncIterator]().next(); - assert.strictEqual(done, true); + const it = await readable[Symbol.asyncIterator](); + const next = it.next(); + next + .then(common.mustNotCall()) + .catch(common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + })); } { @@ -458,7 +470,7 @@ async function tests() { } { - console.log('destroy mid-stream does not error'); + console.log('destroy mid-stream errors'); const r = new Readable({ objectMode: true, read() { @@ -467,10 +479,16 @@ async function tests() { } }); - // eslint-disable-next-line no-unused-vars - for await (const a of r) { - r.destroy(null); + let err = null; + try { + // eslint-disable-next-line no-unused-vars + for await (const a of r) { + r.destroy(null); + } + } catch (_err) { + err = _err; } + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); } { @@ -514,7 +532,7 @@ async function tests() { } { - console.log('all next promises must be resolved on destroy'); + console.log('all next promises must be rejected on destroy'); const r = new Readable({ objectMode: true, read() { @@ -525,7 +543,11 @@ async function tests() { const c = b.next(); const d = b.next(); r.destroy(); - assert.deepStrictEqual(await c, { done: true, value: undefined }); + c + .then(common.mustNotCall()) + .catch(common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + })); assert.deepStrictEqual(await d, { done: true, value: undefined }); } @@ -675,7 +697,7 @@ async function tests() { } { - // AsyncIterator should finish correctly if destroyed. + // AsyncIterator should not finish correctly if destroyed. const r = new Readable({ objectMode: true, @@ -688,11 +710,34 @@ async function tests() { const it = r[Symbol.asyncIterator](); const next = it.next(); next - .then(common.mustCall(({ done }) => assert.strictEqual(done, true))) - .catch(common.mustNotCall()); + .then(common.mustNotCall()) + .catch(common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + })); }); } +{ + // AsyncIterator should throw if prematurely closed + // before end has been emitted. + (async function() { + const readable = fs.createReadStream(__filename); + + try { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable) { + readable.close(); + } + + assert.fail('should have thrown'); + } catch (err) { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); +} + // AsyncIterator non-destroying iterator { function createReadable() {