Skip to content

Commit

Permalink
stream: add readableDidRead if has been read from
Browse files Browse the repository at this point in the history
Adds did read accessor used to determine whether a readable has been
read from.

PR-URL: #39589
Refs: nodejs/undici#907
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag authored and Trott committed Aug 2, 2021
1 parent 45f98fc commit ac621ff
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 19 deletions.
11 changes: 11 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,17 @@ added: v11.4.0
Is `true` if it is safe to call [`readable.read()`][stream-read], which means
the stream has not been destroyed or emitted `'error'` or `'end'`.

##### `readable.readableDidRead`
<!-- YAML
added: REPLACEME
-->

* {boolean}

Allows determining if the stream has been or is about to be read.
Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been
emitted.

##### `readable.readableEncoding`
<!-- YAML
added: v12.7.0
Expand Down
1 change: 0 additions & 1 deletion lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ function IncomingMessage(socket) {
this.statusMessage = null;
this.client = socket;

// TODO: Deprecate and remove.
this._consuming = false;
// Flag for when we decide that this message cannot possibly be
// read by the user, so there's no point continuing to handle it.
Expand Down
2 changes: 1 addition & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ function resOnFinish(req, res, socket, state, server) {
// If the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req._dump() so that the
// bytes will be pulled off the wire.
if (!req.readableDidRead)
if (!req._consuming && !req._readableState.resumeScheduled)
req._dump();

// Make sure the requestTimeout is cleared before finishing.
Expand Down
19 changes: 11 additions & 8 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ function ReadableState(options, stream, isDuplex) {
// If true, a maybeReadMore has been scheduled.
this.readingMore = false;

this.didRead = false;
this.dataEmitted = false;

this.decoder = null;
this.encoding = null;
Expand Down Expand Up @@ -316,6 +316,7 @@ function addChunk(stream, state, chunk, addToFront) {
} else {
state.awaitDrainWriters = null;
}
state.dataEmitted = true;
stream.emit('data', chunk);
} else {
// Update the buffer info.
Expand Down Expand Up @@ -541,10 +542,10 @@ Readable.prototype.read = function(n) {
endReadable(this);
}

if (ret !== null)
if (ret !== null) {
state.dataEmitted = true;
this.emit('data', ret);

state.didRead = true;
}

return ret;
};
Expand Down Expand Up @@ -849,9 +850,7 @@ function pipeOnDrain(src, dest) {

if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
EE.listenerCount(src, 'data')) {
// TODO(ronag): Call resume() instead?
state.flowing = true;
state.didRead = true;
flow(src);
}
};
Expand Down Expand Up @@ -999,7 +998,6 @@ Readable.prototype.resume = function() {
function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
state.didRead = true;
process.nextTick(resume_, stream, state);
}
}
Expand Down Expand Up @@ -1187,7 +1185,12 @@ ObjectDefineProperties(Readable.prototype, {
readableDidRead: {
enumerable: false,
get: function() {
return this._readableState.didRead;
return (
this._readableState.dataEmitted ||
this._readableState.endEmitted ||
this._readableState.errorEmitted ||
this._readableState.closeEmitted
);
}
},

Expand Down
98 changes: 89 additions & 9 deletions test/parallel/test-stream-readable-didRead.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,104 @@
'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');
const Readable = require('stream').Readable;

function noop() {}

function check(readable, data, fn) {
assert.strictEqual(readable.readableDidRead, false);
if (data === -1) {
readable.on('error', common.mustCall());
readable.on('data', common.mustNotCall());
readable.on('end', common.mustNotCall());
} else {
readable.on('error', common.mustNotCall());
if (data === -2) {
readable.on('end', common.mustNotCall());
} else {
readable.on('end', common.mustCall());
}
if (data > 0) {
readable.on('data', common.mustCallAtLeast(data));
} else {
readable.on('data', common.mustNotCall());
}
}
readable.on('close', common.mustCall());
fn();
setImmediate(() => {
assert.strictEqual(readable.readableDidRead, true);
});
}

{
const readable = new Readable({
read: () => {}
read() {
this.push(null);
}
});
check(readable, 0, () => {
readable.read();
});
}

assert.strictEqual(readable.readableDidRead, false);
readable.read();
assert.strictEqual(readable.readableDidRead, true);
{
const readable = new Readable({
read() {
this.push(null);
}
});
check(readable, 0, () => {
readable.resume();
});
}

{
const readable = new Readable({
read: () => {}
read() {
this.push(null);
}
});
check(readable, -2, () => {
readable.destroy();
});
}

assert.strictEqual(readable.readableDidRead, false);
readable.resume();
assert.strictEqual(readable.readableDidRead, true);
{
const readable = new Readable({
read() {
this.push(null);
}
});

check(readable, -1, () => {
readable.destroy(new Error());
});
}

{
const readable = new Readable({
read() {
this.push('data');
this.push(null);
}
});

check(readable, 1, () => {
readable.on('data', noop);
});
}

{
const readable = new Readable({
read() {
this.push('data');
this.push(null);
}
});

check(readable, 1, () => {
readable.on('data', noop);
readable.off('data', noop);
});
}

0 comments on commit ac621ff

Please sign in to comment.