From 83ec33b9335a7140c1f8b46357303ff7a8122a0d Mon Sep 17 00:00:00 2001
From: Anna Henningsen
Date: Sun, 9 Dec 2018 20:05:11 +0100
Subject: [PATCH] stream: fix end-of-stream for HTTP/2
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

HTTP/2 streams call `.end()` on themselves from their `.destroy()`
method, which might be queued (e.g. due to network congestion) and not
processed before the stream itself is destroyed.

In that case, the `_writableState.ended` property could be set before
the stream emits its `'close'` event, even though the stream never
actually emits the `'finish'` event, confusing the end-of-stream
implementation so that it would not call its callback.

This can be fixed by watching for the end events themselves, using the
existing `'finish'` and `'end'` listeners, rather than relying on the
`.ended` properties of the `_...State` objects. These properties still
need to be checked to know whether stream closure was premature.

My understanding is that, ideally, streams should not emit `'close'`
before `'end'` and/or `'finish'`, so this might be another bug, but
changing this would require modifying tests and would almost certainly
be a breaking change.

Fixes: https://github.com/nodejs/node/issues/24456

PR-URL: https://github.com/nodejs/node/pull/24926
Reviewed-By: Rich Trott
Reviewed-By: Luigi Pinca
Reviewed-By: Matteo Collina
Reviewed-By: Franziska Hinkelmann
---
 lib/internal/streams/end-of-stream.js         | 21 ++++++----
 test/parallel/parallel.status                 |  2 -
 ...t-stream-pipeline-queued-end-in-destroy.js | 39 +++++++++++++++++++
 3 files changed, 53 insertions(+), 9 deletions(-)
 create mode 100644 test/parallel/test-stream-pipeline-queued-end-in-destroy.js

diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js
index 8bbd4827b23a3a..4ad7b93337f633 100644
--- a/lib/internal/streams/end-of-stream.js
+++ b/lib/internal/streams/end-of-stream.js
@@ -36,8 +36,6 @@ function eos(stream, opts, callback) {
 
   callback = once(callback);
 
-  const ws = stream._writableState;
-  const rs = stream._readableState;
   let readable = opts.readable || (opts.readable !== false && stream.readable);
   let writable = opts.writable || (opts.writable !== false && stream.writable);
 
@@ -45,13 +43,17 @@ function eos(stream, opts, callback) {
     if (!stream.writable) onfinish();
   };
 
+  var writableEnded = stream._writableState && stream._writableState.finished;
   const onfinish = () => {
     writable = false;
+    writableEnded = true;
     if (!readable) callback.call(stream);
   };
 
+  var readableEnded = stream._readableState && stream._readableState.endEmitted;
   const onend = () => {
     readable = false;
+    readableEnded = true;
     if (!writable) callback.call(stream);
   };
 
@@ -60,11 +62,16 @@ function eos(stream, opts, callback) {
   };
 
   const onclose = () => {
-    if (readable && !(rs && rs.ended)) {
-      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+    let err;
+    if (readable && !readableEnded) {
+      if (!stream._readableState || !stream._readableState.ended)
+        err = new ERR_STREAM_PREMATURE_CLOSE();
+      return callback.call(stream, err);
     }
-    if (writable && !(ws && ws.ended)) {
-      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+    if (writable && !writableEnded) {
+      if (!stream._writableState || !stream._writableState.ended)
+        err = new ERR_STREAM_PREMATURE_CLOSE();
+      return callback.call(stream, err);
     }
   };
 
@@ -77,7 +84,7 @@ function eos(stream, opts, callback) {
     stream.on('abort', onclose);
     if (stream.req) onrequest();
     else stream.on('request', onrequest);
-  } else if (writable && !ws) { // legacy streams
+  } else if (writable && !stream._writableState) { // legacy streams
     stream.on('end', onlegacyfinish);
    stream.on('close', onlegacyfinish);
   }
diff --git a/test/parallel/parallel.status b/test/parallel/parallel.status
index cfc4e36e9a565f..b45e4448d97c06 100644
--- a/test/parallel/parallel.status
+++ b/test/parallel/parallel.status
@@ -12,8 +12,6 @@ test-net-connect-options-port: PASS,FLAKY
 test-http2-pipe: PASS,FLAKY
 test-worker-syntax-error: PASS,FLAKY
 test-worker-syntax-error-file: PASS,FLAKY
-# https://github.com/nodejs/node/issues/24456
-test-stream-pipeline-http2: PASS,FLAKY
 
 [$system==linux]
 
diff --git a/test/parallel/test-stream-pipeline-queued-end-in-destroy.js b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js
new file mode 100644
index 00000000000000..d5e399ddda531b
--- /dev/null
+++ b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js
@@ -0,0 +1,39 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { Readable, Duplex, pipeline } = require('stream');
+
+// Test that the callback for pipeline() is called even when the ._destroy()
+// method of the stream places an .end() request to itself that does not
+// get processed before the destruction of the stream (i.e. the 'close' event).
+// Refs: https://github.com/nodejs/node/issues/24456
+
+const readable = new Readable({
+  read: common.mustCall(() => {})
+});
+
+const duplex = new Duplex({
+  write(chunk, enc, cb) {
+    // Simulate messages queueing up.
+  },
+  read() {},
+  destroy(err, cb) {
+    // Call end() from inside the destroy() method, like HTTP/2 streams
+    // do at the time of writing.
+    this.end();
+    cb(err);
+  }
+});
+
+duplex.on('finish', common.mustNotCall());
+
+pipeline(readable, duplex, common.mustCall((err) => {
+  assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
+}));
+
+// Write one chunk of data, and destroy the stream later.
+// That should trigger the pipeline destruction.
+readable.push('foo');
+setImmediate(() => {
+  readable.destroy();
+});
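As background for the technique described in the commit message, the sketch below is illustrative only and is not code from this patch; the helper name `watchEnd` is invented for the example. It shows the general idea of remembering whether `'finish'` and `'end'` were actually observed, instead of trusting the `_writableState`/`_readableState` flags once `'close'` fires:

'use strict';
const { PassThrough } = require('stream');

// Remember the events that were actually emitted; a queued .end() can set
// _writableState.ended without 'finish' ever firing.
function watchEnd(stream, callback) {
  let sawFinish = stream._writableState && stream._writableState.finished;
  let sawEnd = stream._readableState && stream._readableState.endEmitted;

  stream.on('finish', () => { sawFinish = true; });
  stream.on('end', () => { sawEnd = true; });
  stream.on('close', () => {
    // Only the observed events count as completion.
    if (!sawFinish || !sawEnd)
      callback(new Error('premature close'));
    else
      callback();
  });
}

// Usage (assumes a Node.js version where destroy() emits 'close' by default):
const pt = new PassThrough();
watchEnd(pt, (err) => console.log(err ? err.message : 'ended cleanly'));
pt.destroy();  // destroyed before 'finish'/'end', so this reports 'premature close'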