Skip to content

Commit

Permalink
Merge pull request #64 from janusvr/multiprocessing
Browse files Browse the repository at this point in the history
Multiprocessing
  • Loading branch information
bioid authored Jul 17, 2017
2 parents 9a00a23 + a156047 commit 9cbdb66
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 556 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
language: node_js
node_js:
- "4"
- "6"
- "7"
services:
- mysql
- redis-server
before_install:
- mysql -e 'CREATE DATABASE janusvr;'
- cp test/test-config.js ./config.js
Expand Down
37 changes: 16 additions & 21 deletions config-example.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,6 @@ module.exports = {
/* Socket port to listen on */
port: 5566,

/* Web UI */
startWebServer: false,
webServerPort: 8080,
webServerPortHttps: 8081,
/*Log Levels - default is "info"
*
* This setting controls all logging into
* the server.log file - other than the
* restart message which will ALWAYS show.
*
* "info" -> log info and error events
* "error" -> log error events only
* "silent" -> do not log at all
*
* debug, warn, fatal, error, http could be
* implemented as well, but not done yet*/
logLevel: "info",

// Path to log output file
logFilePath: "./server.log",

/* SSL configurations */
ssl: {
port: 5567,
Expand All @@ -35,6 +14,22 @@ module.exports = {
cert: fs.readFileSync('cert/server-cert.pem'),
}
},

/*
************************************************************************
*** The following options REQUIRE a redis database to function ! ***
************************************************************************
*/
multiprocess: {
enabled: false, // requires redis for IPC
processes: 1
},
partyList: false,
redis: {
host: "127.0.0.1",
port: 6379,
//password: null
},


/*
Expand Down
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,26 @@
"author": "Lisa Croxford, Michael/HAZARDU5, Jerad Bitner, na/source83, oculushut, Tim Schwartz, JamesHagerman",
"license": "MIT",
"dependencies": {
"byline": "^4.1.1",
"body-parser": "*",
"byline": "^4.1.1",
"express": "^4.6.1",
"finished": "^1.2.2",
"http-string-parser": "0.0.5",
"janus-method-ping": "^0.0.3",
"janus-mysql-auth": "^0.0.2",
"janus-mysql-userlist-official": "*",
"janus-mysql-userlist-official": ">=0.0.11",
"janus-mysql-popular": "*",
"mysql": "^2.12.0",
"npmlog": "^0.1.1",
"optimist": "^0.6.1",
"redis": "^2.6.3",
"simplesets": "^1.2.0",
"split-ca": "^1.0.0",
"websocket-driver": "^0.6.4"
},
"devDependencies": {
"chai": "^3.5.0",
"janus-node-client": "0.0.1",
"mocha": "^3.0.2",
"superagent": "^2.2.0",
"websocket": "*"
Expand Down
269 changes: 20 additions & 249 deletions server.js
Original file line number Diff line number Diff line change
@@ -1,262 +1,33 @@
/* global log */
var cluster = require('cluster');
var redis = require('redis');
global.config = require('./config.js');

var args = require('optimist').argv;
global.config = require(args.config || './config.js');
var net = require('net');
var tls = require('tls');
var events = require('events');
var express = require('express');
var fs = require('fs');
var sets = require('simplesets');
var mysql = require('mysql');
var bodyParser = require('body-parser');
// websocket requires
var websocket = require('websocket-driver');
var parser = require('http-string-parser');
var WebSocketStream = require('./src/WebSocketStream');

global.log = require('./src/Logging');

var Session = require('./src/Session');
var Room = require('./src/Room');
var Plugins = require('./src/Plugins');

function Server() {
var d = new Date();
this._sessions = new sets.Set();
this._rooms = {};
this._userList = Array();
this._partyList = {};
this._plugins = new Plugins(this);

}

Server.prototype.getRoom = function (roomId) {
if (this._rooms[roomId] === undefined) {
this._rooms[roomId] = new Room(roomId);
}

return this._rooms[roomId];
};

// ## Check if username is in use ##
Server.prototype.isNameFree = function (name) {

var free = true;
this._sessions.each(function (s) {
if (s.id === name) {
free = false;
}
});
return free;
};

// ## Start Socket Server ##
Server.prototype.start = function (callback) {
if (cluster.isMaster && global.config.multiprocess.enabled) {
var numCPUs = global.config.multiproccess.processes;
console.log(`Starting ${numCPUs} workers`);
console.log('========================');
console.log('Janus VR Presence Server');
console.log('Janus VR Presence Server (clustered)');
console.log('========================');
log.info('Startup date/time: ' + Date());

console.log('See server.log for activity information and config.js for configuration');
console.log('Log level: ' + config.logLevel);
console.log('See config.js for configuration');
console.log('Startup date/time: ' + Date());

this.server = net.createServer(this.onConnect.bind(this));
this.server.listen(config.port, "::", function (err) {

if (err) {
log.error('Socket Server error listening on port: ' + config.port);
process.exit(1);
}

log.info('Socket Server listening on port: ' + config.port);
console.log('Socket Server listening on port: ' + config.port);

});

if (config.ssl) {

this.ssl = tls.createServer(config.ssl.options, this.onConnect.bind(this));
this.ssl.listen(config.ssl.port, "::", function (err) {

if (err) {
log.error('SSL Server error listening on port: ' + config.ssl.port);
process.exit(1);
}

console.log('SSL Server listening on port: ' + config.ssl.port);
log.info('SSL Server listening on port: ' + config.ssl.port);

});
}

if (config.startWebServer) {
this.startWebServer();
}
if (callback && typeof(callback) == "function")
callback();
};


// ## start web server ##
Server.prototype.startWebServer = function () {
var http = require('http'),
https = require('https');
var self = this;

this.ws = express();

this.ws.use(bodyParser.json());
var router = express.Router();

console.log('starting web server on port ' + config.webServerPort);

if (global.config.hookPlugins.hasOwnProperty('enter_room') &&
global.config.hookPlugins.enter_room.plugins.indexOf('janus-mysql-popular') > -1)
{
this._conn = mysql.createPool({
host : config.MySQL_Hostname,
user : config.MySQL_Username,
password : config.MySQL_Password,
database : config.MySQL_Database
var redisClient = redis.createClient(global.config.redis);
redisClient.del('userlist:multi');
redisClient.del('partylist:multi');
for (var i = 0; i < numCPUs; i++) {
var child = cluster.fork();
child.on('exit', () => {
redisClient.hdel('userlist', child.process.pid);
redisClient.hdel('partylist', child.process.pid);
});

router.get('/getPopularRooms', function (req, res) {
var limit = parseInt(req.query.limit, 10) || 20,
offset = parseInt(req.query.offset, 10) || 0,
orderBy = req.query.orderBy || "weight",
desc = (req.query.desc && req.query.desc === "true") ? "DESC" : "",
contains = req.query.urlContains ? "%" + req.query.urlContains + "%" : "%";
var sql = "SELECT roomName, url as roomUrl, count, weight, UNIX_TIMESTAMP(lastSeen) as lastEntered, thumbnail FROM `popular` WHERE url LIKE ? ORDER BY ?? "+desc+" LIMIT ?,?";
this._conn.query(sql, [contains, orderBy, offset, limit], function(err, results) {
if (err) {
console.log(err);
res.json({"success": false, "data": [{"error": "Error querying the DB"}]});
return;
}

res.json({"success": true, "data": results});
})
}.bind(this));

router.post('/addThumb', function (req, res) {
data = req.body;
if (!data['token'] ||
data['token'] !== global.config.popularRooms.masterToken)
return res.json({"success": false, "data": [{"error": "Invalid token"}]});
if (!data['roomUrl'] || !data['thumbnail'])
return res.json({"success": false, "data": [{"error": "Must POST roomUrl and thumbnail parameters"}]});
var roomUrl = data['roomUrl'],
thumbnail = data['thumbnail'];
var sql = "UPDATE popular SET thumbnail = ? WHERE url = ?";
this._conn.query(sql, [thumbnail, roomUrl], (err, results) => {
if (err) {
console.log(err);
return res.json({"success": false, "data": [{"error": "Error querying the DB"}]});
}
return res.json({"success": true});
});
}.bind(this));
}
router.get('/log', function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain', 'Content-Length': -1, 'Transfer-Encoding': 'chunked'});
var logFile = fs.createReadStream('server.log');
logFile.pipe(res);
});

router.get('/get_partylist', function (req, res) {
//console.log("get_partylist: " + JSON.stringify(self._partyList));
res.json(self._partyList);
});

router.get('/', function (req, res) {
res.send(200, 'Nothing to see here ... yet');
});


this.ws.use(router);

//this.webserver = this.ws.listen(config.webServerPort, "::");

this.webserver = http.createServer(this.ws)
this.webserver.listen(config.webServerPort, "::");
log.info('Webserver (http) started on port: ' + config.webServerPort);
this.webserverHttps = https.createServer(config.ssl.options, this.ws).listen(config.webServerPortHttps);
console.log('webserver', typeof(this.webserverHttps));
log.info('Webserver (https) started on port: ' + config.webServerPortHttps);
console.log('Start Date/Time: ' + Date());
};

Server.prototype.close = function(cb) {
this.server.close( (err) => {
if (config.startWebServer && this.webserver) {
this.webserver.close( (err) => {
return cb(err);
});
}
else {
return cb(err);
}
Object.keys(cluster.workers).forEach(function(id) {
console.log('SPAWNED', cluster.workers[id].process.pid);
});
}

// ## action on client connection ##
Server.prototype.onConnect = function (socket) {

var self = this;
var addr = socket.remoteAddress;
var s;

log.info('Client connected ' + addr);

// setup for websocket
var driver = websocket.server({'protocols': 'binary'});
socket.on('error', function (err) {
log.error(addr);
log.error('Socket error: ', err);
});
socket.on('close', function () {
log.info('Client disconnected: ' + addr);
if (s)
self._sessions.remove(s);
});

socket.once('data', function (data) {
// try to parse the packet as http
var request = parser.parseRequest(data.toString());

if (Object.keys(request.headers).length === 0)
{
// there are no http headers, this is a raw tcp connection
s = new Session(self, socket);
self._sessions.add(s);

// emit the first message so the session gets it
socket.emit('data', data);
}
});

driver.on('connect', function () {
if (websocket.isWebSocket(driver)) {
log.info('Websocket connection:', addr);
driver.start();

s = new Session(self, new WebSocketStream(driver, socket));
self._sessions.add(s)

driver.on('error', function (err) {
log.error(addr);
log.error('Websocket error: ', err);
});
}
});
socket.pipe(driver.io).pipe(socket);
};
if (require.main === module) {
(new Server()).start();
}
else {
module.exports = Server;
var Server = require("./src/Server.js");
(new Server()).start();
}

1 change: 1 addition & 0 deletions src/Logging.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

var npmlog = require('npmlog');
var args = require('optimist').argv;//Noted that this is deprecated. TODO: replace with either minimist or yargs
var onFinished = require('finished');
Expand Down
Loading

0 comments on commit 9cbdb66

Please sign in to comment.