Skip to content

Commit

Permalink
Refactoring for better handling of dynamic services and socket connec…
Browse files Browse the repository at this point in the history
…tions.
  • Loading branch information
daffl committed Feb 23, 2015
1 parent aea0284 commit ffb603a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 118 deletions.
10 changes: 3 additions & 7 deletions lib/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,9 @@ module.exports = {
provider(location, protoService, options || {});
});

// If already _setup, just add this single service.
if (this._setup) {
// If we ran setup already, set this service up explicitly
if (this._isSetup) {
protoService.setup(this, location);
// If we're using a socket provider, register the service on it.
if (this.addService) {
this.addService(protoService, location);
}
}

this.services[location] = protoService;
Expand Down Expand Up @@ -89,7 +85,7 @@ module.exports = {
}
}.bind(this));

this._setup = true;
this._isSetup = true;

return this;
},
Expand Down
134 changes: 81 additions & 53 deletions lib/providers/socket/commons.js
Original file line number Diff line number Diff line change
@@ -1,75 +1,103 @@
'use strict';

var _ = require('lodash');

// The position of the params parameters for a service method so that we can extend them
// default is 1
var paramsPositions = {
var paramsPositions = exports.paramsPositions = {
find: 0,
update: 2,
patch: 2
};

exports.addService = function addService(service, path){
// Add handlers for the service to connected sockets.
_.each(this.info.connections, function (spark) {
this.setupMethodHandlers(service, path, spark);
}, this);

// Setup events for the service.
exports.setupEventHandlers.call(this, service, path);
// The default event dispatcher
exports.defaultDispatcher = function (data, params, callback) {
callback(null, data);
};

// Set up the service method handler for a service and socket.
exports.setupMethodHandler = function setupMethodHandler(emitter, params, service, path, method) {
var name = path + '::' + method;
var position = typeof paramsPositions[method] !== 'undefined' ? paramsPositions[method] : 1;

if (typeof service[method] === 'function') {
emitter.on(name, function () {
var args = _.toArray(arguments);
// If the service is called with no parameter object
// insert an empty object
if(typeof args[position] === 'function') {
args.splice(position, 0, {});
}
args[position] = _.extend({ query: args[position] }, params);
service[method].apply(service, args);
});
}
};

exports.setupEventHandlers = function setupEventHandlers(service, path){
// Set up event handlers for a given service using the event dispatching mechanism
exports.setupEventHandlers = function setupEventHandlers(info, path, service){
// If the service emits events that we want to listen to (Event mixin)
if (typeof service.on === 'function' && service._serviceEvents) {
_.each(service._serviceEvents, function (ev) {
exports.setupEventHandler(this.info, service, path, ev);
}, this);
var addEvent = function (ev) {
service.on(ev, function (data) {
// Check if there is a method on the service with the same name as the event
var dispatcher = typeof service[ev] === 'function' ?
service[ev] : exports.defaultDispatcher;
var eventName = path + ' ' + ev;

info.clients().forEach(function (socket) {
dispatcher(data, info.params(socket), function (error, dispatchData) {
if (error) {
socket[info.method]('error', error);
} else if (dispatchData) { // Only dispatch if we have data
socket[info.method](eventName, dispatchData);
}
});
});
});
};

_.each(service._serviceEvents, addEvent);
}
};

// Set up event handlers for a given service and connected sockets.
// Send it through the service dispatching mechanism (`removed(data, params, callback)`,
// `updated(data, params, callback)` and `created(data, params, callback)`) if it
// exists.
exports.setupEventHandler = function setupEventHandler (info, service, path, ev) {
var defaultDispatcher = function (data, params, callback) {
callback(null, data);
};

service.on(ev, function (data) {
// Check if there is a method on the service with the same name as the event
var dispatcher = typeof service[ev] === 'function' ? service[ev] : defaultDispatcher;
var eventName = path + ' ' + ev;
// Set up all method handlers for a service and socket.
exports.setupMethodHandlers = function setupMethodHandlers(info, socket, path, service) {
this.methods.forEach(function(method) {
if (typeof service[method] === 'function') {
var name = path + '::' + method;
var params = info.params(socket);
var position = typeof paramsPositions[method] !== 'undefined' ?
paramsPositions[method] : 1;

info.emitters().forEach(function (emitter) {
dispatcher(data, info.params(emitter), function (error, dispatchData) {
if (error) {
emitter[info.method]('error', error);
} else if (dispatchData) {
emitter[info.method](eventName, dispatchData);
socket.on(name, function () {
var args = _.toArray(arguments);
// If the service is called with no parameter object
// insert an empty object
if(typeof args[position] === 'function') {
args.splice(position, 0, {});
}
args[position] = _.extend({ query: args[position] }, params);
service[method].apply(service, args);
});
}
});
};

// Common setup functionality taking the info object which abstracts websocket access
exports.setup = function(info) {
var app = this;

app._commons = info;

// For a new connection, set up the service method handlers
info.connection().on('connection', function (socket) {
// Process services that were registered at startup.
_.each(app.services, function (service, path) {
exports.setupMethodHandlers.call(app, info, socket, path, service);
});
});

// Set up events and event dispatching
_.each(app.services, function (service, path) {
exports.setupEventHandlers.call(app, info, path, service);
}, this);
};

// Socket mixin when a new service is registered
exports.service = function(path, service) {
var protoService = this._super.apply(this, arguments);
var app = this;
var info = this._commons;

// app._socketInfo will only be available once we are set up
if(service && info) {
// Set up event handlers for this new service
exports.setupEventHandlers.call(app, info, path, protoService);
// For any existing connection add method handlers
info.clients().forEach(function(socket) {
exports.setupMethodHandlers.call(app, info, socket, path, protoService);
});
}

return protoService;
};
40 changes: 11 additions & 29 deletions lib/providers/socket/primus.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';

var _ = require('lodash');
var Proto = require('uberproto');
var Primus = require('primus');
var Emitter = require('primus-emitter');
Expand All @@ -14,54 +13,37 @@ module.exports = function(config, configurer) {

// Monkey patch app.setup(server)
Proto.mixin({
service: commons.service,

setup: function(server) {
var self = this;
var result = this._super.apply(this, arguments);

if (this.disabled('feathers primus')) {
return result;
}

var primus = this.primus = new Primus(server, config);
this.info = {
emitters: function() {

commons.setup.call(this, {
method: 'send',
connection: function() {
return primus;
},
clients: function() {
return primus;
},
params: function(spark) {
return spark.request.feathers;
},
method: 'send',
connections: this.primus.connections
};

primus.use('emitter', Emitter);

// For a new connection, set up the service method handlers
primus.on('connection', function (spark) {
// Process services that were registered at startup.
_.each(self.services, function (service, path) {
self.setupMethodHandlers.call(self, service, path, spark);
});
}
});

// Set up events and event dispatching
_.each(this.services, function (service, path) {
commons.setupEventHandlers.call(this, service, path);
}, this);
primus.use('emitter', Emitter);

if (typeof configurer === 'function') {
configurer.call(this, primus);
}

return result;
},

addService: commons.addService,

setupMethodHandlers: function(service, path, spark){
_.each(this.methods, function (method) {
commons.setupMethodHandler(spark, spark.request.feathers, service, path, method);
}, this);
}
}, app);
};
Expand Down
39 changes: 10 additions & 29 deletions lib/providers/socket/socketio.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';

var _ = require('lodash');
var socketio = require('socket.io');
var Proto = require('uberproto');
var commons = require('./commons');
Expand All @@ -13,53 +12,35 @@ module.exports = function (config) {

// Monkey patch app.setup(server)
Proto.mixin({
service: commons.service,

setup: function (server) {
var self = this;
var result = this._super.apply(this, arguments);

if (this.disabled('feathers socketio')) {
return result;
}

var io = this.io = socketio.listen(server);
// The info object we can pass to commons.setupEventHandler
this.info = {
emitters: function() {

commons.setup.call(this, {
method: 'emit',
connection: function() {
return io.sockets;
},
clients: function() {
return io.sockets.sockets;
},
params: function(socket) {
return socket.feathers;
},
method: 'emit',
connections: this.connections = this.io.sockets.connected
};

// For a new connection, set up the service method handlers
io.sockets.on('connection', function (socket) {
// Process services that were registered at startup.
_.each(self.services, function (service, path) {
self.setupMethodHandlers.call(self, service, path, socket);
});
}
});

// Set up events and event dispatching
_.each(self.services, function (service, path) {
commons.setupEventHandlers.call(this, service, path);
}, this);

if (typeof config === 'function') {
config.call(this, io);
}

return result;
},

addService: commons.addService,

setupMethodHandlers: function(service, path, socket){
_.each(this.methods, function (method) {
commons.setupMethodHandler(socket, socket.feathers || {}, service, path, method);
}, this);
}
}, app);
};
Expand Down

0 comments on commit ffb603a

Please sign in to comment.