Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for getting clients in a given room (Works across multiple nodes) #15

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,73 @@ By running socket.io with the `socket.io-redis` adapter you can run
multiple socket.io instances in different processes or servers that can
all broadcast and emit events to and from each other.

`socket.io-redis` use Redis pub/sub mechanism to route events to different nodes/servers and
store rooms and sockets ids in Redis sets.

If you need to emit events to socket.io instances from a non-socket.io
process, you should use [socket.io-emitter](http:///github.com/Automattic/socket.io-emitter).

## Stored schema

The module store two different entities in Redis: **socket** and **room**.

Each as a Redis SET.

Every key is prefixed with "socket.io". Prefix is customizable with the *key* option.

### socket

The module creates a new Redis SET for each new socket.

The socket SET key is defined as *{{PREFIX}}*#*{{SOCKET_ID}}* (e.g.: *socket.io#951wMmbBjkREmCapAAAD*).

The socket SET is created with one record: the socket ID string.

Then each time this socket join/leave a room module add/remove a Redis record in SET.

Example for a socket with the ID *951wMmbBjkREmCapAAAD* in *foo* and *bar* rooms:

```
socket.io#951wMmbBjkREmCapAAAD
-> 951wMmbBjkREmCapAAAD
-> foo
-> bar
```

### room

Each time a room is needed (= a socket join a room that not already exists) the module create a new Redis SET.

The room SET key is defined as *{{PREFIX}}*#*{{ROOM_NAME }}* (e.g.: *socket.io#foo*).
The room SET contain the socket IDs of the room sockets.

Then each time a socket join/leave the room the module add/remove the corresponding Redis record from the SET.

Example for a room *foo* with the following socket in *951wMmbBjkREmCapAAAD*, *566Mm_BjkREmRff456*:

```
socket.io#foo
-> 951wMmbBjkREmCapAAAD
-> 566Mm_BjkREmRff456
```

As with native adapter the not longer needed room SET are deleted automatically (except on application
exit, see below).

## Known limitation

**Warning! Current module implementation doesn't cleanup Redis storage on exit.**

Consequence is that in a multi-node/server configuration with the out-of-the-box module,
shutting down a node process will let sockets and rooms SET remain in Redis even if the
current sockets are not longer connected.

The reason is the non ability for node to execute asynchronous tasks (like Redis queries)
on exit.

So, every developer should implement his proper cleanup logic in the context of
his particular project.

## API

### adapter(uri[, opts])
Expand All @@ -36,8 +100,10 @@ The following options are allowed:
be used instead of the host and port options if specified.
- `pubClient`: optional, the redis client to publish events on
- `subClient`: optional, the redis client to subscribe to events on
- `dataClient`: optional, the redis client used to store and read socket.io
sockets/rooms data

If you decide to supply `pubClient` and `subClient`, make sure you use
If you decide to supply `pubClient`, `subClient` or `dataClient` make sure you use
[node_redis](https://github.com/mranney/node_redis) as a client or one
with an equivalent API.

Expand Down
123 changes: 118 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ function adapter(uri, opts){
var port = Number(opts.port || 6379);
var pub = opts.pubClient;
var sub = opts.subClient;
var data = opts.dataClient;
var prefix = opts.key || 'socket.io';

// init clients if needed
if (!pub) pub = socket ? redis(socket) : redis(port, host);
if (!sub) sub = socket
? redis(socket, { detect_buffers: true })
: redis(port, host, {detect_buffers: true});
if (!data) data = socket ? redis(socket) : redis(port, host);


// this server's key
Expand All @@ -65,10 +67,11 @@ function adapter(uri, opts){
* @api public
*/

var self = this;

function Redis(nsp){
self = this;
Adapter.call(this, nsp);

var self = this;
sub.psubscribe(prefix + '#*', function(err){
if (err) self.emit('error', err);
});
Expand All @@ -79,7 +82,7 @@ function adapter(uri, opts){
* Inherits from `Adapter`.
*/

Redis.prototype.__proto__ = Adapter.prototype;
Redis.prototype = Object.create(Adapter.prototype);

/**
* Called with a subscription message
Expand All @@ -93,13 +96,93 @@ function adapter(uri, opts){
var args = msgpack.decode(msg);

if (args[0] && args[0].nsp === undefined)
args[0].nsp = '/'
args[0].nsp = '/';

if (!args[0] || args[0].nsp != this.nsp.name) return debug('ignore different namespace')
if (!args[0] || args[0].nsp != this.nsp.name) return debug('ignore different namespace');
args.push(true);
this.broadcast.apply(this, args);
};

/**
* Adds a socket from a room.
*
* @param {String} socket id
* @param {String} room name
* @param {Function} callback
* @api public
*/

Redis.prototype.add = function(id, room, fn){
Adapter.prototype.add.call(this, id, room);
data.multi()
.sadd(prefix + '#' + room, id)
.sadd(prefix + '#' + id, room)
.exec(function(){
if (fn) process.nextTick(fn.bind(null, null));
});

};

/**
* Removes a socket from a room.
*
* @param {String} socket id
* @param {String} room name
* @param {Function} callback
* @api public
*/

Redis.prototype.del = function(id, room, fn){
Adapter.prototype.del.call(this, id, room);
data.multi()
.srem(prefix + '#' + room, id)
.srem(prefix + '#' + id, room)
.exec(function(){
if (fn) process.nextTick(fn.bind(null, null));
});
};


/**
* Removes a socket from all rooms it's joined.
*
* @param {String} socket id
* @api public
*/

Redis.prototype.delAll = function(id, fn){
Adapter.prototype.delAll.call(this, id);

data.smembers(prefix + '#' + id, function(err, rooms){
var multi = data.multi();
for(var i=0; i<rooms.length; ++i){
multi.srem(prefix + '#' + rooms[i], id);
}
multi.del(prefix + '#' + id);
multi.exec(fn);
});
};

/**
* Get all clients in room.
*
* @param {String} room id
* @api public
*/
Redis.prototype.clients = function(room, fn){
data.smembers(prefix + '#' + room, fn);
};

/**
* Get all rooms the client is in.
*
* @param {String} client id
* @api public
*/
Redis.prototype.roomClients = function(id, fn){
data.smembers(prefix + '#' + id, fn);
};

/**
* Broadcasts a packet.
*
Expand All @@ -114,6 +197,36 @@ function adapter(uri, opts){
if (!remote) pub.publish(key, msgpack.encode([packet, opts]));
};


// Set up exit handlers so we can clean up this process's redis data before exiting

process.stdin.resume(); //so the program will not close instantly
function exitHandler(options, err){
var i;
var multi = data.multi();
var execDone = false;

var roomIds = Object.keys(self.rooms);
var socketIds = Object.keys(self.sids);
for(i=0; i<roomIds.length; ++i){
multi.srem(prefix + '#' + roomIds[i], Object.keys(self.rooms[roomIds[i]]));
}
for(i=0; i<socketIds.length; ++i){
multi.srem(prefix + '#' + socketIds[i], Object.keys(self.sids[socketIds[i]]));
}
multi.exec(function(err, replies){
process.exit();
});
}

// //do something when app is closing
// process.on('exit', exitHandler.bind(null,{cleanup:true}));
process.on('SIGTERM', exitHandler);
process.on('SIGINT', exitHandler);
process.on('SIGQUIT', exitHandler);
process.on('uncaughtException', exitHandler);


return Redis;

}
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "",
"repository": {
"type": "git",
"url": "git@github.com:Automattic/socket.io-redis.git"
"url": "git@github.com:FREEZX/socket.io-redis.git"
},
"scripts": {
"test": "mocha"
Expand All @@ -14,11 +14,11 @@
"uid2": "0.0.3",
"redis": "0.10.1",
"msgpack-js": "0.3.0",
"socket.io-adapter": "0.3.0"
"socket.io-adapter": "FREEZX/socket.io-adapter"
},
"devDependencies": {
"socket.io": "1.0.2",
"socket.io-client": "1.0.2",
"socket.io": "FREEZX/socket.io",
"socket.io-client": "Automattic/socket.io-client",
"mocha": "1.18.0",
"expect.js": "0.3.1",
"async": "0.2.10"
Expand Down
Loading