Skip to content

Commit

Permalink
Merge pull request #289 from skrenek/master
Browse files Browse the repository at this point in the history
Henry and the Great Gorilla
  • Loading branch information
skrenek committed Aug 11, 2014
2 parents e04020a + 7cfea93 commit d5c04bf
Showing 1 changed file with 89 additions and 8 deletions.
97 changes: 89 additions & 8 deletions lib/resource-pool.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var Registry = require("./registry"),
Semaphore = require('./semaphore'),
inherits = require("inherits"),
id = require("./simple-id");
/**
Expand All @@ -13,22 +14,66 @@ var ResourcePool = module.exports = function(options) {
ResourcePool.super.apply(this);
this.min = options.min;
this.max = options.max;
this.lockTimeout = options.lockTimeout? options.lockTimeout * 1000 : 0
this.maxIdle = options.maxIdle || 0

// clamp the max idle to at least the min.
if (this.maxIdle !== 0 && this.maxIdle < this.min) {
this.maxIdle = this.min;
}

var me = this;

//allow bolt-on createResource providers via configuration
//NOTE: any supplied options.createResource function will
//override any class-level implementation
if (typeof options.createResource === "function") {
this.createResource = options.createResource;
}
if (typeof options.createResourceAsync === "function") {
this.createResourceAsync = options.createResourceAsync;
}

//init the min number of resources
if (this.min) {
for (var i = 0; i < this.min; i++) {
this.add(this.createResource());
if (this.createResourceAsync) {
this.createResourceAsync(function(err, resource) {
if (err) {
// Uh oh!
} else {
me.add(resource);
}
});
} else {
this.add(this.createResource());
}
}
}
};

var lockResource = function(pool, resource) {
resource.locked = true;
if (pool.lockTimeout > 0) {
resource.lockTimer = setTimeout(function() {

// don't mark it as unlocked, or it could get reallocated. Just remove it from the pool and dispose it.
pool.fire('resourceTimeout', { id: resource.id });
pool.remove(resource)
if (resource.dispose) {
resource.dispose()
}
}, pool.lockTimeout)
}
};

var unlockResource = function(resource) {
resource.locked = false;
if (resource.lockTimer) {
clearTimeout(resource.lockTimer);
}
};

/**
* Default implementation just returns an empty object. This makes this class useable
* as a simple throttling mechanism for parallel processes. Override createResource
Expand All @@ -48,15 +93,29 @@ ResourcePool.prototype.getResource = function(cb) {
return !item.locked;
});
if (resource) {
resource.locked = true;
lockResource(me, resource)
cb(resource);
} else {
//if we're not yet at the max for this pool, create a new resource
if (this.items.length < this.max) {
resource = this.createResource();
resource.locked = true;
this.add(resource);
cb(resource);
var resource;
var sem = new Semaphore(function() {
lockResource(me, resource)
me.add(resource);
me.fire('resourceAllocated', {id: resource.id});
cb(resource);
});
sem.increment();
if (me.createResourceAsync) {
sem.increment();
me.createResourceAsync(function(err, res) {
resource = res;
sem.execute();
})
} else {
resource = this.createResource();
}
sem.execute();
} else {
//now we have to wait until one is freed up
this.once("resourceReleased", function(args) {
Expand All @@ -76,6 +135,9 @@ ResourcePool.prototype.getStats = function(cb) {
cb({
min: me.min,
max: me.max,
active: me.items.length,
locked: lockCount,
unlocked: me.items.length - lockCount,
available: me.max - lockCount
})
};
Expand All @@ -85,8 +147,27 @@ ResourcePool.prototype.getStats = function(cb) {
* @param {Object} resource
*/
ResourcePool.prototype.release = function(resource) {
resource.locked = false;
this.fire("resourceReleased");
var me = this;
unlockResource(resource);
me.fire("resourceReleased", {id: resource.id});

// clean up extras
me.getStats(function(stats) {
if (stats.unlocked > me.maxIdle) {
var extra = stats.unlocked - me.maxIdle
if (extra > me.min) {
for (var i = 0; i < extra; i++) {
var resource = me.find(function(item) {
return !item.locked;
});
if (resource) {
me.remove(resource);
if (resource.dispose) resource.dispose();
}
}
}
}
});
};

ResourcePool.prototype.dispose = function() {
Expand Down

0 comments on commit d5c04bf

Please sign in to comment.