From e1edd39e9246982574ac76752cef6afad92a0e6b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 12:12:25 +0200 Subject: [PATCH 1/2] stream: pipeline don't destroy Duplex src before 'finish' pipeline was too agressive with destroying Duplex streams which were the first argument into pipeline. Just because it's !writable does not mean that it is safe to be destroyed, unless it has also emitted 'finish'. Fixes: https://github.com/nodejs/node/issues/32955 --- lib/internal/streams/pipeline.js | 31 +++++++++++++---- test/parallel/test-stream-pipeline.js | 49 +++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index cdd5bcb791f451..041ff9337594f5 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -50,13 +50,30 @@ function destroyer(stream, reading, writing, final, callback) { return callback(); } - if (!err && reading && !writing && stream.writable) { - return callback(); - } + const wState = stream._writableState; + + const writableEnded = stream.writableEnded || + (wState && wState.ended); + const writableFinished = stream.writableFinished || + (wState && wState.finished); + + const willFinish = stream.writable || + (writableEnded && !writableFinished); + const willEnd = stream.readable; - if (err || !final || !stream.readable) { - destroyImpl.destroyer(stream, err); + if (!err) { + // First + if (reading && !writing && willFinish) { + return callback(); + } + + // Last + if (!reading && writing && willEnd) { + return callback(); + } } + + destroyImpl.destroyer(stream, err); callback(err); }); @@ -81,7 +98,9 @@ function destroyer(stream, reading, writing, final, callback) { .once('end', _destroy) .once('error', _destroy); } else { - _destroy(err); + // Do an extra tick so that 'finish' has a chance to be emitted if + // first stream is Duplex. + process.nextTick(_destroy, err); } }); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index b273fddfa3b613..6d9e2be5299422 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -13,6 +13,7 @@ const { const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); +const net = require('net'); { let finished = false; @@ -1118,3 +1119,51 @@ const { promisify } = require('util'); assert.strictEqual(closed, true); })); } + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + // 13 force destroys the socket before it has a chance to emit finish + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + // `destroy()` won't be invoked by pipeline since + // the writable side has not completed when + // the pipeline has completed. + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +} From b58909d02b756cc0f0890c8774c4ffb6ddf05af7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 13:14:07 +0200 Subject: [PATCH 2/2] fixup: nit --- lib/internal/streams/pipeline.js | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 041ff9337594f5..4b4a7d95338652 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -50,18 +50,18 @@ function destroyer(stream, reading, writing, final, callback) { return callback(); } - const wState = stream._writableState; + if (!err) { + const wState = stream._writableState; - const writableEnded = stream.writableEnded || - (wState && wState.ended); - const writableFinished = stream.writableFinished || - (wState && wState.finished); + const writableEnded = stream.writableEnded || + (wState && wState.ended); + const writableFinished = stream.writableFinished || + (wState && wState.finished); - const willFinish = stream.writable || - (writableEnded && !writableFinished); - const willEnd = stream.readable; + const willFinish = stream.writable || + (writableEnded && !writableFinished); + const willEnd = stream.readable; - if (!err) { // First if (reading && !writing && willFinish) { return callback();