forked from rajaraodv/rabbitpubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.js
127 lines (105 loc) · 4.36 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
var express = require('express')
, routes = require('./routes')
, http = require('http')
, path = require('path')
, redis = require('redis')
, amqp = require('amqp');
var rabbitConn = amqp.createConnection({});
var chatExchange;
rabbitConn.on('ready', function () {
chatExchange = rabbitConn.exchange('chatExchange', {'type':'fanout'});
});
/*
Setup Express & Socket.io
*/
var app = express();
var server = http.createServer(app);
var io = require('socket.io').listen(server);
//Set xhr-polling as WebSocket is not supported by CF
io.set("transports", ["xhr-polling"]);
//Set Socket.io's log level to 1 (info). Default is 3 (debugging)
io.set('log level', 1);
/*
Also use Redis for Session Store. Redis will keep all Express sessions in it.
*/
var RedisStore = require('connect-redis')(express),
rClient = redis.createClient(),
sessionStore = new RedisStore({client:rClient});
var cookieParser = express.cookieParser('your secret here');
app.configure(function () {
app.set('port', process.env.PORT || 3000);
app.set('views', __dirname + '/views');
app.set('view engine', 'ejs');
app.use(express.favicon());
app.use(express.logger('dev'));
app.use(express.bodyParser());
app.use(express.methodOverride());
/*
Use cookieParser and session middlewares together.
By default Express/Connect app creates a cookie by name 'connect.sid'.But to scale Socket.io app,
make sure to use cookie name 'jsessionid' (instead of connect.sid) use Cloud Foundry's 'Sticky Session' feature.
W/o this, Socket.io won't work if you have more than 1 instance.
If you are NOT running on Cloud Foundry, having cookie name 'jsessionid' doesn't hurt - it's just a cookie name.
*/
app.use(cookieParser);
app.use(express.session({store:sessionStore, key:'jsessionid', secret:'your secret here'}));
app.use(app.router);
app.use(express.static(path.join(__dirname, 'public')));
});
app.configure('development', function () {
app.use(express.errorHandler());
});
app.get('/', routes.index);
/*
When the user logs in (in our case, does http POST w/ user name), store it
in Express session (which in turn is stored in Redis)
*/
app.post('/user', function (req, res) {
req.session.user = req.body.user;
res.json({"error":""});
});
/*
Use SessionSockets so that we can exchange (set/get) user data b/w sockets and http sessions
Pass 'jsessionid' (custom) cookie name that we are using to make use of Sticky sessions.
*/
var SessionSockets = require('session.socket.io');
var sessionSockets = new SessionSockets(io, sessionStore, cookieParser, 'jsessionid');
sessionSockets.on('connection', function (err, socket, session) {
/*
When a user sends a chat message, publish it to chatExchange w/o binding key (bindingKey doesn't matter
because chatExchange is fanout).
Notice that we are getting user's name from session.
*/
socket.on('chat', function (data) {
var msg = JSON.parse(data);
var reply = {action:'message', user:session.user, msg:msg.msg };
// pub.publish('chat', reply);
chatExchange.publish('', reply);
});
/*
When a user joins, publish it to chatExchange w/o binding key (bindingKey doesn't matter
because chatExchange is fanout).
Notice that we are getting user's name from session.
*/
socket.on('join', function () {
var reply = {action:'control', user:session.user, msg:' joined the channel' };
chatExchange.publish('', reply);
});
/*Initialize subscriber queue.
First create a queue w/o any name. This forces RabbitMQ to create new queue
for every socket.io connection w/ a new random queue name.
Then bind the queue to chatExchange and listen to ALL messages
Lastly, create a consumer (via .subscribe) that waits for messages from RabbitMQ. And when
a message comes, send it to the browser.
Notice that we are creating this w/in sessionSockets.on('connection' to create NEW queue for every connection
*/
rabbitConn.queue('', {exclusive:true}, function (q) {
q.bind('chatExchange', "#"); // bind to chat ALL channel/bindingKey
q.subscribe(function (message) {
socket.emit('chat', JSON.stringify(message));
});
});
});
server.listen(app.get('port'), function () {
console.log("Express server listening on port " + app.get('port'));
});