From b6e3abf49b8fff951e95f6fde57827ddf630aa96 Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Thu, 16 Oct 2014 16:39:01 -0400 Subject: [PATCH 01/10] Fix failing test with undefined clientId Small victories. --- faye-redis.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/faye-redis.js b/faye-redis.js index 526ddcd..7421036 100644 --- a/faye-redis.js +++ b/faye-redis.js @@ -218,6 +218,11 @@ Engine.prototype = { clientExists: function(clientId, callback, context) { var timeout = this._server.timeout; + if (clientId === undefined) { + this._server.debug("[RedisEngine#clientExists] undefined clientId, returning false"); + return callback.call(context, false); + } + this._redis.zscore(this._ns + '/clients', clientId, function(error, score) { if (timeout) { callback.call(context, score > new Date().getTime() - 1000 * 1.75 * timeout); From c16ee98e330f85dd709e99828f7d2e788e8196ca Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Thu, 16 Oct 2014 16:45:19 -0400 Subject: [PATCH 02/10] Remove useless connection logging --- faye-redis.js | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/faye-redis.js b/faye-redis.js index 7421036..e300db5 100644 --- a/faye-redis.js +++ b/faye-redis.js @@ -16,20 +16,6 @@ var multiRedis = function(urls) { var connection = self.connect(options); var subscription = self.connect(options); - connection.on("reconnecting", function(reconnectionInfo) { - console.log("***** connection reconnecting -- " + options); - }); - connection.on("error", function(err) { - console.log("***** connection error -- " + err + " -- " + options); - }); - - subscription.on("reconnecting", function(reconnectionInfo) { - console.log("***** subscription reconnecting -- " + options); - }); - subscription.on("error", function(err) { - console.log("***** subscription error -- " + err + " -- " + options); - }); - self.connections[url] = connection; self.subscriptions[url] = subscription; }); From 7e03b93366d228db47e523771472652a06746e1e Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Thu, 16 Oct 2014 16:46:09 -0400 Subject: [PATCH 03/10] Don't destroy clients when publishing Publishing was stepping on GC's toes by calling `destroyClient` whenever it detected a client ID was past its prime. I *think* there may have been some contention between the two (publisher and gc), so this moves the cleanup responsibility wholly to the latter. Well, not completely. The `disconnect` function inside of Faye itself calls `destroyClient` as well, but let's leave that be. --- faye-redis.js | 9 --------- 1 file changed, 9 deletions(-) diff --git a/faye-redis.js b/faye-redis.js index e300db5..7269f2d 100644 --- a/faye-redis.js +++ b/faye-redis.js @@ -304,7 +304,6 @@ Engine.prototype = { self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage); self._redis.publish(self._ns + '/notifications', clientId); self._redis.expire(self._ns + '/clients/' + clientId + '/messages', 3600) - self._checkClient(clientId); notified.push(clientId); } @@ -319,14 +318,6 @@ Engine.prototype = { this._server.trigger('publish', message.clientId, message.channel, message.data); }, - _checkClient: function(clientId) { - var self = this; - - this.clientExists(clientId, function(exists) { - if (!exists) self.destroyClient(clientId); - }); - }, - emptyQueue: function(clientId) { if (!this._server.hasConnection(clientId)) return; From c9a906f769a07f7148a07a88a83a6459e3b88ca4 Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Thu, 16 Oct 2014 16:58:09 -0400 Subject: [PATCH 04/10] Convert console.log to Faye debug in GC --- faye-redis.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/faye-redis.js b/faye-redis.js index 7269f2d..f6c710f 100644 --- a/faye-redis.js +++ b/faye-redis.js @@ -338,7 +338,7 @@ Engine.prototype = { if (typeof timeout !== 'number') return; this._redis.urls.forEach(function(url) { - console.log("[" + url + "] Running GC"); + this._server.debug("Running GC for ?", url); var connection = this._redis.connections[url]; this._withLock(connection, 'gc', function(releaseLock) { @@ -354,7 +354,7 @@ Engine.prototype = { this.destroyClient(clientId, function() { i += 1; if (i === n) { - console.log("[" + url + "] Destroyed " + n + " expired clients"); + self._server.debug("Destroyed ? expired clients for ?", n, url); releaseLock(); } }, this); From 8c2d9f25dab8ade747058863d7ddc2e534006df0 Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Thu, 16 Oct 2014 23:33:13 -0400 Subject: [PATCH 05/10] Rewrite destroyClient to tolerate Redis failures A lot of changes here. Primarily, the destroyClient function now tolerates Redis failures all the way downstream, and if any error is detected, the destroy process stops. Instead of trying to recover, we rely on the hope that GC will pick up a client and re-run the calls in the future, since the removal from the /clients sorted set is the final Redis call. The GC lock has also been removed. After grabbing a list of expired client IDs, we iterate over them with destroyClient, which returns immediately. Since we're simply launching destroyClient into flight, and don't care if it succeeds or not, we allow GC to re-run in the future without any restrictions. Because destroyClient doesn't block, we have an additional, expected test failure -- "Redis engine destroyClient when the client has subscriptions stops the client receiving messages:". This will fail b/c it's written to expect the destroy to occur immediately. If the test is rewritten with the expectation in a callback, it succeeds. --- faye-redis.js | 157 +++++++++++++++++++++++++++----------------------- 1 file changed, 86 insertions(+), 71 deletions(-) diff --git a/faye-redis.js b/faye-redis.js index f6c710f..a63a50d 100644 --- a/faye-redis.js +++ b/faye-redis.js @@ -222,37 +222,86 @@ Engine.prototype = { var timeout = this._server.timeout, self = this; if (timeout) { - this._redis.zadd(this._ns + '/clients', 0, clientId, function() { - self._removeClient(clientId, callback, context); + this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, n) { + if (error) { + return self._server.error("Failed to reset score for client ?: ?", clientId, error); + } + self._deleteSubscriptions(clientId, callback, context); }); } else { - this._removeClient(clientId, callback, context); + this._deleteSubscriptions(clientId, callback, context); } }, - _removeClient: function(clientId, callback, context) { + // Remove subscriptions for a client. + // + // The first part of cleaning up a client, this removes the client ID from + // all the channels that it's a member of. This prevents messages from being + // published to that client. + // + // For any Redis failures, we simply return without calling any further + // callbacks. This stops the client cleanup, but that's okay. Since the + // client ID is still in the sorted set, it will get mopped up in the next + // GC cycle (hopefully). + _deleteSubscriptions: function(clientId, callback, context) { var self = this; + var clientChannelsKey = this._ns + "/clients/" + clientId + "/channels"; - this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) { - var n = channels.length, i = 0; - if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context); + this._redis.smembers(clientChannelsKey, function(error, channels) { + if (error) { + return self._server.error("Failed to fetch channels: ?", error); + } + + var numChannels = channels.length, numUnsubscribes = 0; + + if (numChannels == 0) { + return self._deleteClient(clientId, callback, context); + } channels.forEach(function(channel) { - self.unsubscribe(clientId, channel, function() { - i += 1; - if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context); + var channelsKey = this._ns + "/channels" + channel; + self._redis.srem(channelsKey, clientId, function(error, res) { + if (error) { + return self._server.error("Failed to remove client ? from ?: ?", clientId, channelsKey, error); + } + numUnsubscribes += 1; + self._server.trigger("unsubscribe", clientId, channel); + if (numUnsubscribes == numChannels) { + self._deleteClient(clientId, callback, context); + } }); }); }); }, - _afterSubscriptionsRemoved: function(clientId, callback, context) { - var self = this; - this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() { - self._redis.zrem(self._ns + '/clients', clientId, function() { - self._server.debug('Destroyed client ?', clientId); - self._server.trigger('disconnect', clientId); - if (callback) callback.call(context); + // Removes the client bookkeeping records. + // + // Finishes client cleanup by removing the mailbox, channel set, and finally + // the client ID from the sorted set. Once again, any Redis errors shut down + // the callback chain, and we'll rely on GC to pick it back up again. + _deleteClient: function(clientId, callback, context) { + var self = this, + clientChannelsKey = this._ns + "/clients/" + clientId + "/channels", + clientMessagesKey = this._ns + "/clients/" + clientId + "/messages"; + + this._redis.del(clientChannelsKey, function(error, res) { + if (error) { + return self._server.error("Failed to remove client channels ?: ?", clientChannelsKey, error); + } + self._redis.del(clientMessagesKey, function(error, res) { + if (error) { + return self._server.error("Failed to remove client messages ?: ?", clientMessagesKey, error); + } + self._redis.zrem(self._ns + "/clients", clientId, function(error, res) { + if (error) { + return self._server.error("Failed to remove client ID ? from /clients: ?", clientId, error); + } + self._server.debug("Destroyed client ? successfully", clientId); + self._server.trigger("disconnect", clientId); + if (callback) { + callback.call(context); + } + }); }); }); }, @@ -339,64 +388,30 @@ Engine.prototype = { this._redis.urls.forEach(function(url) { this._server.debug("Running GC for ?", url); - var connection = this._redis.connections[url]; - - this._withLock(connection, 'gc', function(releaseLock) { - var cutoff = new Date().getTime() - 1000 * 2 * timeout, - self = this, - args; - - var pruneClientsCallback = function pruneClientsCallback(error, clients) { - var i = 0, n = clients.length; - if (i === n) return releaseLock(); - - clients.forEach(function(clientId) { - this.destroyClient(clientId, function() { - i += 1; - if (i === n) { - self._server.debug("Destroyed ? expired clients for ?", n, url); - releaseLock(); - } - }, this); - }, self); - }; - - if (this._gc_limit) { - args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback]; - } else { - args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback]; - } - connection.zrangebyscore.apply(connection, args); - }, this); - }, this); - }, - - _withLock: function(connection, lockName, callback, context) { - var lockKey = this._ns + '/locks/' + lockName, - currentTime = new Date().getTime(), - expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1, - self = this; - - var releaseLock = function() { - if (new Date().getTime() < expiry) connection.del(lockKey); - }; + var connection = this._redis.connections[url], + cutoff = new Date().getTime() - 1000 * 2 * timeout, + self = this, + args; - connection.setnx(lockKey, expiry, function(error, set) { - if (set === 1) return callback.call(context, releaseLock); - - connection.get(lockKey, function(error, timeout) { - if (!timeout) return; + var pruneClientsCallback = function pruneClientsCallback(error, clients) { + if (error) { + return self._server.error("Failed to fetch clients to GC: ?", error); + } + clients.forEach(function(clientId) { + self._server.debug("GC time for ? ...", clientId); + self.destroyClient(clientId); + }); + }; - var lockTimeout = parseInt(timeout, 10); - if (currentTime < lockTimeout) return; + if (this._gc_limit) { + args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback]; + } else { + args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback]; + } - connection.getset(lockKey, expiry, function(error, oldValue) { - if (oldValue !== timeout) return; - callback.call(context, releaseLock); - }); - }); - }); + connection.zrangebyscore.apply(connection, args); + }, this); } }; From ab2ff5f51278eec0968d32013ad2cc5ca78aa400 Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Fri, 17 Oct 2014 08:35:57 -0400 Subject: [PATCH 06/10] Set the score with the initial ZADD in createClient Previously, it would set the initial score to zero, and then allow ping() to update the score to now in the callback. This seems like it would allow a window of opportunity for GC to sneak in and kill the newly created client, however, so now it simply sets the score in the initial call. --- faye-redis.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/faye-redis.js b/faye-redis.js index a63a50d..fe0f16a 100644 --- a/faye-redis.js +++ b/faye-redis.js @@ -191,11 +191,13 @@ Engine.prototype = { }, createClient: function(callback, context) { - var clientId = this._server.generateId(), self = this; - this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) { + var clientId = this._server.generateId(), + score = new Date().getTime(), + self = this; + + this._redis.zadd(this._ns + '/clients', score, clientId, function(error, added) { if (added === 0) return self.createClient(callback, context); - self._server.debug('Created new client ?', clientId); - self.ping(clientId); + self._server.debug('Created new client ? with score ?', clientId, score); self._server.trigger('handshake', clientId); callback.call(context, clientId); }); From ebd7f3e079312dbb9ff1c8372b9cd8507b012d93 Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Fri, 17 Oct 2014 08:37:18 -0400 Subject: [PATCH 07/10] Modify Faye logging level when running tests --- spec/runner.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/runner.js b/spec/runner.js index 6f7eec9..eaa1014 100644 --- a/spec/runner.js +++ b/spec/runner.js @@ -5,4 +5,6 @@ Faye = require('../vendor/faye/build/node/faye-node') require('../vendor/faye/spec/javascript/engine_spec') require('./faye_redis_spec') +// Faye.Logging.logLevel = 'debug'; + JS.Test.autorun() From 67c2cfb099de6ac33e8d72c0795eecc4ed0812da Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Fri, 17 Oct 2014 08:42:48 -0400 Subject: [PATCH 08/10] Update README with test running instructions --- README.markdown | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/README.markdown b/README.markdown index 2d35d8e..4617513 100644 --- a/README.markdown +++ b/README.markdown @@ -41,6 +41,22 @@ The full list of settings is as follows. * socket - path to Unix socket if `unixsocket` is set +## Development + +To run the tests, first start up a local Redis server running on port 16379. + + $ redis-server spec/redis.conf + $ node spec/runner.js + +This engine does have some incompatibilities with the spec, and currently +there are four known test failures: + +1. Redis engine destroyClient when the client has subscriptions stops the client receiving messages: +2. Redis engine publish with a single wildcard delivers messages to matching subscriptions: +3. Redis engine publish with a double wildcard delivers a unique copy of the message to each client: +4. Redis engine publish with a double wildcard delivers messages to matching subscriptions: + + ## License (The MIT License) @@ -63,4 +79,3 @@ FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - From f558fe65ac795de84c3808a71b5bcd6d3ebdb19e Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Fri, 17 Oct 2014 10:41:57 -0400 Subject: [PATCH 09/10] Update version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f64e2f2..2b2ac4e 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ , "author" : "James Coglan (http://jcoglan.com/)" , "keywords" : ["pubsub", "bayeux"] -, "version" : "0.1.7" +, "version" : "0.1.8" , "engines" : {"node": ">=0.4.0"} , "main" : "./faye-redis" , "dependencies" : {"hiredis": "", "redis": "", "consistent-hashing": ""} From f771ae586ae8287a1c363b5846240ff6c438dd50 Mon Sep 17 00:00:00 2001 From: Dave Yeu Date: Fri, 17 Oct 2014 11:00:23 -0400 Subject: [PATCH 10/10] Fixed namespace when removing subscriptions --- faye-redis.js | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/faye-redis.js b/faye-redis.js index fe0f16a..8916aac 100644 --- a/faye-redis.js +++ b/faye-redis.js @@ -261,7 +261,7 @@ Engine.prototype = { } channels.forEach(function(channel) { - var channelsKey = this._ns + "/channels" + channel; + var channelsKey = self._ns + "/channels" + channel; self._redis.srem(channelsKey, clientId, function(error, res) { if (error) { return self._server.error("Failed to remove client ? from ?: ?", clientId, channelsKey, error); diff --git a/package.json b/package.json index 2b2ac4e..6c286f7 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ , "author" : "James Coglan (http://jcoglan.com/)" , "keywords" : ["pubsub", "bayeux"] -, "version" : "0.1.8" +, "version" : "0.1.9" , "engines" : {"node": ">=0.4.0"} , "main" : "./faye-redis" , "dependencies" : {"hiredis": "", "redis": "", "consistent-hashing": ""}