Skip to content

Commit

Permalink
lib: performance improvement on readline async iterator
Browse files Browse the repository at this point in the history
Using a direct approach to create the readline async iterator
allowed an iteration over 20 to 58% faster.

**BREAKING CHANGE**: With this change, the async iterator
obtained from the readline interface doesn't have the
property "stream" any longer. This happened because it's no
longer created through a Readable, instead, the async
iterator is created directly from the events of the readline
interface instance, so, if anyone is using that property,
this change will break their code.
Also, the Readable added a backpressure control that is
fairly compensated by the use of FixedQueue + monitoring
its size. This control wasn't really precise with readline
before, though, because it only pauses the reading of the
original stream, but the lines generated from the last
message received from it were still emitted. For example:
if the readable was paused at 1000 messages but the last one
received generated 10k lines, those lines were still emitted;
no further messages were read until the queue dropped below the
readable's highWaterMark. A similar behavior still happens with the
new implementation, but the highWaterMark used is fixed: 1024,
and the original stream is resumed again only after the queue
is cleared.

Before making that change, I created a package implementing
the same concept used here to validate it. You can find it
[here](https://github.com/Farenheith/faster-readline-iterator)
if this helps anyhow.
  • Loading branch information
Thiago Santos authored and Farenheith committed Dec 22, 2021
1 parent cf69964 commit c163dc1
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 135 deletions.
121 changes: 32 additions & 89 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
ArrayPrototypeShift,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
ArrayFrom,
Boolean,
Error,
ErrorCaptureStackTrace,
Expand Down Expand Up @@ -63,6 +64,7 @@ const {
ERR_UNHANDLED_ERROR
},
} = require('internal/errors');
const getLinkedMap = require('internal/linkedMap');

const {
validateAbortSignal,
Expand Down Expand Up @@ -386,30 +388,19 @@ EventEmitter.prototype.emit = function emit(type, ...args) {
if (handler === undefined)
return false;

if (typeof handler === 'function') {
const result = handler.apply(this, args);
const listeners = ArrayFrom(handler);
const len = handler.length;
for (let i = 0; i < len; ++i) {
const result = listeners[i].apply(this, args);

// We check if result is undefined first because that
// is the most common case so we do not pay any perf
// penalty
// penalty.
// This code is duplicated because extracting it away
// would make it non-inlineable.
if (result !== undefined && result !== null) {
addCatch(this, result, type, args);
}
} else {
const len = handler.length;
const listeners = arrayClone(handler);
for (let i = 0; i < len; ++i) {
const result = listeners[i].apply(this, args);

// We check if result is undefined first because that
// is the most common case so we do not pay any perf
// penalty.
// This code is duplicated because extracting it away
// would make it non-inlineable.
if (result !== undefined && result !== null) {
addCatch(this, result, type, args);
}
}
}

return true;
Expand Down Expand Up @@ -442,36 +433,29 @@ function _addListener(target, type, listener, prepend) {

if (existing === undefined) {
// Optimize the case of one listener. Don't need the extra array object.
events[type] = listener;
existing = events[type] = getLinkedMap().push(listener);
++target._eventsCount;
} else if (prepend) {
existing.unshift(listener);
} else {
if (typeof existing === 'function') {
// Adding the second element, need to change to array.
existing = events[type] =
prepend ? [listener, existing] : [existing, listener];
// If we've already got an array, just append.
} else if (prepend) {
existing.unshift(listener);
} else {
existing.push(listener);
}
existing.push(listener);
}

// Check for listener leak
m = _getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
existing.warned = true;
// No error code for this since it is a Warning
// eslint-disable-next-line no-restricted-syntax
const w = new Error('Possible EventEmitter memory leak detected. ' +
`${existing.length} ${String(type)} listeners ` +
`added to ${inspect(target, { depth: -1 })}. Use ` +
'emitter.setMaxListeners() to increase limit');
w.name = 'MaxListenersExceededWarning';
w.emitter = target;
w.type = type;
w.count = existing.length;
process.emitWarning(w);
}
// Check for listener leak
m = _getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
existing.warned = true;
// No error code for this since it is a Warning
// eslint-disable-next-line no-restricted-syntax
const w = new Error('Possible EventEmitter memory leak detected. ' +
`${existing.length} ${String(type)} listeners ` +
`added to ${inspect(target, { depth: -1 })}. Use ` +
'emitter.setMaxListeners() to increase limit');
w.name = 'MaxListenersExceededWarning';
w.emitter = target;
w.type = type;
w.count = existing.length;
process.emitWarning(w);
}

return target;
Expand Down Expand Up @@ -564,39 +548,10 @@ EventEmitter.prototype.removeListener =
const list = events[type];
if (list === undefined)
return this;

if (list === listener || list.listener === listener) {
if (--this._eventsCount === 0)
this._events = ObjectCreate(null);
else {
if (list?.remove(listener)) {
if (list.length === 0) {
delete events[type];
if (events.removeListener)
this.emit('removeListener', type, list.listener || listener);
}
} else if (typeof list !== 'function') {
let position = -1;

for (let i = list.length - 1; i >= 0; i--) {
if (list[i] === listener || list[i].listener === listener) {
position = i;
break;
}
}

if (position < 0)
return this;

if (position === 0)
list.shift();
else {
if (spliceOne === undefined)
spliceOne = require('internal/util').spliceOne;
spliceOne(list, position);
}

if (list.length === 1)
events[type] = list[0];

if (events.removeListener !== undefined)
this.emit('removeListener', type, listener);
}
Expand Down Expand Up @@ -720,19 +675,7 @@ EventEmitter.prototype.listenerCount = listenerCount;
* @returns {number}
*/
function listenerCount(type) {
const events = this._events;

if (events !== undefined) {
const evlistener = events[type];

if (typeof evlistener === 'function') {
return 1;
} else if (evlistener !== undefined) {
return evlistener.length;
}
}

return 0;
return this._events?.[type]?.length || 0;
}

/**
Expand Down
110 changes: 110 additions & 0 deletions lib/internal/linkedMap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
const {
Map,
SymbolIterator,
} = primordials;

// Append `value` to the doubly-linked list rooted at `root` and return
// the freshly created node, so callers can keep a direct reference to
// it for later O(1) removal.
function push(root, value) {
  const node = { value };
  const tail = root.last;
  if (tail === undefined) {
    // Empty list: the new node is both endpoints.
    root.first = node;
    root.last = node;
  } else {
    node.previous = tail;
    tail.next = node;
    root.last = node;
  }
  root.length++;
  return node;
}

// Prepend `value` to the doubly-linked list rooted at `root` and return
// the freshly created node.
// Bug fix: the function body used `root` but the parameter was missing
// from the signature (it was declared as `unshift(value)`), which made
// every call throw a ReferenceError. All call sites in this file pass
// `(root, value)`, so the signature now matches them.
function unshift(root, value) {
  const node = { value };
  if (root.first) {
    node.next = root.first;
    root.first.previous = root.first = node;
  } else {
    root.last = root.first = node;
  }
  root.length++;

  return node;
}

// Remove the first node of the list rooted at `root` and return its
// value; returns `undefined` on an empty list.
// Bug fixes: the original left `root.last` pointing at the removed
// node when the list became empty, and left the new head's `previous`
// pointer referencing the removed node, keeping it reachable (a leak)
// and corrupting backwards traversal.
function shift(root) {
  const head = root.first;
  if (!head)
    return;
  root.first = head.next;
  if (root.first)
    root.first.previous = undefined;
  else
    root.last = undefined;  // List is now empty; drop the tail too.
  root.length--;
  return head.value;
}


/**
 * Creates a list-like structure backed by a doubly-linked list plus a
 * per-value FIFO of node references. This gives O(1) push/unshift,
 * O(1) removal by value (oldest matching node first), and in-order
 * iteration — presumably intended as EventEmitter's listener store.
 * @returns {object} handle with `length`, `push`, `unshift`, `remove`
 *   and `Symbol.iterator`.
 */
function getLinkedMap() {
  const map = new Map();
  const root = { length: 0 };

  // Record `node` in the per-value FIFO so remove(value) can locate it
  // without scanning the main list. `operation` is the list primitive
  // (push/unshift) to apply to the FIFO.
  function addToMap(key, node, operation) {
    let refs = map.get(key);
    if (!refs) {
      map.set(key, refs = { length: 0 });
    }
    operation(refs, node);
  }

  return {
    get length() {
      return root.length;
    },
    push(value) {
      const node = push(root, value);
      addToMap(value, node, push);

      return this;
    },
    unshift(value) {
      const node = unshift(root, value);
      // Bug fix: this called `addToMap(key, ...)` but no `key` exists
      // in scope; the map must be keyed by `value`.
      addToMap(value, node, unshift);

      return this;
    },
    remove(value) {
      const refs = map.get(value);
      if (refs) {
        // Oldest node holding `value`, in insertion order.
        const result = shift(refs);
        // Unlink it from the main list. Bug fix: `root.first` and
        // `root.last` were never updated when an endpoint node was
        // removed, leaving the list corrupted.
        if (result.previous)
          result.previous.next = result.next;
        else
          root.first = result.next;
        if (result.next)
          result.next.previous = result.previous;
        else
          root.last = result.previous;
        if (refs.length === 0) {
          map.delete(value);
        }
        root.length--;
        return 1;
      }
      return 0;
    },
    [SymbolIterator]() {
      let node = root.first;

      const iterator = {
        next() {
          if (!node) {
            return { done: true };
          }
          const result = {
            done: false,
            value: node.value,
          };
          node = node.next;
          return result;
        },
        [SymbolIterator]() {
          return iterator;
        }
      };

      return iterator;
    }
  };
}

module.exports = getLinkedMap;
107 changes: 107 additions & 0 deletions lib/internal/readline/eventsToAsyncIteratorFactory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
'use strict';
const {
Promise,
SymbolAsyncIterator,
ArrayPrototypeConcat,
} = primordials;
const FixedQueue = require('internal/fixed_queue');

const PAUSE_THRESHOLD = 1024;
const RESUME_THRESHOLD = 1;
const ITEM_EVENTS = ['data'];
const CLOSE_EVENTS = ['close', 'end'];
const ERROR_EVENTS = ['error'];


// Resolve once any of `events` fires on `emitter`, settling the
// returned promise with the result of `next()` (or rejecting if
// `next()` throws). Every listener is detached before `next()` runs,
// so a single event settles exactly one pending promise.
function waitNext(emitter, next, events) {
  return new Promise((resolve, reject) => {
    function onEvent() {
      for (const event of events)
        emitter.off(event, onEvent);
      try {
        resolve(next());
      } catch (err) {
        reject(err);
      }
    }
    for (const event of events)
      emitter.once(event, onEvent);
  });
}

module.exports = function eventsToAsyncIteratorFactory(readable, {
pauseThreshold = PAUSE_THRESHOLD,
resumeThreshold = RESUME_THRESHOLD,
closeEvents = CLOSE_EVENTS,
itemEvents = ITEM_EVENTS,
errorEvents = ERROR_EVENTS,
}) {
const events = ArrayPrototypeConcat(itemEvents, errorEvents, closeEvents);
const highWaterMark = RESUME_THRESHOLD;

const queue = new FixedQueue();
let done = false;
let error;
let queueSize = 0;
let paused = false;
const onError = (value) => {
turn('off');
error = value;
};
const onClose = () => {
turn('off');
done = true;
};
const onItem = (value) => {
queue.push(value);
queueSize++;
if (queueSize >= pauseThreshold) {
paused = true;
readable.pause();
}
};
function turn(onOff) {
for (let i = 0; i < closeEvents.length; i++)
readable[onOff](closeEvents[i], onClose);
for (let i = 0; i < itemEvents.length; i++)
readable[onOff](itemEvents[i], onItem);
for (let i = 0; i < itemEvents.length; i++)
readable[onOff](errorEvents[i], onError);
}

turn('on');

function next() {
if (!queue.isEmpty()) {
const value = queue.shift();
queueSize--;
if (queueSize < resumeThreshold) {
paused = false;
readable.resume();
}
return {
done: false,
value,
};
}
if (error) {
throw error;
}
if (done) {
return { done };
}
return waitNext(readable, next, events);
}

return {
next,
highWaterMark,
get isPaused() {
return paused;
},
get queueSize() {
return queueSize;
},
[SymbolAsyncIterator]() { return result; },
};
};
Loading

0 comments on commit c163dc1

Please sign in to comment.