Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: use bit fields for construct/destroy #50408

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ const {
isDestroyed,
isFinished,
isServerRequest,
kState,
kErrorEmitted,
kEmitClose,
kClosed,
kCloseEmitted,
kConstructed,
kDestroyed,
kAutoDestroy,
kErrored,
} = require('internal/streams/utils');

const kDestroy = Symbol('kDestroy');
Expand Down Expand Up @@ -42,7 +51,10 @@ function destroy(err, cb) {
// With duplex streams we use the writable side for state.
const s = w || r;

if (w?.destroyed || r?.destroyed) {
if (
(w && (w[kState] & kDestroyed) !== 0) ||
(r && (r[kState] & kDestroyed) !== 0)
) {
if (typeof cb === 'function') {
cb();
}
Expand All @@ -56,14 +68,14 @@ function destroy(err, cb) {
checkError(err, w, r);

if (w) {
w.destroyed = true;
w[kState] |= kDestroyed;
}
if (r) {
r.destroyed = true;
r[kState] |= kDestroyed;
}

// If still constructing then defer calling _destroy.
if (!s.constructed) {
if ((s[kState] & kConstructed) === 0) {
this.once(kDestroy, function(er) {
_destroy(this, aggregateTwoErrors(er, err), cb);
});
Expand All @@ -89,10 +101,10 @@ function _destroy(self, err, cb) {
checkError(err, w, r);

if (w) {
w.closed = true;
w[kState] |= kClosed;
}
if (r) {
r.closed = true;
r[kState] |= kClosed;
}

if (typeof cb === 'function') {
Expand Down Expand Up @@ -122,13 +134,16 @@ function emitCloseNT(self) {
const w = self._writableState;

if (w) {
w.closeEmitted = true;
w[kState] |= kCloseEmitted;
}
if (r) {
r.closeEmitted = true;
r[kState] |= kCloseEmitted;
}

if (w?.emitClose || r?.emitClose) {
if (
(w && (w[kState] & kEmitClose) !== 0) ||
(r && (r[kState] & kEmitClose) !== 0)
) {
self.emit('close');
}
}
Expand All @@ -137,15 +152,18 @@ function emitErrorNT(self, err) {
const r = self._readableState;
const w = self._writableState;

if (w?.errorEmitted || r?.errorEmitted) {
if (
(w && (w[kState] & kErrorEmitted) !== 0) ||
(r && (r[kState] & kErrorEmitted) !== 0)
) {
return;
}

if (w) {
w.errorEmitted = true;
w[kState] |= kErrorEmitted;
}
if (r) {
r.errorEmitted = true;
r[kState] |= kErrorEmitted;
}

self.emit('error', err);
Expand Down Expand Up @@ -192,20 +210,26 @@ function errorOrDestroy(stream, err, sync) {
const r = stream._readableState;
const w = stream._writableState;

if (w?.destroyed || r?.destroyed) {
if (
(w && (w[kState] ? (w[kState] & kDestroyed) !== 0 : w.destroyed)) ||
(r && (r[kState] ? (r[kState] & kDestroyed) !== 0 : r.destroyed))
) {
return this;
}

if (r?.autoDestroy || w?.autoDestroy)
if (
(r && (r[kState] & kAutoDestroy) !== 0) ||
(w && (w[kState] & kAutoDestroy) !== 0)
) {
stream.destroy(err);
else if (err) {
} else if (err) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack; // eslint-disable-line no-unused-expressions

if (w && !w.errored) {
if (w && (w[kState] & kErrored) === 0) {
w.errored = err;
}
if (r && !r.errored) {
if (r && (r[kState] & kErrored) === 0) {
r.errored = err;
}
if (sync) {
Expand All @@ -225,10 +249,10 @@ function construct(stream, cb) {
const w = stream._writableState;

if (r) {
r.constructed = false;
r[kState] &= ~kConstructed;
}
if (w) {
w.constructed = false;
w[kState] &= ~kConstructed;
}

stream.once(kConstruct, cb);
Expand Down Expand Up @@ -256,10 +280,10 @@ function constructNT(stream) {
const s = w || r;

if (r) {
r.constructed = true;
r[kState] |= kConstructed;
}
if (w) {
w.constructed = true;
w[kState] |= kConstructed;
}

if (s.destroyed) {
Expand Down
47 changes: 25 additions & 22 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ const {
getHighWaterMark,
getDefaultHighWaterMark,
} = require('internal/streams/state');
const {
kState,
// bitfields
kObjectMode,
kErrorEmitted,
kAutoDestroy,
kEmitClose,
kDestroyed,
kClosed,
kCloseEmitted,
kErrored,
kConstructed,
kOnConstructed,
} = require('internal/streams/utils');

const {
aggregateTwoErrors,
Expand All @@ -72,9 +86,7 @@ const {
AbortError,
} = require('internal/errors');
const { validateObject } = require('internal/validators');
const { kOnConstructed } = require('internal/streams/utils');

const kState = Symbol('kState');
const FastBuffer = Buffer[SymbolSpecies];

const { StringDecoder } = require('string_decoder');
Expand All @@ -91,26 +103,17 @@ const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kDecoderValue = Symbol('kDecoderValue');
const kEncodingValue = Symbol('kEncodingValue');

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kEndEmitted = 1 << 2;
const kReading = 1 << 3;
const kConstructed = 1 << 4;
const kSync = 1 << 5;
const kNeedReadable = 1 << 6;
const kEmittedReadable = 1 << 7;
const kReadableListening = 1 << 8;
const kResumeScheduled = 1 << 9;
const kErrorEmitted = 1 << 10;
const kEmitClose = 1 << 11;
const kAutoDestroy = 1 << 12;
const kDestroyed = 1 << 13;
const kClosed = 1 << 14;
const kCloseEmitted = 1 << 15;
const kMultiAwaitDrain = 1 << 16;
const kReadingMore = 1 << 17;
const kDataEmitted = 1 << 18;
const kErrored = 1 << 19;
const kEnded = 1 << 9;
const kEndEmitted = 1 << 10;
const kReading = 1 << 11;
const kSync = 1 << 12;
const kNeedReadable = 1 << 13;
const kEmittedReadable = 1 << 14;
const kReadableListening = 1 << 15;
const kResumeScheduled = 1 << 16;
const kMultiAwaitDrain = 1 << 17;
const kReadingMore = 1 << 18;
const kDataEmitted = 1 << 19;
const kDefaultUTF8Encoding = 1 << 20;
const kDecoder = 1 << 21;
const kEncoding = 1 << 22;
Expand Down
22 changes: 22 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ const kOnConstructed = Symbol('kOnConstructed');
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');

const kState = Symbol('kState');
const kObjectMode = 1 << 0;
const kErrorEmitted = 1 << 1;
const kAutoDestroy = 1 << 2;
const kEmitClose = 1 << 3;
const kDestroyed = 1 << 4;
const kClosed = 1 << 5;
const kCloseEmitted = 1 << 6;
const kErrored = 1 << 7;
const kConstructed = 1 << 8;
Comment on lines +26 to +34
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we enforce that readable/writable stream will not have a bit state with the same position?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. Suggestions?


function isReadableNodeStream(obj, strict = false) {
return !!(
obj &&
Expand Down Expand Up @@ -339,4 +350,15 @@ module.exports = {
isServerResponse,
willEmitClose,
isTransformStream,
kState,
// bitfields
kObjectMode,
kErrorEmitted,
kAutoDestroy,
kEmitClose,
kDestroyed,
kClosed,
kCloseEmitted,
kErrored,
kConstructed,
};
49 changes: 26 additions & 23 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const { kOnConstructed } = require('internal/streams/utils');

const {
addAbortSignal,
Expand All @@ -65,6 +64,20 @@ const {
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
} = require('internal/errors').codes;
const {
kState,
// bitfields
kObjectMode,
kErrorEmitted,
kAutoDestroy,
kEmitClose,
kDestroyed,
kClosed,
kCloseEmitted,
kErrored,
kConstructed,
kOnConstructed,
} = require('internal/streams/utils');

const { errorOrDestroy } = destroyImpl;

Expand All @@ -79,18 +92,8 @@ const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kWriteCbValue = Symbol('kWriteCbValue');
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
const kBufferedValue = Symbol('kBufferedValue');
const kState = Symbol('kState');

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kConstructed = 1 << 2;
const kSync = 1 << 3;
const kErrorEmitted = 1 << 4;
const kEmitClose = 1 << 5;
const kAutoDestroy = 1 << 6;
const kDestroyed = 1 << 7;
const kClosed = 1 << 8;
const kCloseEmitted = 1 << 9;

const kSync = 1 << 9;
const kFinalCalled = 1 << 10;
const kNeedDrain = 1 << 11;
const kEnding = 1 << 12;
Expand All @@ -102,16 +105,16 @@ const kPrefinished = 1 << 17;
const kAllBuffers = 1 << 18;
const kAllNoop = 1 << 19;
const kOnFinished = 1 << 20;
const kErrored = 1 << 21;
const kHasWritable = 1 << 22;
const kWritable = 1 << 23;
const kCorked = 1 << 24;
const kDefaultUTF8Encoding = 1 << 25;
const kWriteCb = 1 << 26;
const kExpectWriteCb = 1 << 27;
const kAfterWriteTickInfo = 1 << 28;
const kAfterWritePending = 1 << 29;
const kBuffered = 1 << 30;
const kHasWritable = 1 << 21;
const kWritable = 1 << 22;
const kCorked = 1 << 23;
const kDefaultUTF8Encoding = 1 << 24;
const kWriteCb = 1 << 25;
const kExpectWriteCb = 1 << 26;
const kAfterWriteTickInfo = 1 << 27;
const kAfterWritePending = 1 << 28;
const kBuffered = 1 << 29;
const kEnded = 1 << 30;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down