Skip to content

Commit

Permalink
perf: do not put closures on the queue (#57)
Browse files Browse the repository at this point in the history
This makes the benchmarks slightly faster than version 5.0.0, and way faster than version 6.0.0
  • Loading branch information
ForbesLindesay authored Jan 20, 2021
1 parent f8ced5e commit 8fd7a02
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 62 deletions.
156 changes: 98 additions & 58 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,76 @@
'use strict';

var fakeResolvedPromise = {
then: function (fn) {
return fn();
},
};
module.exports = function throat(size, fn) {
function throatInternal(size) {
var queue = new Queue();
var s = size | 0;

function run(fn, self, args) {
var ready = size
? fakeResolvedPromise
: new Promise(function (resolve) {
queue.push(resolve);
});
if (size) {
size--;
if ((s | 0) !== 0) {
s = (s | 0) - 1;
return new Promise(function (resolve) {
resolve(fn.apply(self, args));
}).then(onFulfill, onReject);
}
return ready
.then(function () {
return new Promise(function (resolve) {
resolve(fn.apply(self, args));
});
})
.then(
function (result) {
release();
return result;
},
function (err) {
release();
throw err;
}
return new Promise(function (resolve) {
queue.push(new Delayed(resolve, fn, self, args));
}).then(runDelayed);
}
function runDelayed(d) {
try {
return Promise.resolve(d.fn.apply(d.self, d.args)).then(
onFulfill,
onReject
);
} catch (ex) {
onReject(ex);
}
}
function onFulfill(result) {
release();
return result;
}
function onReject(error) {
release();
throw error;
}
function release() {
var next = queue.shift();
if (next) {
next();
next.resolve(next);
} else {
size++;
s = (s | 0) + 1;
}
}

return run;
}

function earlyBound(size, fn) {
const run = throatInternal(size | 0);
return function () {
var args = new Array(arguments.length);
for (var i = 0; i < arguments.length; i++) {
args[i] = arguments[i];
}
return run(fn, this, args);
};
}
function lateBound(size) {
const run = throatInternal(size | 0);
return function (fn) {
if (typeof fn !== 'function') {
throw new TypeError(
'Expected throat fn to be a function but got ' + typeof fn
);
}
var args = new Array(arguments.length - 1);
for (var i = 1; i < arguments.length; i++) {
args[i - 1] = arguments[i];
}
return run(fn, this, args);
};
}
module.exports = function throat(size, fn) {
if (typeof size === 'function') {
var temp = fn;
fn = size;
Expand All @@ -57,49 +87,59 @@ module.exports = function throat(size, fn) {
);
}
if (typeof fn === 'function') {
return function () {
var args = [];
for (var i = 0; i < arguments.length; i++) {
args.push(arguments[i]);
}
return run(fn, this, args);
};
return earlyBound(size | 0, fn);
} else {
return function (fn) {
if (typeof fn !== 'function') {
throw new TypeError(
'Expected throat fn to be a function but got ' + typeof fn
);
}
var args = [];
for (var i = 1; i < arguments.length; i++) {
args.push(arguments[i]);
}
return run(fn, this, args);
};
return lateBound(size | 0);
}
};

module.exports.default = module.exports;

function Delayed(resolve, fn, self, args) {
this.resolve = resolve;
this.fn = fn;
this.self = self || null;
this.args = args;
}

var blockSize = 64;
function Queue() {
this._s1 = [];
this._s2 = [];
this._shiftBlock = this._pushBlock = new Array(blockSize);
this._pushIndex = 0;
this._shiftIndex = 0;
}

Queue.prototype.push = function (value) {
this._s1.push(value);
if (this._pushIndex === blockSize) {
this._pushIndex = 0;
this._s1[this._s1.length] = this._pushBlock = new Array(blockSize);
}
this._pushBlock[this._pushIndex++] = value;
};

Queue.prototype.shift = function () {
var s2 = this._s2;
if (s2.length === 0) {
var s1 = this._s1;
if (s1.length === 0) {
return undefined;
if (this._shiftIndex === blockSize) {
this._shiftIndex = 0;
var s2 = this._s2;
if (s2.length === 0) {
var s1 = this._s1;
if (s1.length === 0) {
return undefined;
}
this._s1 = s2;
s2 = this._s2 = s1.reverse();
}
this._s1 = s2;
s2 = this._s2 = s1.reverse();
this._shiftBlock = s2.pop();
}
if (
this._pushBlock === this._shiftBlock &&
this._pushIndex === this._shiftIndex
) {
return undefined;
}
return s2.pop();
var result = this._shiftBlock[this._shiftIndex];
this._shiftBlock[this._shiftIndex++] = null;
return result;
};
22 changes: 18 additions & 4 deletions perf.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
const throat = require('./index');

const MAX_COUNT = 1000000;
const ITERATIONS = 10;
const promises = [];
for (let i = 0; i < 1000000; i++) {
promises.push(() => new Promise(resolve => process.nextTick(resolve)));
for (let i = 0; i < MAX_COUNT; i++) {
promises.push(() => new Promise((resolve) => process.nextTick(resolve)));
}

Promise.resolve().then(async () => {
for (let amount = 10; amount <= 1000000; amount = amount * 10) {
console.log('limit=10');
for (let amount = 10; amount <= MAX_COUNT; amount = amount * 10) {
const list = promises.slice(0, amount);
console.time(amount + ' promises');
await Promise.all(list.map(throat(10, fn => fn())));
for (let i = 0; i < ITERATIONS; i++) {
await Promise.all(list.map(throat(10, (fn) => fn())));
}
console.timeEnd(amount + ' promises');
}
console.log('limit=1000000');
for (let amount = 10; amount <= MAX_COUNT; amount = amount * 10) {
const list = promises.slice(0, amount);
console.time(amount + ' promises');
for (let i = 0; i < ITERATIONS; i++) {
await Promise.all(list.map(throat(1000000, (fn) => fn())));
}
console.timeEnd(amount + ' promises');
}
});
30 changes: 30 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,36 @@ test('type errors', function () {
});
});

test('sync errors are converted to async errors', function () {
var lock = throat(1);
return Promise.all([
lock(() => {
throw new Error('whatever');
}).catch(() => true),
lock(() => {
throw new Error('whatever');
}).catch(() => true),
lock(() => {
throw new Error('whatever');
}).catch(() => true),
]).then((results) => {
assert.deepEqual(results, [true, true, true]);
});
});

test('handles loads of promises', function () {
var lock = throat(1, (i) => Promise.resolve(i));
var results = [];
var expected = [];
for (var i = 0; i < 64 * 10 + 1; i++) {
results.push(lock(i));
expected.push(i);
}
return Promise.all(results).then((results) => {
assert.deepEqual(results, expected);
});
});

async function supportsAsyncStackTraces() {
async function innerFunction() {
await new Promise((resolve) => setTimeout(resolve, 10));
Expand Down

0 comments on commit 8fd7a02

Please sign in to comment.