Skip to content

Commit 698a294

Browse files
abbshraddaleax
authored andcommitted
stream: fix readable state awaitDrain increase in recursion
PR-URL: #27572 Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 627bf59 commit 698a294

6 files changed

+101
-36
lines changed

lib/_stream_readable.js

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,10 @@ function ReadableState(options, stream, isDuplex) {
134134
// Everything else in the universe uses 'utf8', though.
135135
this.defaultEncoding = options.defaultEncoding || 'utf8';
136136

137-
// The number of writers that are awaiting a drain event in .pipe()s
138-
this.awaitDrain = 0;
137+
// Ref the piped dest which we need a drain event on it
138+
// type: null | Writable | Set<Writable>
139+
this.awaitDrainWriters = null;
140+
this.multiAwaitDrain = false;
139141

140142
// If true, a maybeReadMore has been scheduled
141143
this.readingMore = false;
@@ -310,7 +312,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
310312

311313
function addChunk(stream, state, chunk, addToFront) {
312314
if (state.flowing && state.length === 0 && !state.sync) {
313-
state.awaitDrain = 0;
315+
// Use the guard to avoid creating `Set()` repeatedly
316+
// when we have multiple pipes.
317+
if (state.multiAwaitDrain) {
318+
state.awaitDrainWriters.clear();
319+
} else {
320+
state.awaitDrainWriters = null;
321+
}
314322
stream.emit('data', chunk);
315323
} else {
316324
// Update the buffer info.
@@ -511,7 +519,11 @@ Readable.prototype.read = function(n) {
511519
n = 0;
512520
} else {
513521
state.length -= n;
514-
state.awaitDrain = 0;
522+
if (state.multiAwaitDrain) {
523+
state.awaitDrainWriters.clear();
524+
} else {
525+
state.awaitDrainWriters = null;
526+
}
515527
}
516528

517529
if (state.length === 0) {
@@ -656,6 +668,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
656668
const src = this;
657669
const state = this._readableState;
658670

671+
if (state.pipes.length === 1) {
672+
if (!state.multiAwaitDrain) {
673+
state.multiAwaitDrain = true;
674+
state.awaitDrainWriters = new Set(
675+
state.awaitDrainWriters ? [state.awaitDrainWriters] : []
676+
);
677+
}
678+
}
679+
659680
state.pipes.push(dest);
660681
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);
661682

@@ -709,7 +730,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
709730
// flowing again.
710731
// So, if this is awaiting a drain, then we just call it now.
711732
// If we don't know, then assume that we are waiting for one.
712-
if (ondrain && state.awaitDrain &&
733+
if (ondrain && state.awaitDrainWriters &&
713734
(!dest._writableState || dest._writableState.needDrain))
714735
ondrain();
715736
}
@@ -724,16 +745,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
724745
// to get stuck in a permanently paused state if that write
725746
// also returned false.
726747
// => Check whether `dest` is still a piping destination.
727-
if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
728-
debug('false write response, pause', state.awaitDrain);
729-
state.awaitDrain++;
748+
if (!cleanedUp) {
749+
if (state.pipes.length === 1 && state.pipes[0] === dest) {
750+
debug('false write response, pause', 0);
751+
state.awaitDrainWriters = dest;
752+
state.multiAwaitDrain = false;
753+
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
754+
debug('false write response, pause', state.awaitDrainWriters.size);
755+
state.awaitDrainWriters.add(dest);
756+
}
730757
}
731758
if (!ondrain) {
732759
// When the dest drains, it reduces the awaitDrain counter
733760
// on the source. This would be more elegant with a .once()
734761
// handler in flow(), but adding and removing repeatedly is
735762
// too slow.
736-
ondrain = pipeOnDrain(src);
763+
ondrain = pipeOnDrain(src, dest);
737764
dest.on('drain', ondrain);
738765
}
739766
src.pause();
@@ -783,13 +810,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
783810
return dest;
784811
};
785812

786-
function pipeOnDrain(src) {
813+
function pipeOnDrain(src, dest) {
787814
return function pipeOnDrainFunctionResult() {
788815
const state = src._readableState;
789-
debug('pipeOnDrain', state.awaitDrain);
790-
if (state.awaitDrain)
791-
state.awaitDrain--;
792-
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
816+
817+
// `ondrain` will call directly,
818+
// `this` maybe not a reference to dest,
819+
// so we use the real dest here.
820+
if (state.awaitDrainWriters === dest) {
821+
debug('pipeOnDrain', 1);
822+
state.awaitDrainWriters = null;
823+
} else if (state.multiAwaitDrain) {
824+
debug('pipeOnDrain', state.awaitDrainWriters.size);
825+
state.awaitDrainWriters.delete(dest);
826+
}
827+
828+
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
829+
EE.listenerCount(src, 'data')) {
793830
state.flowing = true;
794831
flow(src);
795832
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { PassThrough } = require('stream');
4+
5+
const encode = new PassThrough({
6+
highWaterMark: 1
7+
});
8+
9+
const decode = new PassThrough({
10+
highWaterMark: 1
11+
});
12+
13+
const send = common.mustCall((buf) => {
14+
encode.write(buf);
15+
}, 4);
16+
17+
let i = 0;
18+
const onData = common.mustCall(() => {
19+
if (++i === 2) {
20+
send(Buffer.from([0x3]));
21+
send(Buffer.from([0x4]));
22+
}
23+
}, 4);
24+
25+
encode.pipe(decode).on('data', onData);
26+
27+
send(Buffer.from([0x1]));
28+
send(Buffer.from([0x2]));

test/parallel/test-stream-pipe-await-drain-manual-resume.js

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ readable.pipe(writable);
2828

2929
readable.once('pause', common.mustCall(() => {
3030
assert.strictEqual(
31-
readable._readableState.awaitDrain,
32-
1,
33-
'Expected awaitDrain to equal 1 but instead got ' +
34-
`${readable._readableState.awaitDrain}`
31+
readable._readableState.awaitDrainWriters,
32+
writable,
33+
'Expected awaitDrainWriters to be a Writable but instead got ' +
34+
`${readable._readableState.awaitDrainWriters}`
3535
);
3636
// First pause, resume manually. The next write() to writable will still
3737
// return false, because chunks are still being buffered, so it will increase
@@ -43,10 +43,10 @@ readable.once('pause', common.mustCall(() => {
4343

4444
readable.once('pause', common.mustCall(() => {
4545
assert.strictEqual(
46-
readable._readableState.awaitDrain,
47-
1,
48-
'.resume() should not reset the counter but instead got ' +
49-
`${readable._readableState.awaitDrain}`
46+
readable._readableState.awaitDrainWriters,
47+
writable,
48+
'.resume() should not reset the awaitDrainWriters, but instead got ' +
49+
`${readable._readableState.awaitDrainWriters}`
5050
);
5151
// Second pause, handle all chunks from now on. Once all callbacks that
5252
// are currently queued up are handled, the awaitDrain drain counter should
@@ -65,10 +65,11 @@ readable.push(null);
6565

6666
writable.on('finish', common.mustCall(() => {
6767
assert.strictEqual(
68-
readable._readableState.awaitDrain,
69-
0,
70-
'awaitDrain should equal 0 after all chunks are written but instead got' +
71-
`${readable._readableState.awaitDrain}`
68+
readable._readableState.awaitDrainWriters,
69+
null,
70+
`awaitDrainWriters should be reset to null
71+
after all chunks are written but instead got
72+
${readable._readableState.awaitDrainWriters}`
7273
);
7374
// Everything okay, all chunks were written.
7475
}));

test/parallel/test-stream-pipe-await-drain-push-while-write.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ const assert = require('assert');
66
const writable = new stream.Writable({
77
write: common.mustCall(function(chunk, encoding, cb) {
88
assert.strictEqual(
9-
readable._readableState.awaitDrain,
10-
0
9+
readable._readableState.awaitDrainWriters,
10+
null,
1111
);
1212

1313
if (chunk.length === 32 * 1024) { // first chunk
1414
readable.push(Buffer.alloc(34 * 1024)); // above hwm
1515
// We should check if awaitDrain counter is increased in the next
1616
// tick, because awaitDrain is incremented after this method finished
1717
process.nextTick(() => {
18-
assert.strictEqual(readable._readableState.awaitDrain, 1);
18+
assert.strictEqual(readable._readableState.awaitDrainWriters, writable);
1919
});
2020
}
2121

test/parallel/test-stream-pipe-await-drain.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {
2424

2525
writer1.once('chunk-received', () => {
2626
assert.strictEqual(
27-
reader._readableState.awaitDrain,
27+
reader._readableState.awaitDrainWriters.size,
2828
0,
2929
'awaitDrain initial value should be 0, actual is ' +
30-
reader._readableState.awaitDrain
30+
reader._readableState.awaitDrainWriters
3131
);
3232
setImmediate(() => {
3333
// This one should *not* get through to writer1 because writer2 is not
@@ -39,10 +39,10 @@ writer1.once('chunk-received', () => {
3939
// A "slow" consumer:
4040
writer2._write = common.mustCall((chunk, encoding, cb) => {
4141
assert.strictEqual(
42-
reader._readableState.awaitDrain,
42+
reader._readableState.awaitDrainWriters.size,
4343
1,
4444
'awaitDrain should be 1 after first push, actual is ' +
45-
reader._readableState.awaitDrain
45+
reader._readableState.awaitDrainWriters
4646
);
4747
// Not calling cb here to "simulate" slow stream.
4848
// This should be called exactly once, since the first .write() call
@@ -51,10 +51,10 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
5151

5252
writer3._write = common.mustCall((chunk, encoding, cb) => {
5353
assert.strictEqual(
54-
reader._readableState.awaitDrain,
54+
reader._readableState.awaitDrainWriters.size,
5555
2,
5656
'awaitDrain should be 2 after second push, actual is ' +
57-
reader._readableState.awaitDrain
57+
reader._readableState.awaitDrainWriters
5858
);
5959
// Not calling cb here to "simulate" slow stream.
6060
// This should be called exactly once, since the first .write() call

test/parallel/test-stream2-basic.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,6 @@ class TestWriter extends EE {
355355
assert.strictEqual(v, null);
356356

357357
const w = new R();
358-
359358
w.write = function(buffer) {
360359
written = true;
361360
assert.strictEqual(ended, false);

0 commit comments

Comments
 (0)