From b9248c95775e8b93e06c61d74c02490bfe60fe28 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 23 Oct 2023 23:04:03 +0200 Subject: [PATCH] stream: readable use bitmap accessors PR-URL: https://github.com/nodejs/node/pull/50350 --- lib/internal/streams/readable.js | 58 +++++++++++++++++--------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a129b1b6f4b75d..865e08ebffea50 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -435,7 +435,7 @@ function readableAddChunkUnshiftObjectMode(stream, state, chunk) { function readableAddChunkUnshiftValue(stream, state, chunk) { if ((state[kState] & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - else if (state.destroyed || state.errored) + else if ((state[kState] & (kDestroyed | kErrored)) !== 0) return false; else addChunk(stream, state, chunk, true); @@ -604,7 +604,7 @@ function computeNewHighWaterMark(n) { // This function is designed to be inlinable, so please take care when making // changes to the function body. function howMuchToRead(n, state) { - if (n <= 0 || (state.length === 0 && state.ended)) + if (n <= 0 || (state.length === 0 && (state[kState] & kEnded) !== 0)) return 0; if ((state[kState] & kObjectMode) !== 0) return 1; @@ -648,7 +648,7 @@ Readable.prototype.read = function(n) { state.length >= state.highWaterMark : state.length > 0) || (state[kState] & kEnded) !== 0)) { - debug('read: emitReadable', state.length, (state[kState] & kEnded) !== 0); + debug('read: emitReadable'); if (state.length === 0 && (state[kState] & kEnded) !== 0) endReadable(this); else @@ -806,7 +806,7 @@ function emitReadable(stream) { function emitReadable_(stream) { const state = stream._readableState; debug('emitReadable_'); - if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || state.ended)) { + if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || (state[kState] & kEnded) !== 0)) { stream.emit('readable'); state[kState] &= ~kEmittedReadable; } @@ -887,7 +887,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { const state = this._readableState; if (state.pipes.length === 1) { - if (!state.multiAwaitDrain) { + if ((state[kState] & kMultiAwaitDrain) === 0) { state[kState] |= kMultiAwaitDrain; state.awaitDrainWriters = new SafeSet( state.awaitDrainWriters ? [state.awaitDrainWriters] : [], @@ -903,7 +903,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stderr; const endFn = doEnd ? onend : unpipe; - if (state.endEmitted) + if ((state[kState] & kEndEmitted) !== 0) process.nextTick(endFn); else src.once('end', endFn); @@ -962,7 +962,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { if (state.pipes.length === 1 && state.pipes[0] === dest) { debug('false write response, pause', 0); state.awaitDrainWriters = dest; - state.multiAwaitDrain = false; + state[kState] &= ~kMultiAwaitDrain; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { debug('false write response, pause', state.awaitDrainWriters.size); state.awaitDrainWriters.add(dest); @@ -1034,7 +1034,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { if (dest.writableNeedDrain === true) { pause(); - } else if (!state.flowing) { + } else if ((state[kState] & kFlowing) === 0) { debug('pipe resume'); src.resume(); } @@ -1052,7 +1052,7 @@ function pipeOnDrain(src, dest) { if (state.awaitDrainWriters === dest) { debug('pipeOnDrain', 1); state.awaitDrainWriters = null; - } else if (state.multiAwaitDrain) { + } else if ((state[kState] & kMultiAwaitDrain) !== 0) { debug('pipeOnDrain', state.awaitDrainWriters.size); state.awaitDrainWriters.delete(dest); } @@ -1107,20 +1107,20 @@ Readable.prototype.on = function(ev, fn) { if (ev === 'data') { // Update readableListening so that resume() may be a no-op // a few lines down. This is needed to support once('readable'). - state.readableListening = this.listenerCount('readable') > 0; + state[kState] |= this.listenerCount('readable') > 0 ? kReadableListening : 0; // Try start flowing on next tick if stream isn't explicitly paused. - if (state.flowing !== false) + if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) { this.resume(); + } } else if (ev === 'readable') { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - debug('on readable', state.length, state.reading); + if ((state[kState] & (kEndEmitted | kReadableListening)) === 0) { + state[kState] |= kReadableListening | kNeedReadable | kHasFlowing; + state[kState] &= ~(kFlowing | kEmittedReadable); + debug('on readable'); if (state.length) { emitReadable(this); - } else if (!state.reading) { + } else if ((state[kState] & kReading) === 0) { process.nextTick(nReadingNextTick, this); } } @@ -1167,7 +1167,12 @@ Readable.prototype.removeAllListeners = function(ev) { function updateReadableListening(self) { const state = self._readableState; - state.readableListening = self.listenerCount('readable') > 0; + + if (self.listenerCount('readable') > 0) { + state[kState] |= kReadableListening; + } else { + state[kState] &= ~kReadableListening; + } if ((state[kState] & (kHasPaused | kPaused | kResumeScheduled)) === (kHasPaused | kResumeScheduled)) { // Flowing needs to be set to true now, otherwise @@ -1197,7 +1202,7 @@ Readable.prototype.resume = function() { // for readable, but we still have to call // resume(). state[kState] |= kHasFlowing; - if (!state.readableListening) { + if ((state[kState] & kReadableListening) === 0) { state[kState] |= kFlowing; } else { state[kState] &= ~kFlowing; @@ -1210,8 +1215,8 @@ Readable.prototype.resume = function() { }; function resume(stream, state) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; + if ((state[kState] & kResumeScheduled) === 0) { + state[kState] |= kResumeScheduled; process.nextTick(resume_, stream, state); } } @@ -1232,7 +1237,7 @@ function resume_(stream, state) { Readable.prototype.pause = function() { const state = this._readableState; debug('call pause'); - if (state.flowing !== false) { + if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) { debug('pause'); state[kState] |= kHasFlowing; state[kState] &= ~kFlowing; @@ -1572,7 +1577,7 @@ function fromList(n, state) { function endReadable(stream) { const state = stream._readableState; - debug('endReadable', (state[kState] & kEndEmitted) !== 0); + debug('endReadable'); if ((state[kState] & kEndEmitted) === 0) { state[kState] |= kEnded; process.nextTick(endReadableNT, state, stream); @@ -1580,12 +1585,11 @@ function endReadable(stream) { } function endReadableNT(state, stream) { - debug('endReadableNT', state.endEmitted, state.length); + debug('endReadableNT'); // Check that we didn't get one last unshift. - if (!state.errored && !state.closeEmitted && - !state.endEmitted && state.length === 0) { - state.endEmitted = true; + if ((state[kState] & (kErrored | kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) { + state[kState] |= kEndEmitted; stream.emit('end'); if (stream.writable && stream.allowHalfOpen === false) {