From bf3991b40650e4b8f4fb3d243e46a173bdfc9673 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 24 Jul 2022 13:55:11 +0200 Subject: [PATCH] stream: fix 0 transform hwm backpressure PR-URL: https://github.com/nodejs/node/pull/43685 Refs: https://github.com/nodejs/node/issues/42457 Refs: https://github.com/nodejs/node/pull/43648/files Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum --- lib/internal/streams/transform.js | 27 +++++++++++++++--- .../parallel/test-stream-passthrough-drain.js | 4 ++- test/parallel/test-stream-transform-hwm0.js | 28 +++++++++++++++++++ ...st-stream-transform-split-highwatermark.js | 19 ------------- 4 files changed, 54 insertions(+), 24 deletions(-) create mode 100644 test/parallel/test-stream-transform-hwm0.js diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index fdac76e4062b4b..ddd21f0fba9122 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -65,7 +65,7 @@ const { ObjectSetPrototypeOf, - Symbol + Symbol, } = primordials; module.exports = Transform; @@ -73,6 +73,7 @@ const { ERR_METHOD_NOT_IMPLEMENTED } = require('internal/errors').codes; const Duplex = require('internal/streams/duplex'); +const { getHighWaterMark } = require('internal/streams/state'); ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); ObjectSetPrototypeOf(Transform, Duplex); @@ -82,6 +83,26 @@ function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); + // TODO (ronag): This should preferably always be + // applied but would be semver-major. Or even better; + // make Transform a Readable with the Writable interface. + const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null; + if (readableHighWaterMark === 0) { + // A Duplex will buffer both on the writable and readable side while + // a Transform just wants to buffer hwm number of elements. To avoid + // buffering twice we disable buffering on the writable side. + options = { + ...options, + highWaterMark: null, + readableHighWaterMark, + // TODO (ronag): 0 is not optimal since we have + // a "bug" where we check needDrain before calling _write and not after. + // Refs: https://github.com/nodejs/node/pull/32887 + // Refs: https://github.com/nodejs/node/pull/35941 + writableHighWaterMark: options.writableHighWaterMark || 0 + }; + } + Duplex.call(this, options); // We have implemented the _read method, and done the other things @@ -164,9 +185,7 @@ Transform.prototype._write = function(chunk, encoding, callback) { if ( wState.ended || // Backwards compat. length === rState.length || // Backwards compat. - rState.length < rState.highWaterMark || - rState.highWaterMark === 0 || - rState.length === 0 + rState.length < rState.highWaterMark ) { callback(); } else { diff --git a/test/parallel/test-stream-passthrough-drain.js b/test/parallel/test-stream-passthrough-drain.js index f5c98947e21e2e..244bf874073733 100644 --- a/test/parallel/test-stream-passthrough-drain.js +++ b/test/parallel/test-stream-passthrough-drain.js @@ -1,8 +1,10 @@ 'use strict'; const common = require('../common'); +const assert = require('assert'); const { PassThrough } = require('stream'); const pt = new PassThrough({ highWaterMark: 0 }); pt.on('drain', common.mustCall()); -pt.write('hello'); +assert(!pt.write('hello1')); +pt.read(); pt.read(); diff --git a/test/parallel/test-stream-transform-hwm0.js b/test/parallel/test-stream-transform-hwm0.js new file mode 100644 index 00000000000000..8e8971f21fa472 --- /dev/null +++ b/test/parallel/test-stream-transform-hwm0.js @@ -0,0 +1,28 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Transform } = require('stream'); + +const t = new Transform({ + objectMode: true, highWaterMark: 0, + transform(chunk, enc, callback) { + process.nextTick(() => callback(null, chunk, enc)); + } +}); + +assert.strictEqual(t.write(1), false); +t.on('drain', common.mustCall(() => { + assert.strictEqual(t.write(2), false); + t.end(); +})); + +t.once('readable', common.mustCall(() => { + assert.strictEqual(t.read(), 1); + setImmediate(common.mustCall(() => { + assert.strictEqual(t.read(), null); + t.once('readable', common.mustCall(() => { + assert.strictEqual(t.read(), 2); + })); + })); +})); diff --git a/test/parallel/test-stream-transform-split-highwatermark.js b/test/parallel/test-stream-transform-split-highwatermark.js index 22d13fd3c3b0e3..b6255c704710ac 100644 --- a/test/parallel/test-stream-transform-split-highwatermark.js +++ b/test/parallel/test-stream-transform-split-highwatermark.js @@ -20,10 +20,6 @@ testTransform(666, 777, { writableHighWaterMark: 777, }); -// test 0 overriding defaultHwm -testTransform(0, DEFAULT, { readableHighWaterMark: 0 }); -testTransform(DEFAULT, 0, { writableHighWaterMark: 0 }); - // Test highWaterMark overriding testTransform(555, 555, { highWaterMark: 555, @@ -39,21 +35,6 @@ testTransform(555, 555, { writableHighWaterMark: 777, }); -// Test highWaterMark = 0 overriding -testTransform(0, 0, { - highWaterMark: 0, - readableHighWaterMark: 666, -}); -testTransform(0, 0, { - highWaterMark: 0, - writableHighWaterMark: 777, -}); -testTransform(0, 0, { - highWaterMark: 0, - readableHighWaterMark: 666, - writableHighWaterMark: 777, -}); - // Test undefined, null [undefined, null].forEach((v) => { testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });