diff --git a/lib/resource-pool.js b/lib/resource-pool.js index c908df8..edb9be2 100755 --- a/lib/resource-pool.js +++ b/lib/resource-pool.js @@ -1,4 +1,5 @@ var Registry = require("./registry"), + Semaphore = require('./semaphore'), inherits = require("inherits"), id = require("./simple-id"); /** @@ -13,6 +14,15 @@ var ResourcePool = module.exports = function(options) { ResourcePool.super.apply(this); this.min = options.min; this.max = options.max; + this.lockTimeout = options.lockTimeout? options.lockTimeout * 1000 : 0 + this.maxIdle = options.maxIdle || 0 + + // clamp the max idle to at least the min. + if (this.maxIdle !== 0 && this.maxIdle < this.min) { + this.maxIdle = this.min; + } + + var me = this; //allow bolt-on createResource providers via configuration //NOTE: any supplied options.createResource function will @@ -20,15 +30,50 @@ var ResourcePool = module.exports = function(options) { if (typeof options.createResource === "function") { this.createResource = options.createResource; } + if (typeof options.createResourceAsync === "function") { + this.createResourceAsync = options.createResourceAsync; + } //init the min number of resources if (this.min) { for (var i = 0; i < this.min; i++) { - this.add(this.createResource()); + if (this.createResourceAsync) { + this.createResourceAsync(function(err, resource) { + if (err) { + // Uh oh! + } else { + me.add(resource); + } + }); + } else { + this.add(this.createResource()); + } } } }; +var lockResource = function(pool, resource) { + resource.locked = true; + if (pool.lockTimeout > 0) { + resource.lockTimer = setTimeout(function() { + + // don't mark it as unlocked, or it could get reallocated. Just remove it from the pool and dispose it. + pool.fire('resourceTimeout', { id: resource.id }); + pool.remove(resource) + if (resource.dispose) { + resource.dispose() + } + }, pool.lockTimeout) + } +}; + +var unlockResource = function(resource) { + resource.locked = false; + if (resource.lockTimer) { + clearTimeout(resource.lockTimer); + } +}; + /** * Default implementation just returns an empty object. This makes this class useable * as a simple throttling mechanism for parallel processes. Override createResource @@ -48,15 +93,29 @@ ResourcePool.prototype.getResource = function(cb) { return !item.locked; }); if (resource) { - resource.locked = true; + lockResource(me, resource) cb(resource); } else { //if we're not yet at the max for this pool, create a new resource if (this.items.length < this.max) { - resource = this.createResource(); - resource.locked = true; - this.add(resource); - cb(resource); + var resource; + var sem = new Semaphore(function() { + lockResource(me, resource) + me.add(resource); + me.fire('resourceAllocated', {id: resource.id}); + cb(resource); + }); + sem.increment(); + if (me.createResourceAsync) { + sem.increment(); + me.createResourceAsync(function(err, res) { + resource = res; + sem.execute(); + }) + } else { + resource = this.createResource(); + } + sem.execute(); } else { //now we have to wait until one is freed up this.once("resourceReleased", function(args) { @@ -76,6 +135,9 @@ ResourcePool.prototype.getStats = function(cb) { cb({ min: me.min, max: me.max, + active: me.items.length, + locked: lockCount, + unlocked: me.items.length - lockCount, available: me.max - lockCount }) }; @@ -85,8 +147,27 @@ ResourcePool.prototype.getStats = function(cb) { * @param {Object} resource */ ResourcePool.prototype.release = function(resource) { - resource.locked = false; - this.fire("resourceReleased"); + var me = this; + unlockResource(resource); + me.fire("resourceReleased", {id: resource.id}); + + // clean up extras + me.getStats(function(stats) { + if (stats.unlocked > me.maxIdle) { + var extra = stats.unlocked - me.maxIdle + if (extra > me.min) { + for (var i = 0; i < extra; i++) { + var resource = me.find(function(item) { + return !item.locked; + }); + if (resource) { + me.remove(resource); + if (resource.dispose) resource.dispose(); + } + } + } + } + }); }; ResourcePool.prototype.dispose = function() {