diff --git a/README.markdown b/README.markdown
index 2d35d8e..4617513 100644
--- a/README.markdown
+++ b/README.markdown
@@ -41,6 +41,22 @@ The full list of settings is as follows.
* socket - path to Unix socket if `unixsocket` is set
+## Development
+
+To run the tests, first start up a local Redis server running on port 16379.
+
+ $ redis-server spec/redis.conf
+ $ node spec/runner.js
+
+This engine does have some incompatibilities with the spec, and currently
+there are four known test failures:
+
+1. Redis engine destroyClient when the client has subscriptions stops the client receiving messages:
+2. Redis engine publish with a single wildcard delivers messages to matching subscriptions:
+3. Redis engine publish with a double wildcard delivers a unique copy of the message to each client:
+4. Redis engine publish with a double wildcard delivers messages to matching subscriptions:
+
+
## License
(The MIT License)
@@ -63,4 +79,3 @@ FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
diff --git a/faye-redis.js b/faye-redis.js
index 526ddcd..8916aac 100644
--- a/faye-redis.js
+++ b/faye-redis.js
@@ -16,20 +16,6 @@ var multiRedis = function(urls) {
var connection = self.connect(options);
var subscription = self.connect(options);
- connection.on("reconnecting", function(reconnectionInfo) {
- console.log("***** connection reconnecting -- " + options);
- });
- connection.on("error", function(err) {
- console.log("***** connection error -- " + err + " -- " + options);
- });
-
- subscription.on("reconnecting", function(reconnectionInfo) {
- console.log("***** subscription reconnecting -- " + options);
- });
- subscription.on("error", function(err) {
- console.log("***** subscription error -- " + err + " -- " + options);
- });
-
self.connections[url] = connection;
self.subscriptions[url] = subscription;
});
@@ -205,11 +191,13 @@ Engine.prototype = {
},
createClient: function(callback, context) {
- var clientId = this._server.generateId(), self = this;
- this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
+ var clientId = this._server.generateId(),
+ score = new Date().getTime(),
+ self = this;
+
+ this._redis.zadd(this._ns + '/clients', score, clientId, function(error, added) {
if (added === 0) return self.createClient(callback, context);
- self._server.debug('Created new client ?', clientId);
- self.ping(clientId);
+ self._server.debug('Created new client ? with score ?', clientId, score);
self._server.trigger('handshake', clientId);
callback.call(context, clientId);
});
@@ -218,6 +206,11 @@ Engine.prototype = {
clientExists: function(clientId, callback, context) {
var timeout = this._server.timeout;
+ if (clientId === undefined) {
+ this._server.debug("[RedisEngine#clientExists] undefined clientId, returning false");
+ return callback.call(context, false);
+ }
+
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
if (timeout) {
callback.call(context, score > new Date().getTime() - 1000 * 1.75 * timeout);
@@ -231,37 +224,86 @@ Engine.prototype = {
var timeout = this._server.timeout, self = this;
if (timeout) {
- this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
- self._removeClient(clientId, callback, context);
+ this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, n) {
+ if (error) {
+ return self._server.error("Failed to reset score for client ?: ?", clientId, error);
+ }
+ self._deleteSubscriptions(clientId, callback, context);
});
} else {
- this._removeClient(clientId, callback, context);
+ this._deleteSubscriptions(clientId, callback, context);
}
},
- _removeClient: function(clientId, callback, context) {
+ // Remove subscriptions for a client.
+ //
+ // The first part of cleaning up a client, this removes the client ID from
+ // all the channels that it's a member of. This prevents messages from being
+ // published to that client.
+ //
+ // For any Redis failures, we simply return without calling any further
+ // callbacks. This stops the client cleanup, but that's okay. Since the
+ // client ID is still in the sorted set, it will get mopped up in the next
+ // GC cycle (hopefully).
+ _deleteSubscriptions: function(clientId, callback, context) {
var self = this;
+ var clientChannelsKey = this._ns + "/clients/" + clientId + "/channels";
+
+ this._redis.smembers(clientChannelsKey, function(error, channels) {
+ if (error) {
+ return self._server.error("Failed to fetch channels: ?", error);
+ }
- this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
- var n = channels.length, i = 0;
- if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
+ var numChannels = channels.length, numUnsubscribes = 0;
+
+ if (numChannels == 0) {
+ return self._deleteClient(clientId, callback, context);
+ }
channels.forEach(function(channel) {
- self.unsubscribe(clientId, channel, function() {
- i += 1;
- if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
+ var channelsKey = self._ns + "/channels" + channel;
+ self._redis.srem(channelsKey, clientId, function(error, res) {
+ if (error) {
+ return self._server.error("Failed to remove client ? from ?: ?", clientId, channelsKey, error);
+ }
+ numUnsubscribes += 1;
+ self._server.trigger("unsubscribe", clientId, channel);
+ if (numUnsubscribes == numChannels) {
+ self._deleteClient(clientId, callback, context);
+ }
});
});
});
},
- _afterSubscriptionsRemoved: function(clientId, callback, context) {
- var self = this;
- this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
- self._redis.zrem(self._ns + '/clients', clientId, function() {
- self._server.debug('Destroyed client ?', clientId);
- self._server.trigger('disconnect', clientId);
- if (callback) callback.call(context);
+ // Removes the client bookkeeping records.
+ //
+ // Finishes client cleanup by removing the mailbox, channel set, and finally
+ // the client ID from the sorted set. Once again, any Redis errors shut down
+ // the callback chain, and we'll rely on GC to pick it back up again.
+ _deleteClient: function(clientId, callback, context) {
+ var self = this,
+ clientChannelsKey = this._ns + "/clients/" + clientId + "/channels",
+ clientMessagesKey = this._ns + "/clients/" + clientId + "/messages";
+
+ this._redis.del(clientChannelsKey, function(error, res) {
+ if (error) {
+ return self._server.error("Failed to remove client channels ?: ?", clientChannelsKey, error);
+ }
+ self._redis.del(clientMessagesKey, function(error, res) {
+ if (error) {
+ return self._server.error("Failed to remove client messages ?: ?", clientMessagesKey, error);
+ }
+ self._redis.zrem(self._ns + "/clients", clientId, function(error, res) {
+ if (error) {
+ return self._server.error("Failed to remove client ID ? from /clients: ?", clientId, error);
+ }
+ self._server.debug("Destroyed client ? successfully", clientId);
+ self._server.trigger("disconnect", clientId);
+ if (callback) {
+ callback.call(context);
+ }
+ });
});
});
},
@@ -313,7 +355,6 @@ Engine.prototype = {
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
self._redis.publish(self._ns + '/notifications', clientId);
self._redis.expire(self._ns + '/clients/' + clientId + '/messages', 3600)
- self._checkClient(clientId);
notified.push(clientId);
}
@@ -328,14 +369,6 @@ Engine.prototype = {
this._server.trigger('publish', message.clientId, message.channel, message.data);
},
- _checkClient: function(clientId) {
- var self = this;
-
- this.clientExists(clientId, function(exists) {
- if (!exists) self.destroyClient(clientId);
- });
- },
-
emptyQueue: function(clientId) {
if (!this._server.hasConnection(clientId)) return;
@@ -356,65 +389,31 @@ Engine.prototype = {
if (typeof timeout !== 'number') return;
this._redis.urls.forEach(function(url) {
- console.log("[" + url + "] Running GC");
- var connection = this._redis.connections[url];
-
- this._withLock(connection, 'gc', function(releaseLock) {
- var cutoff = new Date().getTime() - 1000 * 2 * timeout,
- self = this,
- args;
-
- var pruneClientsCallback = function pruneClientsCallback(error, clients) {
- var i = 0, n = clients.length;
- if (i === n) return releaseLock();
-
- clients.forEach(function(clientId) {
- this.destroyClient(clientId, function() {
- i += 1;
- if (i === n) {
- console.log("[" + url + "] Destroyed " + n + " expired clients");
- releaseLock();
- }
- }, this);
- }, self);
- };
-
- if (this._gc_limit) {
- args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback];
- } else {
- args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback];
- }
-
- connection.zrangebyscore.apply(connection, args);
- }, this);
- }, this);
- },
-
- _withLock: function(connection, lockName, callback, context) {
- var lockKey = this._ns + '/locks/' + lockName,
- currentTime = new Date().getTime(),
- expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
- self = this;
+ this._server.debug("Running GC for ?", url);
- var releaseLock = function() {
- if (new Date().getTime() < expiry) connection.del(lockKey);
- };
-
- connection.setnx(lockKey, expiry, function(error, set) {
- if (set === 1) return callback.call(context, releaseLock);
+ var connection = this._redis.connections[url],
+ cutoff = new Date().getTime() - 1000 * 2 * timeout,
+ self = this,
+ args;
- connection.get(lockKey, function(error, timeout) {
- if (!timeout) return;
+ var pruneClientsCallback = function pruneClientsCallback(error, clients) {
+ if (error) {
+ return self._server.error("Failed to fetch clients to GC: ?", error);
+ }
+ clients.forEach(function(clientId) {
+ self._server.debug("GC time for ? ...", clientId);
+ self.destroyClient(clientId);
+ });
+ };
- var lockTimeout = parseInt(timeout, 10);
- if (currentTime < lockTimeout) return;
+ if (this._gc_limit) {
+ args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback];
+ } else {
+ args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback];
+ }
- connection.getset(lockKey, expiry, function(error, oldValue) {
- if (oldValue !== timeout) return;
- callback.call(context, releaseLock);
- });
- });
- });
+ connection.zrangebyscore.apply(connection, args);
+ }, this);
}
};
diff --git a/package.json b/package.json
index f64e2f2..6c286f7 100644
--- a/package.json
+++ b/package.json
@@ -4,7 +4,7 @@
, "author" : "James Coglan (http://jcoglan.com/)"
, "keywords" : ["pubsub", "bayeux"]
-, "version" : "0.1.7"
+, "version" : "0.1.9"
, "engines" : {"node": ">=0.4.0"}
, "main" : "./faye-redis"
, "dependencies" : {"hiredis": "", "redis": "", "consistent-hashing": ""}
diff --git a/spec/runner.js b/spec/runner.js
index 6f7eec9..eaa1014 100644
--- a/spec/runner.js
+++ b/spec/runner.js
@@ -5,4 +5,6 @@ Faye = require('../vendor/faye/build/node/faye-node')
require('../vendor/faye/spec/javascript/engine_spec')
require('./faye_redis_spec')
+// Faye.Logging.logLevel = 'debug';
+
JS.Test.autorun()