Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ The full list of settings is as follows.
* <b><tt>socket</tt></b> - path to Unix socket if `unixsocket` is set


## Development

To run the tests, first start up a local Redis server running on port 16379.

$ redis-server spec/redis.conf
$ node spec/runner.js

This engine does have some incompatibilities with the spec, and currently
there are four known test failures:

1. Redis engine destroyClient when the client has subscriptions stops the client receiving messages:
2. Redis engine publish with a single wildcard delivers messages to matching subscriptions:
3. Redis engine publish with a double wildcard delivers a unique copy of the message to each client:
4. Redis engine publish with a double wildcard delivers messages to matching subscriptions:


## License

(The MIT License)
Expand All @@ -63,4 +79,3 @@ FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

197 changes: 98 additions & 99 deletions faye-redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,6 @@ var multiRedis = function(urls) {
var connection = self.connect(options);
var subscription = self.connect(options);

connection.on("reconnecting", function(reconnectionInfo) {
console.log("***** connection reconnecting -- " + options);
});
connection.on("error", function(err) {
console.log("***** connection error -- " + err + " -- " + options);
});

subscription.on("reconnecting", function(reconnectionInfo) {
console.log("***** subscription reconnecting -- " + options);
});
subscription.on("error", function(err) {
console.log("***** subscription error -- " + err + " -- " + options);
});

self.connections[url] = connection;
self.subscriptions[url] = subscription;
});
Expand Down Expand Up @@ -205,11 +191,13 @@ Engine.prototype = {
},

createClient: function(callback, context) {
var clientId = this._server.generateId(), self = this;
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
var clientId = this._server.generateId(),
score = new Date().getTime(),
self = this;

this._redis.zadd(this._ns + '/clients', score, clientId, function(error, added) {
if (added === 0) return self.createClient(callback, context);
self._server.debug('Created new client ?', clientId);
self.ping(clientId);
self._server.debug('Created new client ? with score ?', clientId, score);
self._server.trigger('handshake', clientId);
callback.call(context, clientId);
});
Expand All @@ -218,6 +206,11 @@ Engine.prototype = {
clientExists: function(clientId, callback, context) {
var timeout = this._server.timeout;

if (clientId === undefined) {
this._server.debug("[RedisEngine#clientExists] undefined clientId, returning false");
return callback.call(context, false);
}

this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
if (timeout) {
callback.call(context, score > new Date().getTime() - 1000 * 1.75 * timeout);
Expand All @@ -231,37 +224,86 @@ Engine.prototype = {
var timeout = this._server.timeout, self = this;

if (timeout) {
this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
self._removeClient(clientId, callback, context);
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, n) {
if (error) {
return self._server.error("Failed to reset score for client ?: ?", clientId, error);
}
self._deleteSubscriptions(clientId, callback, context);
});
} else {
this._removeClient(clientId, callback, context);
this._deleteSubscriptions(clientId, callback, context);
}
},

_removeClient: function(clientId, callback, context) {
// Remove subscriptions for a client.
//
// The first part of cleaning up a client, this removes the client ID from
// all the channels that it's a member of. This prevents messages from being
// published to that client.
//
// For any Redis failures, we simply return without calling any further
// callbacks. This stops the client cleanup, but that's okay. Since the
// client ID is still in the sorted set, it will get mopped up in the next
// GC cycle (hopefully).
_deleteSubscriptions: function(clientId, callback, context) {
var self = this;
var clientChannelsKey = this._ns + "/clients/" + clientId + "/channels";

this._redis.smembers(clientChannelsKey, function(error, channels) {
if (error) {
return self._server.error("Failed to fetch channels: ?", error);
}

this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
var n = channels.length, i = 0;
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
var numChannels = channels.length, numUnsubscribes = 0;

if (numChannels == 0) {
return self._deleteClient(clientId, callback, context);
}

channels.forEach(function(channel) {
self.unsubscribe(clientId, channel, function() {
i += 1;
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
var channelsKey = self._ns + "/channels" + channel;
self._redis.srem(channelsKey, clientId, function(error, res) {
if (error) {
return self._server.error("Failed to remove client ? from ?: ?", clientId, channelsKey, error);
}
numUnsubscribes += 1;
self._server.trigger("unsubscribe", clientId, channel);
if (numUnsubscribes == numChannels) {
self._deleteClient(clientId, callback, context);
}
});
});
});
},

_afterSubscriptionsRemoved: function(clientId, callback, context) {
var self = this;
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
self._redis.zrem(self._ns + '/clients', clientId, function() {
self._server.debug('Destroyed client ?', clientId);
self._server.trigger('disconnect', clientId);
if (callback) callback.call(context);
// Removes the client bookkeeping records.
//
// Finishes client cleanup by removing the mailbox, channel set, and finally
// the client ID from the sorted set. Once again, any Redis errors shut down
// the callback chain, and we'll rely on GC to pick it back up again.
_deleteClient: function(clientId, callback, context) {
var self = this,
clientChannelsKey = this._ns + "/clients/" + clientId + "/channels",
clientMessagesKey = this._ns + "/clients/" + clientId + "/messages";

this._redis.del(clientChannelsKey, function(error, res) {
if (error) {
return self._server.error("Failed to remove client channels ?: ?", clientChannelsKey, error);
}
self._redis.del(clientMessagesKey, function(error, res) {
if (error) {
return self._server.error("Failed to remove client messages ?: ?", clientMessagesKey, error);
}
self._redis.zrem(self._ns + "/clients", clientId, function(error, res) {
if (error) {
return self._server.error("Failed to remove client ID ? from /clients: ?", clientId, error);
}
self._server.debug("Destroyed client ? successfully", clientId);
self._server.trigger("disconnect", clientId);
if (callback) {
callback.call(context);
}
});
});
});
},
Expand Down Expand Up @@ -313,7 +355,6 @@ Engine.prototype = {
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
self._redis.publish(self._ns + '/notifications', clientId);
self._redis.expire(self._ns + '/clients/' + clientId + '/messages', 3600)
self._checkClient(clientId);

notified.push(clientId);
}
Expand All @@ -328,14 +369,6 @@ Engine.prototype = {
this._server.trigger('publish', message.clientId, message.channel, message.data);
},

_checkClient: function(clientId) {
var self = this;

this.clientExists(clientId, function(exists) {
if (!exists) self.destroyClient(clientId);
});
},

emptyQueue: function(clientId) {
if (!this._server.hasConnection(clientId)) return;

Expand All @@ -356,65 +389,31 @@ Engine.prototype = {
if (typeof timeout !== 'number') return;

this._redis.urls.forEach(function(url) {
console.log("[" + url + "] Running GC");
var connection = this._redis.connections[url];

this._withLock(connection, 'gc', function(releaseLock) {
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
self = this,
args;

var pruneClientsCallback = function pruneClientsCallback(error, clients) {
var i = 0, n = clients.length;
if (i === n) return releaseLock();

clients.forEach(function(clientId) {
this.destroyClient(clientId, function() {
i += 1;
if (i === n) {
console.log("[" + url + "] Destroyed " + n + " expired clients");
releaseLock();
}
}, this);
}, self);
};

if (this._gc_limit) {
args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback];
} else {
args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback];
}

connection.zrangebyscore.apply(connection, args);
}, this);
}, this);
},

_withLock: function(connection, lockName, callback, context) {
var lockKey = this._ns + '/locks/' + lockName,
currentTime = new Date().getTime(),
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
self = this;
this._server.debug("Running GC for ?", url);

var releaseLock = function() {
if (new Date().getTime() < expiry) connection.del(lockKey);
};

connection.setnx(lockKey, expiry, function(error, set) {
if (set === 1) return callback.call(context, releaseLock);
var connection = this._redis.connections[url],
cutoff = new Date().getTime() - 1000 * 2 * timeout,
self = this,
args;

connection.get(lockKey, function(error, timeout) {
if (!timeout) return;
var pruneClientsCallback = function pruneClientsCallback(error, clients) {
if (error) {
return self._server.error("Failed to fetch clients to GC: ?", error);
}
clients.forEach(function(clientId) {
self._server.debug("GC time for ? ...", clientId);
self.destroyClient(clientId);
});
};

var lockTimeout = parseInt(timeout, 10);
if (currentTime < lockTimeout) return;
if (this._gc_limit) {
args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback];
} else {
args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback];
}

connection.getset(lockKey, expiry, function(error, oldValue) {
if (oldValue !== timeout) return;
callback.call(context, releaseLock);
});
});
});
connection.zrangebyscore.apply(connection, args);
}, this);
}
};

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
, "keywords" : ["pubsub", "bayeux"]

, "version" : "0.1.7"
, "version" : "0.1.9"
, "engines" : {"node": ">=0.4.0"}
, "main" : "./faye-redis"
, "dependencies" : {"hiredis": "", "redis": "", "consistent-hashing": ""}
Expand Down
2 changes: 2 additions & 0 deletions spec/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ Faye = require('../vendor/faye/build/node/faye-node')
require('../vendor/faye/spec/javascript/engine_spec')
require('./faye_redis_spec')

// Faye.Logging.logLevel = 'debug';

JS.Test.autorun()