From d0cc8e9f27973b10c9afb9f89a2b3234affaf09b Mon Sep 17 00:00:00 2001 From: Frederic Date: Sat, 26 Mar 2016 17:34:40 -0400 Subject: [PATCH] functionnal POC --- src/Client.js | 60 +++++++++++++++++++++++++++---------- src/Server.js | 21 +++++++++++-- src/adapters/ipc.adapter.js | 4 +-- src/middleware/bundler.js | 28 ++++++++++++----- src/middleware/index.js | 6 ++-- tests/test.js | 30 ++++++++++++++----- 6 files changed, 112 insertions(+), 37 deletions(-) diff --git a/src/Client.js b/src/Client.js index 61a6eab..6e95287 100644 --- a/src/Client.js +++ b/src/Client.js @@ -19,14 +19,22 @@ var middleware = require('./middleware'); /* Methods -------------------------------------------------------------------*/ +Client.UID = 0; + /** * Client constructor * @constructor * @param {object} options The configuration options for the client */ -function Client(options) { +function Client(socket, options) { EventEmitter.call(this); + this.uid = Client.UID++; + + if (options === undefined) { + options = socket; + socket = null; + } options = options || {}; this.options = { @@ -47,13 +55,20 @@ function Client(options) { }; // List of channels - this._channels = options.channels || {}; + this.channels = {}; + // Populate channels + if (options.channels) { + for (var c in options.channels) { + this.channel(c, options.channels[c]); + } + } // Socket object - this.socket = options.socket || null; + if (socket) this.socket = socket; + else this.socket = this._createSocket(); // Data packets - transient state - by channel - this._packets = {}; + this.packets = {}; } /** @@ -68,16 +83,16 @@ Client.prototype.channel = function(name, handler) { if (name[0] !== '/') name = '/' + name; - if (!(name in this._channels)) { + if (!(name in this.channels)) { debug( 'log: new channel ' + this.options.adapter + '://' + this.options.hostname + ':' + this.options.port + name ); - this._channels[name] = []; + this.channels[name] = []; } - this._channels[name].push(handler); + this.channels[name].push(handler); return this; }; @@ -93,24 +108,39 @@ Client.prototype.use = function(socket) { * @param {string|object} payload The payload to send */ Client.prototype.send = function(channel, payload) { + channel = channel || '/'; + if (!this.packets[channel]) this.packets[channel] = []; + this.packets[channel].push(payload); // Go through middlewares - middleware.process(this, payload); + middleware.process(this, channel, payload); }; Client.prototype._createSocket = function() { - this.socket = adapters.resolve(this.adapter).createSocket(this); + return adapters.resolve(this.options.adapter).createSocket(this); }; -Client.prototype._emit = function(payload) { - this.adapter.prototype.send( +Client.prototype._emit = function(channel) { + adapters.resolve(this.options.adapter).send( this.socket, - encoders[this.peer.options.encoder].encode(payload) + encoders.resolve(this.options.encoder).encode({ + c: channel, + d: this.packets[channel] + }) ); + this.packets[channel].length = 0; } -Client.prototype._handleRequest = function(evt, data) { - console.log('test'); - console.log(encoders[this.options.encoder].decode(evt || data)); +Client.prototype._handleRequest = function(evt) { + var raw = encoders.resolve(this.options.encoder).decode(evt); + if (raw.c[0] !== '/') raw.c = '/' + raw.c; + + if (raw.c in this.channels) { + for (var i = 0; i= 0; i--) { + this.connections[i].send(channel, payload); + } +}; + Server.prototype._handleLift = function() { this.emit('ready'); }; Server.prototype.stop = function() { + this.connections.length = 0; if (this.listener) this.listener.stop(); }; Server.prototype._handleRequest = function(socket) { - console.log(socket); - this.emit('connection', new Client(socket)); + var client = new Client(socket, { + adapter: this.options.adapter, + encoder: this.options.encoder, + channels: this.channels + }); + socket.on('data', client._handleRequest.bind(client)); + this.connections.push(client); + this.emit('connection', client); }; util.inherits(Server, EventEmitter); diff --git a/src/adapters/ipc.adapter.js b/src/adapters/ipc.adapter.js index c13b317..0e8d4bf 100644 --- a/src/adapters/ipc.adapter.js +++ b/src/adapters/ipc.adapter.js @@ -25,7 +25,7 @@ var defaultPath = '/tmp/app.socket-'; */ function listen(server, callback) { fs.unlink(defaultPath + server.options.port, function _bindSocket() { - server.listener = net.createServer(server._handleRequest); + server.listener = net.createServer(server._handleRequest.bind(server)); server.listener.listen(defaultPath + server.options.port, callback); }); }; @@ -47,7 +47,7 @@ function send(socket, payload) { * @returns {Socket} The created ipc socket */ function createSocket(client) { - var socket = net.connect(defaultPath + this.options.port); + var socket = net.connect(defaultPath + client.options.port); socket.on('data', client._handleRequest.bind(client)); return socket; diff --git a/src/middleware/bundler.js b/src/middleware/bundler.js index 3fa2a71..22a4e42 100644 --- a/src/middleware/bundler.js +++ b/src/middleware/bundler.js @@ -1,13 +1,27 @@ -function process() { - if (this.bundles.length > this.peer.options.maxBundle) { - this._bundleTimer = setTimeout(this._tick.bind(this), this.peer.options.bundleDelay); +function process(client, channel, payload) { + var options = client.options.transform.bundler; + + if (!client.__bundler) { + client.__bundler = { + timers: {} + }; } - else { - this._bundleTimer = null; + + if (client.packets[channel].length > options.maxPackets) { + if (client.__bundler.timers[channel]) { + clearTimeout(client.__bundler.timers[channel]); + } + client._emit.call(client, channel); } - this.adapter.prototype.send.call(this, encoders[this.peer.options.encoder].encode(this.bundles.splice(0, this.peer.options.maxBundle))); - this.onComplete.dispatch(); + if (!client.__bundler.timers[channel]) { + client.__bundler.timers[channel] = setTimeout( + function _emitBundle() { + client._emit.call(client, channel); + }, + options.delay + ); + } } // Bundling logic // ----------------------------- diff --git a/src/middleware/index.js b/src/middleware/index.js index dbf6d98..0adbb79 100644 --- a/src/middleware/index.js +++ b/src/middleware/index.js @@ -4,8 +4,10 @@ var list = { bundler: bundler }; -function process(socket, payload) { - +function process(client, channel, payload) { + for (var t in client.options.transform) { + if (t in list) list[t].process(client, channel, payload); + } } function register(name, mod) { diff --git a/tests/test.js b/tests/test.js index e0f1434..0c0cf3a 100644 --- a/tests/test.js +++ b/tests/test.js @@ -3,12 +3,20 @@ var Kalm = require('../index'); var server = new Kalm.Server({ port: 3000, adapter: 'ipc', - encoder: 'msg-pack' + encoder: 'msg-pack', + channels: { + channel1: function(data) { + console.log('GOT "' + data + '" on channel1!'); + }, + '/': function(data) { + console.log('GOT "' + data + '" on main channel!'); + } + } }); server.on('connection', function(client) { - console.log('client'); - console.log(client); + // Do stuff + client.send('greetings', 'Hi there!'); }); server.on('ready', function() { @@ -16,11 +24,17 @@ server.on('ready', function() { port: 3000, adapter: 'ipc', encoder:'msg-pack', - hostname: '0.0.0.0' + hostname: '0.0.0.0', + channels: { + greetings: function(data) { + console.log('Server [greetings]: ' + data); + } + } }); - client.channel('rare_pepes').send('kalm_pepe'); - client.channel('rare_pepes').send('kalm_doge'); - - client.channel().send('klam klam'); + client.send('channel1', 'some data'); + client.send('channel1', 'some more data'); + setTimeout(function() { + client.send(null, 'later sent data'); + }, 1000); }); \ No newline at end of file