From d98fa2812b6977679b07f6c255383e44234f3b73 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Sun, 27 Mar 2016 19:54:50 -0400 Subject: [PATCH] feat(plugin): use amqp10's new plugin behavior --- README.md | 16 +++--- index.js | 107 ++++++++++++++++++++++------------------ package.json | 6 +-- test/link-cache.test.js | 54 +++++--------------- test/purge.test.js | 44 +++++++++++++++++ 5 files changed, 126 insertions(+), 101 deletions(-) create mode 100644 test/purge.test.js diff --git a/README.md b/README.md index 56530f9..24d5e77 100644 --- a/README.md +++ b/README.md @@ -11,16 +11,16 @@ created or a cached copy will be returned. ## usage ``` 'use strict'; -var LinkCache = require('../'), - AMQPClient = require('amqp10').Client; +var amqp = require('amqp'), + linkCache = require('amqp10-link-cache'); -var client = new AMQPClient(); -client.connect('amqp://localhost') - .then(function() { return new LinkCache(client); }); - .then(function(cache) { - // e.g. pass cache to all of your API endpoints _as_ the 'client' +// plug-in the link cache, with optional parameters +amqp.use(linkCache({ ttl: 5000 )); - return Promise.all([ cache.crateSender('amq.topic'), cache.crateSender('amq.topic') ]); +var client = new amqp.Client(); +client.connect('amqp://localhost') + .then(function() { + return Promise.all([ client.crateSender('amq.topic'), client.crateSender('amq.topic') ]); }) .spread(function(sender1, sender2) { // sender1 === sender2 diff --git a/index.js b/index.js index bd9e44f..a027650 100644 --- a/index.js +++ b/index.js @@ -1,69 +1,48 @@ 'use strict'; -var hash = require('object-hash'); +var Promise = require('bluebird'), + hash = require('object-hash'); -function LinkCache(client, options) { - options = options || {}; - this._client = client; - this._links = {}; - this._ttl = options.ttl || 60000; - this._purgeTimeout = null; - - client.on('disconnected', function() {}); -} - -LinkCache.prototype.createSender = function(address, options) { - return this._createLink(address, options, 'sender', 'createSender'); -}; - -LinkCache.prototype.createReceiver = function(address, options) { - return this._createLink(address, options, 'receiver', 'createReceiver'); -}; - -LinkCache.prototype.createSenderStream = function(address, options) { - return this._createLink(address, options, 'senderStream', 'createSenderStream'); -}; +var links = {}; +var ttl = 60000; +var purgeTimeout = null; -LinkCache.prototype.createReceiverStream = function(address, options) { - return this._createLink(address, options, 'receiverStream', 'createReceiverStream'); -}; - -LinkCache.prototype._createLink = function(address, options, type, method) { +function createLink(address, options, type, method) { var linkHash = hash({ type: type, address: address, options: options }); - if (this._links.hasOwnProperty(linkHash)) { - var entry = this._links[linkHash]; + if (links.hasOwnProperty(linkHash)) { + var entry = links[linkHash]; if (!entry.hasOwnProperty('link')) return entry; - this._links[linkHash].stamp = Date.now(); - return Promise.resolve(this._links[linkHash].link); + links[linkHash].stamp = Date.now(); + return Promise.resolve(links[linkHash].link); } - var self = this; - var linkPromise = this._client[method](address, options) + var linkPromise = method(address, options) .then(function(link) { link.once('detached', function() { - if (self._links.hasOwnProperty(linkHash)) - delete self._links[linkHash]; + if (links.hasOwnProperty(linkHash)) + delete links[linkHash]; }); - self._links[linkHash] = { link: link, stamp: Date.now() }; - if (!self._purgeTimeout) - self._purgeTimeout = setTimeout(self._purgeLinks.bind(self), self._ttl); + links[linkHash] = { link: link, stamp: Date.now() }; + if (!purgeTimeout) + purgeTimeout = setTimeout(purgeLinks, ttl); return link; }); - this._links[linkHash] = linkPromise; + links[linkHash] = linkPromise; return linkPromise; -}; +} -LinkCache.prototype._purgeLinks = function() { +function purgeLinks() { var now = Date.now(); - var _keys = Object.keys(this._links), + var _keys = Object.keys(links), expired = [], live = 0; + purgeTimeout = null; for (var i = 0, ii = _keys.length; i < ii; ++i) { - if (now - this._links[_keys[i]].stamp >= this._ttl) { + if (now - links[_keys[i]].stamp >= ttl) { expired.push(_keys[i]); } else { live++; @@ -71,14 +50,44 @@ LinkCache.prototype._purgeLinks = function() { } for (var j = 0, jj = expired.length; j < jj; ++j) { - var cacheEntry = this._links[expired[j]]; - delete this._links[_keys[j]]; + var cacheEntry = links[expired[j]]; + delete links[expired[j]]; cacheEntry.link.detach(); } - if (live && !this._purgeTimeout) { - this._purgeTimeout = setTimeout(this._purgeLinks.bind(this), this._ttl); + if (live) { + purgeTimeout = setTimeout(purgeLinks, ttl); } -}; +} -module.exports = LinkCache; +module.exports = function(options) { + // NOTE: we need to re-initialize these every time the plugin is called + options = options || {}; + links = {}; + ttl = options.ttl || 60000; + if (!!purgeTimeout) clearTimeout(purgeTimeout); + purgeTimeout = null; + + return function(Client) { + var _createSender = Client.prototype.createSender, + _createReceiver = Client.prototype.createReceiver, + _createSenderStream = Client.prototype.createSenderStream, + _createReceiverStream = Client.prototype.createReceiverStream; + + Client.prototype.createSender = function(address, options) { + return createLink(address, options, 'sender', _createSender.bind(this)); + }; + + Client.prototype.createReceiver = function(address, options) { + return createLink(address, options, 'receiver', _createReceiver.bind(this)); + }; + + Client.prototype.createSenderStream = function(address, options) { + return createLink(address, options, 'senderStream', _createSenderStream.bind(this)); + }; + + Client.prototype.createReceiverStream = function(address, options) { + return createLink(address, options, 'receiverStream', _createReceiverStream.bind(this)); + }; + }; +}; diff --git a/package.json b/package.json index 84d2efc..0f21bf7 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "amqp10-link-cache", "version": "0.2.0", - "description": "a link caching mechanism for the amqp10 module", + "description": "A link caching plugin for the amqp10 module", "main": "index.js", "engines": { "node": ">= 0.10" @@ -24,11 +24,11 @@ "url": "https://github.com/mbroadst/amqp10-link-cache.git" }, "dependencies": { + "bluebird": "^3.3.4", "object-hash": "^1.1.2" }, "devDependencies": { - "amqp10": "^3.1.0", - "bluebird": "^3.3.4", + "amqp10": "^3.1.1", "chai": "^3.5.0", "jshint": "^2.9.1", "mocha": "^2.4.5" diff --git a/test/link-cache.test.js b/test/link-cache.test.js index 79778be..55dbfca 100644 --- a/test/link-cache.test.js +++ b/test/link-cache.test.js @@ -1,25 +1,22 @@ 'use strict'; var Promise = require('bluebird'), - LinkCache = require('../'), - AMQPClient = require('amqp10').Client, + amqp = require('amqp10'), + linkCache = require('..'), + AMQPClient = amqp.Client, config = require('./config'), expect = require('chai').expect; var test = {}; -describe('LinkCache', function() { +describe('basic behavior', function() { + before(function() { amqp.use(linkCache()); }); beforeEach(function() { if (!!test.client) delete test.client; - if (!!test.cache) delete test.cache; - test.client = new AMQPClient(); - test.cache = new LinkCache(test.client); }); afterEach(function() { - return test.client.disconnect().then(function() { - delete test.client; - delete test.cache; - }); + return test.client.disconnect() + .then(function() { delete test.client; }); }); [ @@ -32,9 +29,9 @@ describe('LinkCache', function() { return test.client.connect(config.address) .then(function() { return Promise.all([ - test.cache[testCase.method]('amq.topic'), - test.cache[testCase.method]('amq.topic'), - test.cache[testCase.method]('amq.topic') + test.client[testCase.method]('amq.topic'), + test.client[testCase.method]('amq.topic'), + test.client[testCase.method]('amq.topic') ]); }) .spread(function(link1, link2, link3) { @@ -48,9 +45,9 @@ describe('LinkCache', function() { return test.client.connect(config.address) .then(function() { return Promise.all([ - test.cache[testCase.method]('amq.topic'), - test.cache[testCase.method]('amq.topic', { attach: { receiverSettleMode: false } }), - test.cache[testCase.method]('amq.topic/testing') + test.client[testCase.method]('amq.topic'), + test.client[testCase.method]('amq.topic', { attach: { receiverSettleMode: false } }), + test.client[testCase.method]('amq.topic/testing') ]); }) .spread(function(link1, link2, link3) { @@ -60,29 +57,4 @@ describe('LinkCache', function() { }); }); }); - - it('should purge links after a given interval', function() { - test.cache = new LinkCache(test.client, { ttl: 1 }); - return test.client.connect(config.address) - .then(function() { return test.cache.createSender('amq.topic'); }) - .delay(5) - .then(function() { - var linkCount = Object.keys(test.cache._links).length; - expect(linkCount).to.equal(0); - }); - }); - - it('should purge links after a given interval (2)', function() { - test.cache = new LinkCache(test.client, { ttl: 1 }); - - var sender1, sender2; - return test.client.connect(config.address) - .then(function() { return test.cache.createSender('amq.topic'); }) - .tap(function(sender) { sender1 = sender; }) - .delay(5) - .then(function() { return test.cache.createSender('amq.topic'); }) - .tap(function(sender) { sender2 = sender; }) - .then(function() { expect(sender1).to.not.eql(sender2); }); - }); - }); diff --git a/test/purge.test.js b/test/purge.test.js new file mode 100644 index 0000000..66951ed --- /dev/null +++ b/test/purge.test.js @@ -0,0 +1,44 @@ +'use strict'; +var amqp = require('amqp10'), + linkCache = require('..'), + AMQPClient = amqp.Client, + config = require('./config'), + expect = require('chai').expect; + +var test = {}; +describe('purging', function() { + before(function() { amqp.use(linkCache({ ttl: 10 })); }); + beforeEach(function() { + if (!!test.client) delete test.client; + test.client = new AMQPClient(); + }); + + afterEach(function() { + return test.client.disconnect() + .then(function() { delete test.client; }); + }); + + it('should purge links after a given interval', function() { + var sender; + return test.client.connect(config.address) + .then(function() { return test.client.createSender('amq.topic'); }) + .then(function(s) { sender = s; }) + .delay(50) + .then(function() { + var state = sender.linkSM.getMachineState(); + expect(state).to.be.oneOf(['DETACHED', 'DETACHING']); + sender = null; + }); + }); + + it('should purge links after a given interval (2)', function() { + var sender1, sender2; + return test.client.connect(config.address) + .then(function() { return test.client.createSender('amq.fanout'); }) + .tap(function(sender) { sender1 = sender; }) + .delay(100) + .then(function() { return test.client.createSender('amq.fanout'); }) + .tap(function(sender) { sender2 = sender; }) + .then(function() { expect(sender1).to.not.eql(sender2); }); + }); +});