Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: lazy allocate back pressure buffer #50013

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const kErroredValue = Symbol('kErroredValue');
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kWriteCbValue = Symbol('kWriteCbValue');
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
const kBufferedValue = Symbol('kBufferedValue');

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
Expand Down Expand Up @@ -108,7 +109,7 @@ const kWriteCb = 1 << 26;
const kExpectWriteCb = 1 << 27;
const kAfterWriteTickInfo = 1 << 28;
const kAfterWritePending = 1 << 29;
const kHasBuffer = 1 << 30;
const kBuffered = 1 << 30;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -270,6 +271,21 @@ ObjectDefineProperties(WritableState.prototype, {
}
},
},

buffered: {
__proto__: null,
enumerable: false,
get() { return (this.state & kBuffered) !== 0 ? this[kBufferedValue] : []; },
set(value) {
this[kBufferedValue] = value;
if (value) {
this.state |= kBuffered;
} else {
this.state &= ~kBuffered;
}
},
},

});

function WritableState(options, stream, isDuplex) {
Expand Down Expand Up @@ -338,20 +354,20 @@ function WritableState(options, stream, isDuplex) {
}

function resetBuffer(state) {
state.buffered = [];
state[kBufferedValue] = null;
state.bufferedIndex = 0;
state.state |= kAllBuffers | kAllNoop;
state.state &= ~kHasBuffer;
state.state &= ~kBuffered;
}

WritableState.prototype.getBuffer = function getBuffer() {
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
return (this.state & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
};

ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
__proto__: null,
get() {
return this.buffered.length - this.bufferedIndex;
return (this.state & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex;
},
});

Expand Down Expand Up @@ -518,8 +534,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
state.length += len;

if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
state.buffered.push({ chunk, encoding, callback });
state.state |= kHasBuffer;
if ((state.state & kBuffered) === 0) {
state.state |= kBuffered;
state[kBufferedValue] = [];
}

state[kBufferedValue].push({ chunk, encoding, callback });
Comment on lines +537 to +542
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ((state.state & kBuffered) === 0) {
  state.state |= kBuffered;
  state[kBufferedValue] = [{ chunk, encoding, callback }];
} else {
  state[kBufferedValue].push({ chunk, encoding, callback });
}

Just curious, will there be any observable performance improvements?

if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
state.state &= ~kAllBuffers;
}
Expand Down Expand Up @@ -611,7 +631,7 @@ function onwrite(stream, er) {
onwriteError(stream, state, er, cb);
}
} else {
if ((state.state & kHasBuffer) !== 0) {
if ((state.state & kBuffered) !== 0) {
clearBuffer(stream, state);
}

Expand Down Expand Up @@ -687,11 +707,13 @@ function errorBuffer(state) {
return;
}

for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
const { chunk, callback } = state.buffered[n];
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
state.length -= len;
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
if ((state.state & kBuffered) !== 0) {
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
const { chunk, callback } = state[kBufferedValue][n];
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
state.length -= len;
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
}
}


Expand All @@ -702,13 +724,12 @@ function errorBuffer(state) {

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer ||
(state.state & kConstructed) === 0) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) {
return;
}

const objectMode = (state.state & kObjectMode) !== 0;
const { buffered, bufferedIndex } = state;
const { [kBufferedValue]: buffered, bufferedIndex } = state;
const bufferedLength = buffered.length - bufferedIndex;

if (!bufferedLength) {
Expand Down Expand Up @@ -838,10 +859,9 @@ function needFinish(state) {
kWriting |
kErrorEmitted |
kCloseEmitted |
kErrored
)) === (kEnding | kConstructed) &&
state.length === 0 &&
state.buffered.length === 0);
kErrored |
kBuffered
)) === (kEnding | kConstructed) && state.length === 0);
}

function callFinal(stream, state) {
Expand Down Expand Up @@ -1083,9 +1103,7 @@ Writable.prototype.destroy = function(err, cb) {
const state = this._writableState;

// Invoke pending callbacks.
if ((state.state & kDestroyed) === 0 &&
(state.bufferedIndex < state.buffered.length ||
(state.state & kOnFinished) !== 0)) {
if ((state.state & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) {
process.nextTick(errorBuffer, state);
}

Expand Down