Skip to content

Commit

Permalink
feat(plugin): use amqp10's new plugin behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Mar 28, 2016
1 parent a4ac30f commit d98fa28
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 101 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ created or a cached copy will be returned.
## usage
```
'use strict';
var LinkCache = require('../'),
AMQPClient = require('amqp10').Client;
var amqp = require('amqp'),
linkCache = require('amqp10-link-cache');
var client = new AMQPClient();
client.connect('amqp://localhost')
.then(function() { return new LinkCache(client); });
.then(function(cache) {
// e.g. pass cache to all of your API endpoints _as_ the 'client'
// plug-in the link cache, with optional parameters
amqp.use(linkCache({ ttl: 5000 ));
return Promise.all([ cache.crateSender('amq.topic'), cache.crateSender('amq.topic') ]);
var client = new amqp.Client();
client.connect('amqp://localhost')
.then(function() {
return Promise.all([ client.crateSender('amq.topic'), client.crateSender('amq.topic') ]);
})
.spread(function(sender1, sender2) {
// sender1 === sender2
Expand Down
107 changes: 58 additions & 49 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,84 +1,93 @@
'use strict';
var hash = require('object-hash');
var Promise = require('bluebird'),
hash = require('object-hash');

function LinkCache(client, options) {
options = options || {};
this._client = client;
this._links = {};
this._ttl = options.ttl || 60000;
this._purgeTimeout = null;

client.on('disconnected', function() {});
}

LinkCache.prototype.createSender = function(address, options) {
return this._createLink(address, options, 'sender', 'createSender');
};

LinkCache.prototype.createReceiver = function(address, options) {
return this._createLink(address, options, 'receiver', 'createReceiver');
};

LinkCache.prototype.createSenderStream = function(address, options) {
return this._createLink(address, options, 'senderStream', 'createSenderStream');
};
var links = {};
var ttl = 60000;
var purgeTimeout = null;

LinkCache.prototype.createReceiverStream = function(address, options) {
return this._createLink(address, options, 'receiverStream', 'createReceiverStream');
};

LinkCache.prototype._createLink = function(address, options, type, method) {
function createLink(address, options, type, method) {
var linkHash = hash({ type: type, address: address, options: options });
if (this._links.hasOwnProperty(linkHash)) {
var entry = this._links[linkHash];
if (links.hasOwnProperty(linkHash)) {
var entry = links[linkHash];
if (!entry.hasOwnProperty('link'))
return entry;

this._links[linkHash].stamp = Date.now();
return Promise.resolve(this._links[linkHash].link);
links[linkHash].stamp = Date.now();
return Promise.resolve(links[linkHash].link);
}

var self = this;
var linkPromise = this._client[method](address, options)
var linkPromise = method(address, options)
.then(function(link) {
link.once('detached', function() {
if (self._links.hasOwnProperty(linkHash))
delete self._links[linkHash];
if (links.hasOwnProperty(linkHash))
delete links[linkHash];
});

self._links[linkHash] = { link: link, stamp: Date.now() };
if (!self._purgeTimeout)
self._purgeTimeout = setTimeout(self._purgeLinks.bind(self), self._ttl);
links[linkHash] = { link: link, stamp: Date.now() };
if (!purgeTimeout)
purgeTimeout = setTimeout(purgeLinks, ttl);
return link;
});


this._links[linkHash] = linkPromise;
links[linkHash] = linkPromise;
return linkPromise;
};
}

LinkCache.prototype._purgeLinks = function() {
function purgeLinks() {
var now = Date.now();
var _keys = Object.keys(this._links),
var _keys = Object.keys(links),
expired = [], live = 0;

purgeTimeout = null;
for (var i = 0, ii = _keys.length; i < ii; ++i) {
if (now - this._links[_keys[i]].stamp >= this._ttl) {
if (now - links[_keys[i]].stamp >= ttl) {
expired.push(_keys[i]);
} else {
live++;
}
}

for (var j = 0, jj = expired.length; j < jj; ++j) {
var cacheEntry = this._links[expired[j]];
delete this._links[_keys[j]];
var cacheEntry = links[expired[j]];
delete links[expired[j]];
cacheEntry.link.detach();
}

if (live && !this._purgeTimeout) {
this._purgeTimeout = setTimeout(this._purgeLinks.bind(this), this._ttl);
if (live) {
purgeTimeout = setTimeout(purgeLinks, ttl);
}
};
}

module.exports = LinkCache;
module.exports = function(options) {
// NOTE: we need to re-initialize these every time the plugin is called
options = options || {};
links = {};
ttl = options.ttl || 60000;
if (!!purgeTimeout) clearTimeout(purgeTimeout);
purgeTimeout = null;

return function(Client) {
var _createSender = Client.prototype.createSender,
_createReceiver = Client.prototype.createReceiver,
_createSenderStream = Client.prototype.createSenderStream,
_createReceiverStream = Client.prototype.createReceiverStream;

Client.prototype.createSender = function(address, options) {
return createLink(address, options, 'sender', _createSender.bind(this));
};

Client.prototype.createReceiver = function(address, options) {
return createLink(address, options, 'receiver', _createReceiver.bind(this));
};

Client.prototype.createSenderStream = function(address, options) {
return createLink(address, options, 'senderStream', _createSenderStream.bind(this));
};

Client.prototype.createReceiverStream = function(address, options) {
return createLink(address, options, 'receiverStream', _createReceiverStream.bind(this));
};
};
};
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "amqp10-link-cache",
"version": "0.2.0",
"description": "a link caching mechanism for the amqp10 module",
"description": "A link caching plugin for the amqp10 module",
"main": "index.js",
"engines": {
"node": ">= 0.10"
Expand All @@ -24,11 +24,11 @@
"url": "https://github.com/mbroadst/amqp10-link-cache.git"
},
"dependencies": {
"bluebird": "^3.3.4",
"object-hash": "^1.1.2"
},
"devDependencies": {
"amqp10": "^3.1.0",
"bluebird": "^3.3.4",
"amqp10": "^3.1.1",
"chai": "^3.5.0",
"jshint": "^2.9.1",
"mocha": "^2.4.5"
Expand Down
54 changes: 13 additions & 41 deletions test/link-cache.test.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
'use strict';
var Promise = require('bluebird'),
LinkCache = require('../'),
AMQPClient = require('amqp10').Client,
amqp = require('amqp10'),
linkCache = require('..'),
AMQPClient = amqp.Client,
config = require('./config'),
expect = require('chai').expect;

var test = {};
describe('LinkCache', function() {
describe('basic behavior', function() {
before(function() { amqp.use(linkCache()); });
beforeEach(function() {
if (!!test.client) delete test.client;
if (!!test.cache) delete test.cache;

test.client = new AMQPClient();
test.cache = new LinkCache(test.client);
});

afterEach(function() {
return test.client.disconnect().then(function() {
delete test.client;
delete test.cache;
});
return test.client.disconnect()
.then(function() { delete test.client; });
});

[
Expand All @@ -32,9 +29,9 @@ describe('LinkCache', function() {
return test.client.connect(config.address)
.then(function() {
return Promise.all([
test.cache[testCase.method]('amq.topic'),
test.cache[testCase.method]('amq.topic'),
test.cache[testCase.method]('amq.topic')
test.client[testCase.method]('amq.topic'),
test.client[testCase.method]('amq.topic'),
test.client[testCase.method]('amq.topic')
]);
})
.spread(function(link1, link2, link3) {
Expand All @@ -48,9 +45,9 @@ describe('LinkCache', function() {
return test.client.connect(config.address)
.then(function() {
return Promise.all([
test.cache[testCase.method]('amq.topic'),
test.cache[testCase.method]('amq.topic', { attach: { receiverSettleMode: false } }),
test.cache[testCase.method]('amq.topic/testing')
test.client[testCase.method]('amq.topic'),
test.client[testCase.method]('amq.topic', { attach: { receiverSettleMode: false } }),
test.client[testCase.method]('amq.topic/testing')
]);
})
.spread(function(link1, link2, link3) {
Expand All @@ -60,29 +57,4 @@ describe('LinkCache', function() {
});
});
});

it('should purge links after a given interval', function() {
test.cache = new LinkCache(test.client, { ttl: 1 });
return test.client.connect(config.address)
.then(function() { return test.cache.createSender('amq.topic'); })
.delay(5)
.then(function() {
var linkCount = Object.keys(test.cache._links).length;
expect(linkCount).to.equal(0);
});
});

it('should purge links after a given interval (2)', function() {
test.cache = new LinkCache(test.client, { ttl: 1 });

var sender1, sender2;
return test.client.connect(config.address)
.then(function() { return test.cache.createSender('amq.topic'); })
.tap(function(sender) { sender1 = sender; })
.delay(5)
.then(function() { return test.cache.createSender('amq.topic'); })
.tap(function(sender) { sender2 = sender; })
.then(function() { expect(sender1).to.not.eql(sender2); });
});

});
44 changes: 44 additions & 0 deletions test/purge.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';
var amqp = require('amqp10'),
linkCache = require('..'),
AMQPClient = amqp.Client,
config = require('./config'),
expect = require('chai').expect;

var test = {};
describe('purging', function() {
before(function() { amqp.use(linkCache({ ttl: 10 })); });
beforeEach(function() {
if (!!test.client) delete test.client;
test.client = new AMQPClient();
});

afterEach(function() {
return test.client.disconnect()
.then(function() { delete test.client; });
});

it('should purge links after a given interval', function() {
var sender;
return test.client.connect(config.address)
.then(function() { return test.client.createSender('amq.topic'); })
.then(function(s) { sender = s; })
.delay(50)
.then(function() {
var state = sender.linkSM.getMachineState();
expect(state).to.be.oneOf(['DETACHED', 'DETACHING']);
sender = null;
});
});

it('should purge links after a given interval (2)', function() {
var sender1, sender2;
return test.client.connect(config.address)
.then(function() { return test.client.createSender('amq.fanout'); })
.tap(function(sender) { sender1 = sender; })
.delay(100)
.then(function() { return test.client.createSender('amq.fanout'); })
.tap(function(sender) { sender2 = sender; })
.then(function() { expect(sender1).to.not.eql(sender2); });
});
});

0 comments on commit d98fa28

Please sign in to comment.