Skip to content

Commit 2d43d61

Browse files
committed
Merge pull request #1 from groupme/redis-errors
Rewrite GC to handle Redis failures
2 parents e14750c + f771ae5 commit 2d43d61

File tree

4 files changed

+117
-101
lines changed

4 files changed

+117
-101
lines changed

README.markdown

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,22 @@ The full list of settings is as follows.
4141
* <b><tt>socket</tt></b> - path to Unix socket if `unixsocket` is set
4242

4343

44+
## Development
45+
46+
To run the tests, first start a local Redis server on port 16379, then run the spec runner:
47+
48+
$ redis-server spec/redis.conf
49+
$ node spec/runner.js
50+
51+
This engine does have some incompatibilities with the spec, and currently
52+
there are four known test failures:
53+
54+
1. Redis engine destroyClient when the client has subscriptions stops the client receiving messages
55+
2. Redis engine publish with a single wildcard delivers messages to matching subscriptions
56+
3. Redis engine publish with a double wildcard delivers a unique copy of the message to each client
57+
4. Redis engine publish with a double wildcard delivers messages to matching subscriptions
58+
59+
4460
## License
4561

4662
(The MIT License)
@@ -63,4 +79,3 @@ FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
6379
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
6480
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
6581
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
66-

faye-redis.js

Lines changed: 98 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,6 @@ var multiRedis = function(urls) {
1616
var connection = self.connect(options);
1717
var subscription = self.connect(options);
1818

19-
connection.on("reconnecting", function(reconnectionInfo) {
20-
console.log("***** connection reconnecting -- " + options);
21-
});
22-
connection.on("error", function(err) {
23-
console.log("***** connection error -- " + err + " -- " + options);
24-
});
25-
26-
subscription.on("reconnecting", function(reconnectionInfo) {
27-
console.log("***** subscription reconnecting -- " + options);
28-
});
29-
subscription.on("error", function(err) {
30-
console.log("***** subscription error -- " + err + " -- " + options);
31-
});
32-
3319
self.connections[url] = connection;
3420
self.subscriptions[url] = subscription;
3521
});
@@ -205,11 +191,13 @@ Engine.prototype = {
205191
},
206192

207193
createClient: function(callback, context) {
208-
var clientId = this._server.generateId(), self = this;
209-
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
194+
var clientId = this._server.generateId(),
195+
score = new Date().getTime(),
196+
self = this;
197+
198+
this._redis.zadd(this._ns + '/clients', score, clientId, function(error, added) {
210199
if (added === 0) return self.createClient(callback, context);
211-
self._server.debug('Created new client ?', clientId);
212-
self.ping(clientId);
200+
self._server.debug('Created new client ? with score ?', clientId, score);
213201
self._server.trigger('handshake', clientId);
214202
callback.call(context, clientId);
215203
});
@@ -218,6 +206,11 @@ Engine.prototype = {
218206
clientExists: function(clientId, callback, context) {
219207
var timeout = this._server.timeout;
220208

209+
if (clientId === undefined) {
210+
this._server.debug("[RedisEngine#clientExists] undefined clientId, returning false");
211+
return callback.call(context, false);
212+
}
213+
221214
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
222215
if (timeout) {
223216
callback.call(context, score > new Date().getTime() - 1000 * 1.75 * timeout);
@@ -231,37 +224,86 @@ Engine.prototype = {
231224
var timeout = this._server.timeout, self = this;
232225

233226
if (timeout) {
234-
this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
235-
self._removeClient(clientId, callback, context);
227+
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, n) {
228+
if (error) {
229+
return self._server.error("Failed to reset score for client ?: ?", clientId, error);
230+
}
231+
self._deleteSubscriptions(clientId, callback, context);
236232
});
237233
} else {
238-
this._removeClient(clientId, callback, context);
234+
this._deleteSubscriptions(clientId, callback, context);
239235
}
240236
},
241237

242-
_removeClient: function(clientId, callback, context) {
238+
// Remove subscriptions for a client.
239+
//
240+
// The first part of cleaning up a client, this removes the client ID from
241+
// all the channels that it's a member of. This prevents messages from being
242+
// published to that client.
243+
//
244+
// For any Redis failures, we simply return without calling any further
245+
// callbacks. This stops the client cleanup, but that's okay. Since the
246+
// client ID is still in the sorted set, it will get mopped up in the next
247+
// GC cycle (hopefully).
248+
_deleteSubscriptions: function(clientId, callback, context) {
243249
var self = this;
250+
var clientChannelsKey = this._ns + "/clients/" + clientId + "/channels";
251+
252+
this._redis.smembers(clientChannelsKey, function(error, channels) {
253+
if (error) {
254+
return self._server.error("Failed to fetch channels: ?", error);
255+
}
244256

245-
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
246-
var n = channels.length, i = 0;
247-
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
257+
var numChannels = channels.length, numUnsubscribes = 0;
258+
259+
if (numChannels == 0) {
260+
return self._deleteClient(clientId, callback, context);
261+
}
248262

249263
channels.forEach(function(channel) {
250-
self.unsubscribe(clientId, channel, function() {
251-
i += 1;
252-
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
264+
var channelsKey = self._ns + "/channels" + channel;
265+
self._redis.srem(channelsKey, clientId, function(error, res) {
266+
if (error) {
267+
return self._server.error("Failed to remove client ? from ?: ?", clientId, channelsKey, error);
268+
}
269+
numUnsubscribes += 1;
270+
self._server.trigger("unsubscribe", clientId, channel);
271+
if (numUnsubscribes == numChannels) {
272+
self._deleteClient(clientId, callback, context);
273+
}
253274
});
254275
});
255276
});
256277
},
257278

258-
_afterSubscriptionsRemoved: function(clientId, callback, context) {
259-
var self = this;
260-
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
261-
self._redis.zrem(self._ns + '/clients', clientId, function() {
262-
self._server.debug('Destroyed client ?', clientId);
263-
self._server.trigger('disconnect', clientId);
264-
if (callback) callback.call(context);
279+
// Removes the client bookkeeping records.
280+
//
281+
// Finishes client cleanup by removing the mailbox, channel set, and finally
282+
// the client ID from the sorted set. Once again, any Redis errors shut down
283+
// the callback chain, and we'll rely on GC to pick it back up again.
284+
_deleteClient: function(clientId, callback, context) {
285+
var self = this,
286+
clientChannelsKey = this._ns + "/clients/" + clientId + "/channels",
287+
clientMessagesKey = this._ns + "/clients/" + clientId + "/messages";
288+
289+
this._redis.del(clientChannelsKey, function(error, res) {
290+
if (error) {
291+
return self._server.error("Failed to remove client channels ?: ?", clientChannelsKey, error);
292+
}
293+
self._redis.del(clientMessagesKey, function(error, res) {
294+
if (error) {
295+
return self._server.error("Failed to remove client messages ?: ?", clientMessagesKey, error);
296+
}
297+
self._redis.zrem(self._ns + "/clients", clientId, function(error, res) {
298+
if (error) {
299+
return self._server.error("Failed to remove client ID ? from /clients: ?", clientId, error);
300+
}
301+
self._server.debug("Destroyed client ? successfully", clientId);
302+
self._server.trigger("disconnect", clientId);
303+
if (callback) {
304+
callback.call(context);
305+
}
306+
});
265307
});
266308
});
267309
},
@@ -313,7 +355,6 @@ Engine.prototype = {
313355
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
314356
self._redis.publish(self._ns + '/notifications', clientId);
315357
self._redis.expire(self._ns + '/clients/' + clientId + '/messages', 3600)
316-
self._checkClient(clientId);
317358

318359
notified.push(clientId);
319360
}
@@ -328,14 +369,6 @@ Engine.prototype = {
328369
this._server.trigger('publish', message.clientId, message.channel, message.data);
329370
},
330371

331-
_checkClient: function(clientId) {
332-
var self = this;
333-
334-
this.clientExists(clientId, function(exists) {
335-
if (!exists) self.destroyClient(clientId);
336-
});
337-
},
338-
339372
emptyQueue: function(clientId) {
340373
if (!this._server.hasConnection(clientId)) return;
341374

@@ -356,65 +389,31 @@ Engine.prototype = {
356389
if (typeof timeout !== 'number') return;
357390

358391
this._redis.urls.forEach(function(url) {
359-
console.log("[" + url + "] Running GC");
360-
var connection = this._redis.connections[url];
361-
362-
this._withLock(connection, 'gc', function(releaseLock) {
363-
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
364-
self = this,
365-
args;
366-
367-
var pruneClientsCallback = function pruneClientsCallback(error, clients) {
368-
var i = 0, n = clients.length;
369-
if (i === n) return releaseLock();
370-
371-
clients.forEach(function(clientId) {
372-
this.destroyClient(clientId, function() {
373-
i += 1;
374-
if (i === n) {
375-
console.log("[" + url + "] Destroyed " + n + " expired clients");
376-
releaseLock();
377-
}
378-
}, this);
379-
}, self);
380-
};
381-
382-
if (this._gc_limit) {
383-
args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback];
384-
} else {
385-
args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback];
386-
}
387-
388-
connection.zrangebyscore.apply(connection, args);
389-
}, this);
390-
}, this);
391-
},
392-
393-
_withLock: function(connection, lockName, callback, context) {
394-
var lockKey = this._ns + '/locks/' + lockName,
395-
currentTime = new Date().getTime(),
396-
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
397-
self = this;
392+
this._server.debug("Running GC for ?", url);
398393

399-
var releaseLock = function() {
400-
if (new Date().getTime() < expiry) connection.del(lockKey);
401-
};
402-
403-
connection.setnx(lockKey, expiry, function(error, set) {
404-
if (set === 1) return callback.call(context, releaseLock);
394+
var connection = this._redis.connections[url],
395+
cutoff = new Date().getTime() - 1000 * 2 * timeout,
396+
self = this,
397+
args;
405398

406-
connection.get(lockKey, function(error, timeout) {
407-
if (!timeout) return;
399+
var pruneClientsCallback = function pruneClientsCallback(error, clients) {
400+
if (error) {
401+
return self._server.error("Failed to fetch clients to GC: ?", error);
402+
}
403+
clients.forEach(function(clientId) {
404+
self._server.debug("GC time for ? ...", clientId);
405+
self.destroyClient(clientId);
406+
});
407+
};
408408

409-
var lockTimeout = parseInt(timeout, 10);
410-
if (currentTime < lockTimeout) return;
409+
if (this._gc_limit) {
410+
args = [this._ns + "/clients", 0, cutoff, "LIMIT", 0, this._gc_limit, pruneClientsCallback];
411+
} else {
412+
args = [this._ns + "/clients", 0, cutoff, pruneClientsCallback];
413+
}
411414

412-
connection.getset(lockKey, expiry, function(error, oldValue) {
413-
if (oldValue !== timeout) return;
414-
callback.call(context, releaseLock);
415-
});
416-
});
417-
});
415+
connection.zrangebyscore.apply(connection, args);
416+
}, this);
418417
}
419418
};
420419

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
55
, "keywords" : ["pubsub", "bayeux"]
66

7-
, "version" : "0.1.7"
7+
, "version" : "0.1.9"
88
, "engines" : {"node": ">=0.4.0"}
99
, "main" : "./faye-redis"
1010
, "dependencies" : {"hiredis": "", "redis": "", "consistent-hashing": ""}

spec/runner.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@ Faye = require('../vendor/faye/build/node/faye-node')
55
require('../vendor/faye/spec/javascript/engine_spec')
66
require('./faye_redis_spec')
77

8+
// Faye.Logging.logLevel = 'debug';
9+
810
JS.Test.autorun()

0 commit comments

Comments
 (0)