Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v13.x backport] stream: improve writable.write() performance #32169

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 27 additions & 48 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,35 +267,8 @@ Writable.prototype.pipe = function() {
};


function writeAfterEnd(stream, cb) {
const er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb
errorOrDestroy(stream, er);
process.nextTick(cb, er);
}

// Checks that a user-supplied chunk is valid, especially for the particular
// mode the stream is in. Currently this means that `null` is never accepted
// and undefined/non-string values are only allowed in object mode.
function validChunk(stream, state, chunk, cb) {
var er;

if (chunk === null) {
er = new ERR_STREAM_NULL_VALUES();
} else if (typeof chunk !== 'string' && !state.objectMode) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
if (er) {
errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
return true;
}

Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;
var ret = false;
const isBuf = !state.objectMode && Stream._isUint8Array(chunk);

// Do not use Object.getPrototypeOf as it is slower since V8 7.3.
Expand All @@ -305,29 +278,42 @@ Writable.prototype.write = function(chunk, encoding, cb) {

if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
encoding = state.defaultEncoding;
} else {
if (!encoding)
encoding = state.defaultEncoding;
if (typeof cb !== 'function')
cb = nop;
}

if (isBuf)
encoding = 'buffer';
else if (!encoding)
encoding = state.defaultEncoding;

if (typeof cb !== 'function')
cb = nop;

let err;
if (state.ending) {
writeAfterEnd(this, cb);
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('write');
process.nextTick(cb, err);
errorOrDestroy(this, err);
} else if (isBuf || validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
err = new ERR_STREAM_DESTROYED('write');
} else if (chunk === null) {
err = new ERR_STREAM_NULL_VALUES();
} else {
if (!isBuf && !state.objectMode) {
if (typeof chunk !== 'string') {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else if (encoding !== 'buffer' && state.decodeStrings !== false) {
chunk = Buffer.from(chunk, encoding);
encoding = 'buffer';
}
}
if (err === undefined) {
state.pendingcb++;
return writeOrBuffer(this, state, chunk, encoding, cb);
}
}

return ret;
process.nextTick(cb, err);
errorOrDestroy(this, err, true);
return false;
};

Writable.prototype.cork = function() {
Expand Down Expand Up @@ -402,13 +388,6 @@ ObjectDefineProperty(Writable.prototype, 'writableCorked', {
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
if (!state.objectMode &&
state.decodeStrings !== false &&
encoding !== 'buffer' &&
typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
encoding = 'buffer';
}
const len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand Down