-
Notifications
You must be signed in to change notification settings - Fork 488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Get all clients in a room across all nodes #109
Changes from 4 commits
c6a736f
2a78a1a
48ed4f8
ab448e0
faec2a6
ef1bba8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,8 +37,10 @@ function adapter(uri, opts){ | |
// opts | ||
var pub = opts.pubClient; | ||
var sub = opts.subClient; | ||
|
||
var prefix = opts.key || 'socket.io'; | ||
var subEvent = opts.subEvent || 'message'; | ||
var clientsTimeout = opts.clientsTimeout || 1000; | ||
|
||
// init clients if needed | ||
function createClient(redis_opts) { | ||
|
@@ -52,6 +54,8 @@ function adapter(uri, opts){ | |
|
||
if (!pub) pub = createClient(); | ||
if (!sub) sub = createClient({ return_buffers: true }); | ||
|
||
var subJson = sub.duplicate({ return_buffers: false }); | ||
|
||
// this server's key | ||
var uid = uid2(6); | ||
|
@@ -68,7 +72,11 @@ function adapter(uri, opts){ | |
|
||
this.uid = uid; | ||
this.prefix = prefix; | ||
this.clientsTimeout = clientsTimeout; | ||
|
||
this.channel = prefix + '#' + nsp.name + '#'; | ||
this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#'; | ||
|
||
if (String.prototype.startsWith) { | ||
this.channelMatches = function (messageChannel, subscribedChannel) { | ||
return messageChannel.startsWith(subscribedChannel); | ||
|
@@ -82,10 +90,17 @@ function adapter(uri, opts){ | |
this.subClient = sub; | ||
|
||
var self = this; | ||
|
||
sub.subscribe(this.channel, function(err){ | ||
if (err) self.emit('error', err); | ||
}); | ||
|
||
subJson.subscribe(this.syncChannel, function(err){ | ||
if (err) self.emit('error', err); | ||
}); | ||
|
||
sub.on(subEvent, this.onmessage.bind(this)); | ||
subJson.on(subEvent, this.onclients.bind(this)); | ||
} | ||
|
||
/** | ||
|
@@ -124,6 +139,41 @@ function adapter(uri, opts){ | |
this.broadcast.apply(this, args); | ||
}; | ||
|
||
/** | ||
* Called with a subscription message on sync | ||
* | ||
* @api private | ||
*/ | ||
|
||
Redis.prototype.onclients = function(channel, msg){ | ||
|
||
if (!this.channelMatches(channel.toString(), this.syncChannel)) { | ||
return debug('ignore different channel'); | ||
} | ||
|
||
try { | ||
var decoded = JSON.parse(msg); | ||
} catch(err){ | ||
self.emit('error', err); | ||
return; | ||
} | ||
|
||
Adapter.prototype.clients.call(this, decoded.rooms, function(err, clients){ | ||
if(err){ | ||
self.emit('error', err); | ||
return; | ||
} | ||
|
||
var responseChn = prefix + '-sync#response#' + decoded.transaction; | ||
var response = JSON.stringify({ | ||
clients : clients | ||
}); | ||
|
||
pub.publish(responseChn, response); | ||
}); | ||
|
||
}; | ||
|
||
/** | ||
* Broadcasts a packet. | ||
* | ||
|
@@ -236,10 +286,88 @@ function adapter(uri, opts){ | |
}); | ||
}; | ||
|
||
/** | ||
* Gets a list of clients by sid. | ||
* | ||
* @param {Array} explicit set of rooms to check. | ||
* @api public | ||
*/ | ||
|
||
Redis.prototype.clients = function(rooms, fn){ | ||
if ('function' == typeof rooms){ | ||
fn = rooms; | ||
rooms = null; | ||
} | ||
|
||
rooms = rooms || []; | ||
|
||
var self = this; | ||
|
||
var transaction = uid2(6); | ||
var responseChn = prefix + '-sync#response#' + transaction; | ||
|
||
pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){ | ||
if (err) { | ||
self.emit('error', err); | ||
if (fn) fn(err); | ||
return; | ||
} | ||
|
||
numsub = numsub[1]; | ||
|
||
var msg_count = 0; | ||
var clients = {}; | ||
|
||
subJson.on("subscribe", function subscribed(channel, count) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any particular reason for double quotes here? |
||
|
||
var request = JSON.stringify({ | ||
transaction : transaction, | ||
rooms : rooms | ||
}); | ||
|
||
/*If there is no response for 5 seconds, return result;*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 5 seconds? |
||
var timeout = setTimeout(function() { | ||
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients))); | ||
}, self.clientsTimeout); | ||
|
||
subJson.on(subEvent, function onEvent(channel, msg) { | ||
|
||
if (!self.channelMatches(channel.toString(), responseChn)) { | ||
return debug('ignore different channel'); | ||
} | ||
|
||
var response = JSON.parse(msg); | ||
|
||
for(var i = 0; i < response.clients.length; i++){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you maybe add a test for the existence of the |
||
clients[response.clients[i]] = true; | ||
} | ||
|
||
msg_count++; | ||
if(msg_count == numsub){ | ||
clearTimeout(timeout); | ||
subJson.unsubscribe(responseChn); | ||
subJson.removeListener('subscribe', subscribed); | ||
subJson.removeListener(subEvent, onEvent); | ||
|
||
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients))); | ||
} | ||
}); | ||
|
||
pub.publish(self.syncChannel, request); | ||
|
||
}); | ||
|
||
subJson.subscribe(responseChn); | ||
|
||
}); | ||
|
||
}; | ||
|
||
Redis.uid = uid; | ||
Redis.pubClient = pub; | ||
Redis.subClient = sub; | ||
Redis.prefix = prefix; | ||
Redis.clientsTimeout = clientsTimeout; | ||
|
||
return Redis; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note:
self
seems undefined here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed