stream: improve performance for sync write finishes #30710

Closed
wants to merge 2 commits into from
11 changes: 8 additions & 3 deletions benchmark/streams/writable-manywrites.js
@@ -4,14 +4,19 @@ const common = require('../common');
 const Writable = require('stream').Writable;

 const bench = common.createBenchmark(main, {
-  n: [2e6]
+  n: [2e6],
+  sync: ['yes', 'no']
 });

-function main({ n }) {
+function main({ n, sync }) {
   const b = Buffer.allocUnsafe(1024);
   const s = new Writable();
+  sync = sync === 'yes';
   s._write = function(chunk, encoding, cb) {
-    cb();
+    if (sync)
+      cb();
+    else
+      process.nextTick(cb);
   };

   bench.start();
33 changes: 28 additions & 5 deletions lib/_stream_writable.js
@@ -142,6 +142,10 @@ function WritableState(options, stream, isDuplex) {
   // The amount that is being written when _write is called.
   this.writelen = 0;

+  // Storage for data passed to the afterWrite() callback in case of
+  // synchronous _write() completion.
+  this.afterWriteTickInfo = null;
+
   this.bufferedRequest = null;
   this.lastBufferedRequest = null;

@@ -498,22 +502,41 @@ function onwrite(stream, er) {
     }

     if (sync) {
-      process.nextTick(afterWrite, stream, state, cb);
+      // It is a common case that the callback passed to .write() is always
+      // the same. In that case, we do not schedule a new nextTick(), but rather
+      // just increase a counter, to improve performance and avoid memory
+      // allocations.
+      if (state.afterWriteTickInfo !== null &&
+          state.afterWriteTickInfo.cb === cb) {
+        state.afterWriteTickInfo.count++;
+      } else {
+        state.afterWriteTickInfo = { count: 1, cb, stream, state };
+        process.nextTick(afterWriteTick, state.afterWriteTickInfo);

Contributor

@mscdex mscdex Nov 29, 2019

Is there any difference in just using afterWrite directly here (process.nextTick(afterWrite, stream, ...))?

Member Author

@mscdex We need to allocate an object anyway so that we can modify count later, so that’s why it’s not just spreading the arguments right now

+      }
     } else {
-      afterWrite(stream, state, cb);
+      afterWrite(stream, state, 1, cb);
     }
   }
 }

-function afterWrite(stream, state, cb) {
+function afterWriteTick({ stream, state, count, cb }) {

Member

@ronag ronag Nov 29, 2019

This might clear the wrong object. I think clearing the count and cb of the passed object is safer than modifying state?

function afterWriteTick(info) {
  const { stream, state, count, cb } = info;
  info.cb = null;
  return afterWrite(stream, state, count, cb);
}

This would also allow reusing the object and avoiding allocations:

if (!state.afterWriteTickInfo || state.afterWriteTickInfo.cb) {
  state.afterWriteTickInfo = { stream, state, cb, count: 1 };
} else {
  state.afterWriteTickInfo.cb = cb;
  state.afterWriteTickInfo.count = 1;
}

Member

Not sure if it matters, though.

Member

This comment is for the row below.

Member Author

@ronag So … the effect of setting afterWriteTickInfo to null is that the next time the code above is reached, a new process.nextTick() call with a new afterWriteTickInfo object is made. That’s always safe, right?

I think setting .cb to null would have the same effect, and .count is cleared anyway. I can do that instead, if you prefer, although it might screw with the map/hidden class of afterWriteTickInfo, as .cb is always a function right now.

Member

@ronag ronag Nov 30, 2019

I was more thinking of the case where you have two different cbs, e.g.

write('a', cba) // schedule tick a
write('b', cbb) // clear info a, schedule tick b

// ...

// tick a
// clear info b

// tick b
// clear nothing

The a tick will actually clear the info for the b tick.

Probably not a problem, but maybe a little weird... I don't have a strong opinion if you think it's fine.

Member

> it might screw with the map/hidden class

Oh, I didn't know that null could cause problems with that once it's been a function type.

Member Author

> The a tick will actually clear the info for the b tick.
>
> Probably not a problem, but maybe a little weird... I don't have a strong opinion if you think it's fine.

Yeah, I think that’s fine, because it would only make a difference if there’s a write('b', cbb) inside cba(), and that seems like a somewhat unlikely scenario, and even then it would only affect performance, not behaviour.

+  state.afterWriteTickInfo = null;
+  return afterWrite(stream, state, count, cb);
+}
+
+function afterWrite(stream, state, count, cb) {
   const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
     state.needDrain;
   if (needDrain) {
     state.needDrain = false;
     stream.emit('drain');
   }
-  state.pendingcb--;
-  cb();
+
+  while (count-- > 0) {
+    state.pendingcb--;
+    cb();
+  }
+
   finishMaybe(stream, state);
 }

30 changes: 30 additions & 0 deletions test/parallel/test-stream-writable-samecb-singletick.js
@@ -0,0 +1,30 @@
+'use strict';
+const common = require('../common');
+const { Console } = require('console');
+const { Writable } = require('stream');
+const async_hooks = require('async_hooks');
+
+// Make sure that repeated calls to console.log(), and by extension
+// stream.write() for the underlying stream, allocate exactly 1 tick object.
+// At the time of writing, that is enough to ensure a flat memory profile
+// from repeated console.log() calls, rather than having callbacks pile up
+// over time, assuming that data can be written synchronously.
+// Refs: https://github.com/nodejs/node/issues/18013
+// Refs: https://github.com/nodejs/node/issues/18367
+
+const checkTickCreated = common.mustCall();
+
+async_hooks.createHook({
+  init(id, type, triggerId, resoure) {
+    if (type === 'TickObject') checkTickCreated();
+  }
+}).enable();
+
+const console = new Console(new Writable({
+  write: common.mustCall((chunk, encoding, cb) => {
+    cb();
+  }, 100)
+}));
+
+for (let i = 0; i < 100; i++)
+  console.log(i);