-
Notifications
You must be signed in to change notification settings - Fork 29.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
lib: performance improvement on readline async iterator
Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow.
- Loading branch information
1 parent
cf69964
commit a770eb9
Showing
4 changed files
with
252 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
'use strict'; | ||
const { | ||
Promise, | ||
SymbolAsyncIterator, | ||
} = primordials; | ||
|
||
const PAUSE_THRESHOLD = 1024; | ||
const RESUME_THRESHOLD = 1; | ||
const ITEM_EVENTS = ['data']; | ||
const CLOSE_EVENTS = ['close', 'end']; | ||
const ERROR_EVENTS = ['error']; | ||
|
||
let FixedQueue; | ||
let setImmediate; | ||
function waitNext(emitter, next, events) { | ||
return new Promise((resolve, reject) => { | ||
const resolveNext = () => { | ||
events.forEach((event) => emitter.off(event, resolveNext)); | ||
setImmediate(() => { | ||
try { | ||
resolve(next()); | ||
} catch (promiseError) { | ||
reject(promiseError); | ||
} | ||
}); | ||
}; | ||
events.forEach((event) => emitter.once(event, resolveNext)); | ||
}); | ||
} | ||
|
||
function turnFactory(readable, callback, onOff) { | ||
return (event) => readable[onOff](event, callback); | ||
} | ||
|
||
module.exports = function eventsToAsyncIteratorFactory(readable, { | ||
pauseThreshold = PAUSE_THRESHOLD, | ||
resumeThreshold = RESUME_THRESHOLD, | ||
closeEvents = CLOSE_EVENTS, | ||
itemEvents = ITEM_EVENTS, | ||
errorEvents = ERROR_EVENTS | ||
}) { | ||
const events = [...itemEvents, ...errorEvents, ...closeEvents]; | ||
const highWaterMark = RESUME_THRESHOLD; | ||
return function asyncIterator() { | ||
if (!FixedQueue) { | ||
FixedQueue = require('internal/fixed_queue'); | ||
setImmediate = require('timers').setImmediate; | ||
} | ||
const queue = new FixedQueue(); | ||
let done = false; | ||
let error; | ||
let queueSize = 0; | ||
let paused = false; | ||
const onError = (value) => { | ||
turnOff(); | ||
error = value; | ||
}; | ||
const onClose = () => { | ||
turnOff(); | ||
done = true; | ||
}; | ||
const onItem = (value) => { | ||
queue.push(value); | ||
queueSize++; | ||
if (queueSize >= pauseThreshold) { | ||
paused = true; | ||
readable.pause(); | ||
} | ||
}; | ||
const turnOff = () => { | ||
closeEvents.forEach(turnFactory(readable, onClose, 'off')); | ||
itemEvents.forEach(turnFactory(readable, onItem, 'off')); | ||
errorEvents.forEach(turnFactory(readable, onError, 'off')); | ||
}; | ||
itemEvents.forEach(turnFactory(readable, onItem, 'on')); | ||
errorEvents.forEach(turnFactory(readable, onError, 'on')); | ||
closeEvents.forEach(turnFactory(readable, onClose, 'on')); | ||
|
||
function next() { | ||
if (!queue.isEmpty()) { | ||
const value = queue.shift(); | ||
queueSize--; | ||
if (queueSize < resumeThreshold) { | ||
paused = false; | ||
readable.resume(); | ||
} | ||
return { | ||
done: false, | ||
value, | ||
}; | ||
} | ||
if (error) { | ||
throw error; | ||
} | ||
if (done) { | ||
return { done }; | ||
} | ||
return waitNext(readable, next, events); | ||
} | ||
|
||
const result = { | ||
next, | ||
highWaterMark, | ||
get isPaused() { | ||
return paused; | ||
}, | ||
get queueSize() { | ||
return queueSize; | ||
} | ||
}; | ||
result[SymbolAsyncIterator] = () => result; | ||
return result; | ||
}; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters