Skip to content

Commit

Permalink
stream: make sure 'readable' is emitted before ending the stream
Browse files Browse the repository at this point in the history
Fixes: #25810

PR-URL: #26059
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
mcollina committed Mar 6, 2019
1 parent d38cd82 commit e95e7f9
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 33 deletions.
18 changes: 5 additions & 13 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -505,20 +505,12 @@ function onEofChunk(stream, state) {
}
}
state.ended = true;
state.needReadable = false;

if (state.sync) {
// If we are sync, wait until next tick to emit the data.
// Otherwise we risk emitting data in the flow()
// the readable code triggers during a read() call
emitReadable(stream);
} else {
// Emit 'readable' now to make sure it gets picked up.
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
emitReadable_(stream);
}
}
// We are not protecting if emittedReadable = true,
// so 'readable' gets scheduled anyway.
state.emittedReadable = true;
process.nextTick(emitReadable_, stream);
}

// Don't emit readable right away in sync mode, because this can trigger
Expand Down
3 changes: 0 additions & 3 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ function Transform(options) {
writeencoding: null
};

// Start out asking for a readable event once data is transformed.
this._readableState.needReadable = true;

// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-http-readable-data-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const server = http.createServer((req, res) => {
};

const expectedData = [helloWorld, helloAgainLater];
const expectedRead = [helloWorld, null, helloAgainLater, null];
const expectedRead = [helloWorld, null, helloAgainLater, null, null];

const req = http.request(opts, (res) => {
res.on('error', common.mustNotCall());
Expand All @@ -42,7 +42,7 @@ const server = http.createServer((req, res) => {
assert.strictEqual(data, expectedRead.shift());
next();
} while (data !== null);
}, 2));
}, 3));

res.setEncoding('utf8');
res.on('data', common.mustCall((data) => {
Expand Down
146 changes: 146 additions & 0 deletions test/parallel/test-stream-readable-emit-readable-short-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
'use strict';

const common = require('../common');
const stream = require('stream');
const assert = require('assert');

{
const r = new stream.Readable({
read: common.mustCall(function() {
this.push('content');
this.push(null);
})
});

const t = new stream.Transform({
transform: common.mustCall(function(chunk, encoding, callback) {
this.push(chunk);
return callback();
}),
flush: common.mustCall(function(callback) {
return callback();
})
});

r.pipe(t);
t.on('readable', common.mustCall(function() {
while (true) {
const chunk = t.read();
if (!chunk)
break;

assert.strictEqual(chunk.toString(), 'content');
}
}, 2));
}

{
const t = new stream.Transform({
transform: common.mustCall(function(chunk, encoding, callback) {
this.push(chunk);
return callback();
}),
flush: common.mustCall(function(callback) {
return callback();
})
});

t.end('content');

t.on('readable', common.mustCall(function() {
while (true) {
const chunk = t.read();
if (!chunk)
break;
assert.strictEqual(chunk.toString(), 'content');
}
}, 2));
}

{
const t = new stream.Transform({
transform: common.mustCall(function(chunk, encoding, callback) {
this.push(chunk);
return callback();
}),
flush: common.mustCall(function(callback) {
return callback();
})
});

t.write('content');
t.end();

t.on('readable', common.mustCall(function() {
while (true) {
const chunk = t.read();
if (!chunk)
break;
assert.strictEqual(chunk.toString(), 'content');
}
}, 2));
}

{
const t = new stream.Readable({
read() {
}
});

t.on('readable', common.mustCall(function() {
while (true) {
const chunk = t.read();
if (!chunk)
break;
assert.strictEqual(chunk.toString(), 'content');
}
}, 2));

t.push('content');
t.push(null);
}

{
const t = new stream.Readable({
read() {
}
});

t.on('readable', common.mustCall(function() {
while (true) {
const chunk = t.read();
if (!chunk)
break;
assert.strictEqual(chunk.toString(), 'content');
}
}, 2));

process.nextTick(() => {
t.push('content');
t.push(null);
});
}

{
const t = new stream.Transform({
transform: common.mustCall(function(chunk, encoding, callback) {
this.push(chunk);
return callback();
}),
flush: common.mustCall(function(callback) {
return callback();
})
});

t.on('readable', common.mustCall(function() {
while (true) {
const chunk = t.read();
if (!chunk)
break;
assert.strictEqual(chunk.toString(), 'content');
}
}, 2));

t.write('content');
t.end();
}
13 changes: 12 additions & 1 deletion test/parallel/test-stream-readable-emittedReadable.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,23 @@ const noRead = new Readable({
read: () => {}
});

noRead.on('readable', common.mustCall(() => {
noRead.once('readable', common.mustCall(() => {
// emittedReadable should be true when the readable event is emitted
assert.strictEqual(noRead._readableState.emittedReadable, true);
noRead.read(0);
// emittedReadable is not reset during read(0)
assert.strictEqual(noRead._readableState.emittedReadable, true);

noRead.on('readable', common.mustCall(() => {
// The second 'readable' is emitted because we are ending

// emittedReadable should be true when the readable event is emitted
assert.strictEqual(noRead._readableState.emittedReadable, false);
noRead.read(0);
// emittedReadable is not reset during read(0)
assert.strictEqual(noRead._readableState.emittedReadable, false);

}));
}));

noRead.push('foo');
Expand Down
10 changes: 6 additions & 4 deletions test/parallel/test-stream-readable-needReadable.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => {
// When the readable event fires, needReadable is reset.
assert.strictEqual(readable._readableState.needReadable, false);
readable.read();
}));
}, 2));

// If a readable listener is attached, then a readable event is needed.
assert.strictEqual(readable._readableState.needReadable, true);
Expand Down Expand Up @@ -74,12 +74,14 @@ const slowProducer = new Readable({
});

slowProducer.on('readable', common.mustCall(() => {
if (slowProducer.read(8) === null) {
const chunk = slowProducer.read(8);
const state = slowProducer._readableState;
if (chunk === null) {
// The buffer doesn't have enough data, and the stream is not need,
// we need to notify the reader when data arrives.
assert.strictEqual(slowProducer._readableState.needReadable, true);
assert.strictEqual(state.needReadable, true);
} else {
assert.strictEqual(slowProducer._readableState.needReadable, false);
assert.strictEqual(state.needReadable, false);
}
}, 4));

Expand Down
10 changes: 6 additions & 4 deletions test/parallel/test-stream-readable-reading-readingMore.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const Readable = require('stream').Readable;
assert.strictEqual(state.reading, false);
}

const expectedReadingMore = [true, false];
const expectedReadingMore = [true, false, false];
readable.on('readable', common.mustCall(() => {
// There is only one readingMore scheduled from on('data'),
// after which everything is governed by the .read() call
Expand All @@ -40,10 +40,12 @@ const Readable = require('stream').Readable;
// If the stream has ended, we shouldn't be reading
assert.strictEqual(state.ended, !state.reading);

const data = readable.read();
if (data === null) // reached end of stream
// consume all the data
while (readable.read() !== null) {}

if (expectedReadingMore.length === 0) // reached end of stream
process.nextTick(common.mustCall(onStreamEnd, 1));
}, 2));
}, 3));

readable.on('end', common.mustCall(onStreamEnd));
readable.push('pushed');
Expand Down
7 changes: 5 additions & 2 deletions test/parallel/test-stream2-httpclient-response-end.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ const server = http.createServer(function(req, res) {
let data = '';
res.on('readable', common.mustCall(function() {
console.log('readable event');
data += res.read();
}));
let chunk;
while ((chunk = res.read()) !== null) {
data += chunk;
}
}, 2));
res.on('end', common.mustCall(function() {
console.log('end event');
assert.strictEqual(msg, data);
Expand Down
13 changes: 9 additions & 4 deletions test/parallel/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,16 @@ const Transform = require('_stream_transform');

pt.end();

assert.strictEqual(emits, 1);
assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);
// The next readable is emitted on the next tick.
assert.strictEqual(emits, 0);

assert.strictEqual(emits, 1);
process.on('nextTick', function() {
assert.strictEqual(emits, 1);
assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);

assert.strictEqual(emits, 1);
});
}

{
Expand Down

0 comments on commit e95e7f9

Please sign in to comment.