diff --git a/doc/api/stream.md b/doc/api/stream.md index 10bd9515051340..b30cb20cc4c026 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -747,6 +747,11 @@ The listener callback will be passed a single `Error` object. ##### Event: 'readable' The `'readable'` event is emitted when there is data available to be read from @@ -1647,6 +1652,13 @@ const myReadable = new Readable({ ``` #### readable.\_read(size) + * `size` {number} Number of bytes to read asynchronously @@ -1666,6 +1678,8 @@ additional data onto the queue. *Note*: Once the `readable._read()` method has been called, it will not be called again until the [`readable.push()`][stream-push] method is called. +`readable._read()` is guaranteed to be called only once within a +synchronous execution, i.e. a microtick. The `size` argument is advisory. For implementations where a "read" is a single operation that returns data can use the `size` argument to determine how diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 21598efa65f254..eb90c28b64663e 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -267,7 +267,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync) { stream.emit('data', chunk); - stream.read(0); } else { // update the buffer info. state.length += state.objectMode ? 1 : chunk.length; @@ -496,7 +495,11 @@ function onEofChunk(stream, state) { state.ended = true; // emit 'readable' now to make sure it gets picked up. 
- emitReadable(stream); + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + emitReadable_(stream); + } } // Don't emit readable right away in sync mode, because this can trigger @@ -508,16 +511,15 @@ function emitReadable(stream) { if (!state.emittedReadable) { debug('emitReadable', state.flowing); state.emittedReadable = true; - if (state.sync) - process.nextTick(emitReadable_, stream); - else - emitReadable_(stream); + process.nextTick(emitReadable_, stream); } } function emitReadable_(stream) { + var state = stream._readableState; debug('emit readable'); stream.emit('readable'); + state.needReadable = !state.flowing && !state.ended; flow(stream); } @@ -537,7 +539,7 @@ function maybeReadMore(stream, state) { function maybeReadMore_(stream, state) { var len = state.length; - while (!state.reading && !state.flowing && !state.ended && + while (!state.reading && !state.ended && state.length < state.highWaterMark) { debug('maybeReadMore read 0'); stream.read(0); @@ -644,6 +646,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); + debug('dest.write', ret); if (false === ret && !increasedAwaitDrain) { // If the user unpiped during `dest.write()`, it is possible // to get stuck in a permanently paused state if that write @@ -824,8 +827,8 @@ function resume(stream, state) { } function resume_(stream, state) { + debug('resume', state.reading); if (!state.reading) { - debug('resume read 0'); stream.read(0); } @@ -1087,6 +1090,7 @@ function copyFromBuffer(n, list) { function endReadable(stream) { var state = stream._readableState; + debug('endReadable', state.endEmitted); if (!state.endEmitted) { state.ended = true; process.nextTick(endReadableNT, state, stream); @@ -1094,6 +1098,8 @@ function endReadable(stream) { } function endReadableNT(state, stream) { + debug('endReadableNT', state.endEmitted, state.length); + // Check that we didn't get one last 
unshift. if (!state.endEmitted && state.length === 0) { state.endEmitted = true; diff --git a/test/parallel/test-net-end-close.js b/test/parallel/test-net-end-close.js index 44a539a3e800a6..31c150e09c09af 100644 --- a/test/parallel/test-net-end-close.js +++ b/test/parallel/test-net-end-close.js @@ -8,9 +8,9 @@ const uv = process.binding('uv'); const s = new net.Socket({ handle: { readStart: function() { - process.nextTick(() => this.onread(uv.UV_EOF, null)); + setImmediate(() => this.onread(uv.UV_EOF, null)); }, - close: (cb) => process.nextTick(cb) + close: (cb) => setImmediate(cb) }, writable: false }); @@ -18,8 +18,12 @@ assert.strictEqual(s, s.resume()); const events = []; -s.on('end', () => events.push('end')); -s.on('close', () => events.push('close')); +s.on('end', () => { + events.push('end'); +}); +s.on('close', () => { + events.push('close'); +}); process.on('exit', () => { assert.deepStrictEqual(events, [ 'end', 'close' ]); diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js index adefad70adc20c..263e6b6801f68b 100644 --- a/test/parallel/test-stream-pipe-await-drain-push-while-write.js +++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js @@ -3,32 +3,24 @@ const common = require('../common'); const stream = require('stream'); const assert = require('assert'); -const awaitDrainStates = [ - 1, // after first chunk before callback - 1, // after second chunk before callback - 0 // resolving chunk pushed after first chunk, awaitDrain is decreased -]; - -// A writable stream which pushes data onto the stream which pipes into it, -// but only the first time it's written to. Since it's not paused at this time, -// a second write will occur. If the pipe increases awaitDrain twice, we'll -// never get subsequent chunks because 'drain' is only emitted once. 
const writable = new stream.Writable({ write: common.mustCall(function(chunk, encoding, cb) { - if (chunk.length === 32 * 1024) { // first chunk - const beforePush = readable._readableState.awaitDrain; - readable.push(Buffer.alloc(34 * 1024)); // above hwm - // We should check if awaitDrain counter is increased. - const afterPush = readable._readableState.awaitDrain; - assert.strictEqual(afterPush - beforePush, 1, - 'Counter is not increased for awaitDrain'); - } - assert.strictEqual( - awaitDrainStates.shift(), readable._readableState.awaitDrain, + 0, 'State variable awaitDrain is not correct.' ); + + if (chunk.length === 32 * 1024) { // first chunk + readable.push(Buffer.alloc(34 * 1024)); // above hwm + // We should check if awaitDrain counter is increased in the next + // tick, because awaitDrain is incremented after this method has finished + process.nextTick(() => { + assert.strictEqual(readable._readableState.awaitDrain, 1, + 'Counter is not increased for awaitDrain'); + }); + } + cb(); }, 3) }); diff --git a/test/parallel/test-stream-readable-emittedReadable.js b/test/parallel/test-stream-readable-emittedReadable.js index 65b6b5b15a5895..5b9affc59fbbf2 100644 --- a/test/parallel/test-stream-readable-emittedReadable.js +++ b/test/parallel/test-stream-readable-emittedReadable.js @@ -10,30 +10,33 @@ const readable = new Readable({ // Initialized to false. 
assert.strictEqual(readable._readableState.emittedReadable, false); +const expected = [Buffer.from('foobar'), Buffer.from('quo'), null]; readable.on('readable', common.mustCall(() => { // emittedReadable should be true when the readable event is emitted assert.strictEqual(readable._readableState.emittedReadable, true); - readable.read(); + assert.deepStrictEqual(readable.read(), expected.shift()); // emittedReadable is reset to false during read() assert.strictEqual(readable._readableState.emittedReadable, false); -}, 4)); +}, 3)); // When the first readable listener is just attached, // emittedReadable should be false assert.strictEqual(readable._readableState.emittedReadable, false); -// Each one of these should trigger a readable event. +// These trigger a single 'readable', as things are batched up process.nextTick(common.mustCall(() => { readable.push('foo'); })); process.nextTick(common.mustCall(() => { readable.push('bar'); })); -process.nextTick(common.mustCall(() => { + +// these trigger two readable events +setImmediate(common.mustCall(() => { readable.push('quo'); -})); -process.nextTick(common.mustCall(() => { - readable.push(null); + process.nextTick(common.mustCall(() => { + readable.push(null); + })); })); const noRead = new Readable({ diff --git a/test/parallel/test-stream-readable-needReadable.js b/test/parallel/test-stream-readable-needReadable.js index be397dc5dc5f74..7058e123f07823 100644 --- a/test/parallel/test-stream-readable-needReadable.js +++ b/test/parallel/test-stream-readable-needReadable.js @@ -38,7 +38,7 @@ asyncReadable.on('readable', common.mustCall(() => { // then we need to notify the reader on future changes. 
assert.strictEqual(asyncReadable._readableState.needReadable, true); } -}, 3)); +}, 2)); process.nextTick(common.mustCall(() => { asyncReadable.push('foooo'); @@ -46,8 +46,9 @@ process.nextTick(common.mustCall(() => { process.nextTick(common.mustCall(() => { asyncReadable.push('bar'); })); -process.nextTick(common.mustCall(() => { +setImmediate(common.mustCall(() => { asyncReadable.push(null); + assert.strictEqual(asyncReadable._readableState.needReadable, false); })); const flowing = new Readable({ @@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => { process.nextTick(common.mustCall(() => { slowProducer.push('foo'); -})); -process.nextTick(common.mustCall(() => { - slowProducer.push('foo'); -})); -process.nextTick(common.mustCall(() => { - slowProducer.push('foo'); -})); -process.nextTick(common.mustCall(() => { - slowProducer.push(null); + process.nextTick(common.mustCall(() => { + slowProducer.push('foo'); + process.nextTick(common.mustCall(() => { + slowProducer.push('foo'); + process.nextTick(common.mustCall(() => { + slowProducer.push(null); + })); + })); + })); })); diff --git a/test/parallel/test-stream-readable-object-multi-push-async.js b/test/parallel/test-stream-readable-object-multi-push-async.js new file mode 100644 index 00000000000000..4babfd12a27084 --- /dev/null +++ b/test/parallel/test-stream-readable-object-multi-push-async.js @@ -0,0 +1,183 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Readable } = require('stream'); + +const MAX = 42; +const BATCH = 10; + +{ + const readable = new Readable({ + objectMode: true, + read: common.mustCall(function() { + console.log('>> READ'); + fetchData((err, data) => { + if (err) { + this.destroy(err); + return; + } + + if (data.length === 0) { + console.log('pushing null'); + this.push(null); + return; + } + + console.log('pushing'); + data.forEach((d) => this.push(d)); + }); + }, Math.floor(MAX / BATCH) + 2) + }); + + let i = 0; + 
function fetchData(cb) { + if (i > MAX) { + setTimeout(cb, 10, null, []); + } else { + const array = []; + const max = i + BATCH; + for (; i < max; i++) { + array.push(i); + } + setTimeout(cb, 10, null, array); + } + } + + readable.on('readable', () => { + let data; + console.log('readable emitted'); + while (data = readable.read()) { + console.log(data); + } + }); + + readable.on('end', common.mustCall(() => { + assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH); + })); +} + +{ + const readable = new Readable({ + objectMode: true, + read: common.mustCall(function() { + console.log('>> READ'); + fetchData((err, data) => { + if (err) { + this.destroy(err); + return; + } + + if (data.length === 0) { + console.log('pushing null'); + this.push(null); + return; + } + + console.log('pushing'); + data.forEach((d) => this.push(d)); + }); + }, Math.floor(MAX / BATCH) + 2) + }); + + let i = 0; + function fetchData(cb) { + if (i > MAX) { + setTimeout(cb, 10, null, []); + } else { + const array = []; + const max = i + BATCH; + for (; i < max; i++) { + array.push(i); + } + setTimeout(cb, 10, null, array); + } + } + + readable.on('data', (data) => { + console.log('data emitted', data); + }); + + readable.on('end', common.mustCall(() => { + assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH); + })); +} + +{ + const readable = new Readable({ + objectMode: true, + read: common.mustCall(function() { + console.log('>> READ'); + fetchData((err, data) => { + if (err) { + this.destroy(err); + return; + } + + console.log('pushing'); + data.forEach((d) => this.push(d)); + + if (data[BATCH - 1] >= MAX) { + console.log('pushing null'); + this.push(null); + } + }); + }, Math.floor(MAX / BATCH) + 1) + }); + + let i = 0; + function fetchData(cb) { + const array = []; + const max = i + BATCH; + for (; i < max; i++) { + array.push(i); + } + setTimeout(cb, 10, null, array); + } + + readable.on('data', (data) => { + console.log('data emitted', data); + }); + + readable.on('end', 
common.mustCall(() => { + assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH); + })); +} + +{ + const readable = new Readable({ + objectMode: true, + read: common.mustNotCall() + }); + + readable.on('data', common.mustNotCall()); + + readable.push(null); + + let nextTickPassed = false; + process.nextTick(() => { + nextTickPassed = true; + }); + + readable.on('end', common.mustCall(() => { + assert.strictEqual(nextTickPassed, false); + })); +} + +{ + const readable = new Readable({ + objectMode: true, + read: common.mustCall() + }); + + readable.on('data', (data) => { + console.log('data emitted', data); + }); + + readable.on('end', common.mustCall()); + + setImmediate(() => { + readable.push('aaa'); + readable.push(null); + }); +} diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index e31d2dd921ce5b..0af2eeb71f2b1b 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -37,7 +37,8 @@ readable.on('readable', common.mustCall(() => { // if the stream has ended, we shouldn't be reading assert.strictEqual(state.ended, !state.reading); - if (readable.read() === null) // reached end of stream + const data = readable.read(); + if (data === null) // reached end of stream process.nextTick(common.mustCall(onStreamEnd, 1)); }, 2)); diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index d0859265428b81..68c25141aa2e53 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -306,25 +306,26 @@ const Transform = require('_stream_transform'); pt.write(Buffer.from('foog')); pt.write(Buffer.from('bark')); - assert.strictEqual(emits, 1); + assert.strictEqual(emits, 0); assert.strictEqual(pt.read(5).toString(), 'foogb'); assert.strictEqual(String(pt.read(5)), 'null'); + assert.strictEqual(emits, 0); 
pt.write(Buffer.from('bazy')); pt.write(Buffer.from('kuel')); - assert.strictEqual(emits, 2); + assert.strictEqual(emits, 0); assert.strictEqual(pt.read(5).toString(), 'arkba'); assert.strictEqual(pt.read(5).toString(), 'zykue'); assert.strictEqual(pt.read(5), null); pt.end(); - assert.strictEqual(emits, 3); + assert.strictEqual(emits, 1); assert.strictEqual(pt.read(5).toString(), 'l'); assert.strictEqual(pt.read(5), null); - assert.strictEqual(emits, 3); + assert.strictEqual(emits, 1); } { @@ -338,7 +339,7 @@ const Transform = require('_stream_transform'); pt.write(Buffer.from('foog')); pt.write(Buffer.from('bark')); - assert.strictEqual(emits, 1); + assert.strictEqual(emits, 0); assert.strictEqual(pt.read(5).toString(), 'foogb'); assert.strictEqual(pt.read(5), null); @@ -352,7 +353,7 @@ const Transform = require('_stream_transform'); pt.once('readable', common.mustCall(function() { assert.strictEqual(pt.read(5).toString(), 'l'); assert.strictEqual(pt.read(5), null); - assert.strictEqual(emits, 4); + assert.strictEqual(emits, 3); })); pt.end(); }));