Skip to content

Commit

Permalink
functionnal POC
Browse files Browse the repository at this point in the history
  • Loading branch information
fed135 committed Mar 29, 2016
1 parent f10f758 commit d0cc8e9
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 37 deletions.
60 changes: 45 additions & 15 deletions src/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@ var middleware = require('./middleware');

/* Methods -------------------------------------------------------------------*/

Client.UID = 0;

/**
* Client constructor
* @constructor
* @param {object} options The configuration options for the client
*/
function Client(options) {
function Client(socket, options) {
EventEmitter.call(this);

this.uid = Client.UID++;

if (options === undefined) {
options = socket;
socket = null;
}
options = options || {};

this.options = {
Expand All @@ -47,13 +55,20 @@ function Client(options) {
};

// List of channels
this._channels = options.channels || {};
this.channels = {};
// Populate channels
if (options.channels) {
for (var c in options.channels) {
this.channel(c, options.channels[c]);
}
}

// Socket object
this.socket = options.socket || null;
if (socket) this.socket = socket;
else this.socket = this._createSocket();

// Data packets - transient state - by channel
this._packets = {};
this.packets = {};
}

/**
Expand All @@ -68,16 +83,16 @@ Client.prototype.channel = function(name, handler) {

if (name[0] !== '/') name = '/' + name;

if (!(name in this._channels)) {
if (!(name in this.channels)) {
debug(
'log: new channel ' +
this.options.adapter + '://' + this.options.hostname + ':' +
this.options.port + name
);
this._channels[name] = [];
this.channels[name] = [];
}

this._channels[name].push(handler);
this.channels[name].push(handler);
return this;
};

Expand All @@ -93,24 +108,39 @@ Client.prototype.use = function(socket) {
* @param {string|object} payload The payload to send
*/
Client.prototype.send = function(channel, payload) {
channel = channel || '/';
if (!this.packets[channel]) this.packets[channel] = [];
this.packets[channel].push(payload);
// Go through middlewares
middleware.process(this, payload);
middleware.process(this, channel, payload);
};

Client.prototype._createSocket = function() {
this.socket = adapters.resolve(this.adapter).createSocket(this);
return adapters.resolve(this.options.adapter).createSocket(this);
};

Client.prototype._emit = function(payload) {
this.adapter.prototype.send(
Client.prototype._emit = function(channel) {
adapters.resolve(this.options.adapter).send(
this.socket,
encoders[this.peer.options.encoder].encode(payload)
encoders.resolve(this.options.encoder).encode({
c: channel,
d: this.packets[channel]
})
);
this.packets[channel].length = 0;
}

Client.prototype._handleRequest = function(evt, data) {
console.log('test');
console.log(encoders[this.options.encoder].decode(evt || data));
Client.prototype._handleRequest = function(evt) {
var raw = encoders.resolve(this.options.encoder).decode(evt);
if (raw.c[0] !== '/') raw.c = '/' + raw.c;

if (raw.c in this.channels) {
for (var i = 0; i<raw.d.length; i++) {
for (var c = 0; c<this.channels[raw.c].length; c++) {
this.channels[raw.c][c](raw.d[i]);
}
}
}
};

util.inherits(Client, EventEmitter);
Expand Down
21 changes: 18 additions & 3 deletions src/Server.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ function Server(options) {

options = options || {};

this.status = 'off';
this.options = {
adapter: options.adapter || 'ipc',
encoder: options.encoder || 'json',
port: options.port || 80
};

this.connections = [];
this.channels = options.channels || {};

this.listen();
}

Expand All @@ -55,17 +57,30 @@ Server.prototype.listen = function(callback) {
}
};

Server.prototype.broadcast = function(channel, payload) {
for (var i = this.connections.length - 1; i >= 0; i--) {
this.connections[i].send(channel, payload);
}
};

Server.prototype._handleLift = function() {
this.emit('ready');
};

Server.prototype.stop = function() {
this.connections.length = 0;
if (this.listener) this.listener.stop();
};

Server.prototype._handleRequest = function(socket) {
console.log(socket);
this.emit('connection', new Client(socket));
var client = new Client(socket, {
adapter: this.options.adapter,
encoder: this.options.encoder,
channels: this.channels
});
socket.on('data', client._handleRequest.bind(client));
this.connections.push(client);
this.emit('connection', client);
};

util.inherits(Server, EventEmitter);
Expand Down
4 changes: 2 additions & 2 deletions src/adapters/ipc.adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var defaultPath = '/tmp/app.socket-';
*/
function listen(server, callback) {
fs.unlink(defaultPath + server.options.port, function _bindSocket() {
server.listener = net.createServer(server._handleRequest);
server.listener = net.createServer(server._handleRequest.bind(server));
server.listener.listen(defaultPath + server.options.port, callback);
});
};
Expand All @@ -47,7 +47,7 @@ function send(socket, payload) {
* @returns {Socket} The created ipc socket
*/
function createSocket(client) {
var socket = net.connect(defaultPath + this.options.port);
var socket = net.connect(defaultPath + client.options.port);
socket.on('data', client._handleRequest.bind(client));

return socket;
Expand Down
28 changes: 21 additions & 7 deletions src/middleware/bundler.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
function process() {
if (this.bundles.length > this.peer.options.maxBundle) {
this._bundleTimer = setTimeout(this._tick.bind(this), this.peer.options.bundleDelay);
function process(client, channel, payload) {
var options = client.options.transform.bundler;

if (!client.__bundler) {
client.__bundler = {
timers: {}
};
}
else {
this._bundleTimer = null;

if (client.packets[channel].length > options.maxPackets) {
if (client.__bundler.timers[channel]) {
clearTimeout(client.__bundler.timers[channel]);
}
client._emit.call(client, channel);
}

this.adapter.prototype.send.call(this, encoders[this.peer.options.encoder].encode(this.bundles.splice(0, this.peer.options.maxBundle)));
this.onComplete.dispatch();
if (!client.__bundler.timers[channel]) {
client.__bundler.timers[channel] = setTimeout(
function _emitBundle() {
client._emit.call(client, channel);
},
options.delay
);
}
}
// Bundling logic
// -----------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/middleware/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ var list = {
bundler: bundler
};

function process(socket, payload) {

function process(client, channel, payload) {
for (var t in client.options.transform) {
if (t in list) list[t].process(client, channel, payload);
}
}

function register(name, mod) {
Expand Down
30 changes: 22 additions & 8 deletions tests/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,38 @@ var Kalm = require('../index');
var server = new Kalm.Server({
port: 3000,
adapter: 'ipc',
encoder: 'msg-pack'
encoder: 'msg-pack',
channels: {
channel1: function(data) {
console.log('GOT "' + data + '" on channel1!');
},
'/': function(data) {
console.log('GOT "' + data + '" on main channel!');
}
}
});

server.on('connection', function(client) {
console.log('client');
console.log(client);
// Do stuff
client.send('greetings', 'Hi there!');
});

server.on('ready', function() {
var client = new Kalm.Client({
port: 3000,
adapter: 'ipc',
encoder:'msg-pack',
hostname: '0.0.0.0'
hostname: '0.0.0.0',
channels: {
greetings: function(data) {
console.log('Server [greetings]: ' + data);
}
}
});

client.channel('rare_pepes').send('kalm_pepe');
client.channel('rare_pepes').send('kalm_doge');

client.channel().send('klam klam');
client.send('channel1', 'some data');
client.send('channel1', 'some more data');
setTimeout(function() {
client.send(null, 'later sent data');
}, 1000);
});

0 comments on commit d0cc8e9

Please sign in to comment.