Skip to content

Commit

Permalink
lib: optimize connection events
Browse files Browse the repository at this point in the history
* Remove unnecessary events like 'handshake', 'call',
  'handshakeRequest'.
* Replace 'callbackForUnknownPacket' with _rejectPacket().
* Change signature of connection 'event' (remote event), now it contains
  interfaceName, eventName, eventArgs.
* Avoid creating extra closures to conditionally emit the 'error' event
  in callbacks instantiated inside callPacket().

Fixes: #49
PR-URL: #222
Reviewed-By: Alexey Orlenko <eaglexrlnk@gmail.com>
Reviewed-By: Mykola Bilochub <nbelochub@gmail.com>
Reviewed-By: Dmytro Nechai <nechaido@gmail.com>
  • Loading branch information
lundibundi authored and belochub committed Jan 22, 2018
1 parent f2502b8 commit e2a76b0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 108 deletions.
97 changes: 11 additions & 86 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ function Connection(transport, server, client) {

this._heartbeatCallbackInstance = null;

// Defined in constructor to be used as default callback in callMethod
// without binding it.
this._emitError = (error) => {
if (error) this.emit('error', error);
};

transport.on('packet', this._processPacket.bind(this));
transport.on('close', this._onSocketClose.bind(this));
transport.on('error', this._onSocketError.bind(this));
Expand All @@ -77,9 +83,7 @@ Connection.prototype.callMethod = function(
) {
const packet = this.createPacket('call', interfaceName, methodName, args);
const packetId = packet.call[0];
this._callbacks[packetId] = callback || ((error) => {
if (error) this.emit('error', error);
});
this._callbacks[packetId] = callback || this._emitError;
this._send(packet);
};

Expand Down Expand Up @@ -115,17 +119,6 @@ Connection.prototype.emitRemoteEvent = function(
this._send(packet);
};

// Send a state synchronization packet over the connection
// path - path to a value in the application state
// verb - operation with this value (inc, dec, let, delete, push, pop, shift,
// unshift)
// value - a value to modify the current value with
//
Connection.prototype.notifyStateChange = function(path, verb, value) {
const packet = this.createPacket('state', path, verb, value);
this._send(packet);
};

// Send a handshake packet over the connection
// appName - name of an application to connect to
// login - user name (optional)
Expand Down Expand Up @@ -393,12 +386,6 @@ Connection.prototype._processHandshakeRequest = function(packet, keys) {
const credentials = authStrategy && packet[authStrategy];
authStrategy = authStrategy || 'anonymous';

this._emitPacketEvent('handshakeRequest', packet, packet.handshake[0], {
packetType: 'handshake',
handshakeRequest: true,
authStrategy
});

this.server.startSession(this, application, authStrategy, credentials,
this._onSessionCreated.bind(this));
};
Expand Down Expand Up @@ -442,10 +429,6 @@ Connection.prototype._processHandshakeResponse = function(packet) {
this.handshakeDone = true;
this.application = this.client.application;

this._emitPacketEvent('handshake', packet, packetId, {
sessionId: packet.ok
});

callback(null, packet.ok);
} else if (packet.error) {
delete this._callbacks[packetId];
Expand Down Expand Up @@ -475,11 +458,6 @@ Connection.prototype._processCallPacket = function(packet, keys) {
const methodName = keys[1];
const args = packet[methodName];

this._emitPacketEvent('call', packet, packetId, {
interfaceName,
methodName
});

const callback = this._remoteCallbackWrapper.bind(this, packetId);

if (!args) {
Expand All @@ -506,20 +484,12 @@ Connection.prototype._processCallbackPacket = function(packet) {
delete this._callbacks[packetId];

if (packet.ok) {
callback(null, ...packet.ok);
return callback(null, ...packet.ok);
} else if (packet.error) {
callback(errors.RemoteError.fromJstpArray(packet.error));
} else {
this._rejectPacket(packet);
return callback(errors.RemoteError.fromJstpArray(packet.error));
}
}

const eventArgs = callback ? null : { sourcePacketUnknown: true };
this._emitPacketEvent('callback', packet, packetId, eventArgs);

if (!callback) {
this.emit('callbackForUnknownPacket', packetId, packet, this);
}
this._rejectPacket(packet);
};

// Process incoming event packet
Expand All @@ -531,11 +501,7 @@ Connection.prototype._processEventPacket = function(packet, keys) {
const eventName = keys[1];
const eventArgs = packet[eventName];

this._emitPacketEvent('event', packet, packet.event[0], {
interfaceName,
remoteEventName: eventName,
remoteEventArgs: eventArgs
});
this.emit('event', interfaceName, eventName, eventArgs);

const remoteProxy = this.remoteProxies[interfaceName];
if (remoteProxy) {
Expand All @@ -550,8 +516,6 @@ Connection.prototype._processInspectPacket = function(packet) {
const packetId = packet.inspect[0];
const interfaceName = packet.inspect[1];

this._emitPacketEvent('inspect', packet, packetId, { interfaceName });

const methods = this.application.getMethods(interfaceName);
if (methods) {
this.callback(packetId, null, methods);
Expand All @@ -560,19 +524,6 @@ Connection.prototype._processInspectPacket = function(packet) {
}
};

// Process incoming state packet
// packet - parsed packet
//
Connection.prototype._processStatePacket = function(packet, keys) {
const path = packet.state[1];
const verb = keys[1];
const value = packet[verb];

this._emitPacketEvent('state', packet, packet.state[0], {
path, verb, value
});
};

// Process incoming ping packet
// packet - parsed packet
//
Expand Down Expand Up @@ -604,38 +555,12 @@ Connection.prototype._remoteCallbackWrapper = function(
this.callback(packetId, error, result);
};

// Emit an event notifying about incoming packet. The event payload is an
// object that contains information about the connection, application, packet,
// packet type, packet ID and any additional data that you pass to this
// function.
// kind - packet type and event name
// packet - parsed packet
// packetId - packet ID
// args - additional event arguments (optional)
//
Connection.prototype._emitPacketEvent = function(kind, packet, packetId, args) {
const eventArgs = {
connection: this,
packetType: kind,
packet,
packetId,
application: this.application
};

if (args) {
Object.assign(eventArgs, args);
}

this.emit(kind, eventArgs);
};

PACKET_HANDLERS = {
handshake: Connection.prototype._processHandshakePacket,
call: Connection.prototype._processCallPacket,
callback: Connection.prototype._processCallbackPacket,
event: Connection.prototype._processEventPacket,
inspect: Connection.prototype._processInspectPacket,
state: Connection.prototype._processStatePacket,
ping: Connection.prototype._processPingPacket,
pong: Connection.prototype._processPongPacket
};
46 changes: 24 additions & 22 deletions test/node/connection-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ test.test('server must process an event', (test) => {
client.connectAndHandshake(app.name, null, null, (error, connection) => {
test.assertNot(error, 'must connect to server');

server.getClients()[0].on('event', (event) => {
test.strictEqual(event.interfaceName, iface,
'event interface must match');
test.strictEqual(event.remoteEventName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(event.remoteEventArgs, args,
'event arguments must be equal to the passed ones');
server.getClients()[0].on('event',
(interfaceName, remoteName, remoteArgs) => {
test.strictEqual(interfaceName, iface,
'event interface must match');
test.strictEqual(remoteName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(remoteArgs, args,
'event arguments must be equal to the passed ones');

test.end();
});
test.end();
});

connection.emitRemoteEvent(iface, eventName, args);
});
Expand All @@ -61,12 +62,12 @@ test.test('client must process an event', (test) => {
client.connectAndHandshake(app.name, null, null, (error, connection) => {
test.assertNot(error, 'must connect to server');

connection.on('event', (event) => {
test.strictEqual(event.interfaceName, iface,
connection.on('event', (interfaceName, remoteName, remoteArgs) => {
test.strictEqual(interfaceName, iface,
'event interface must match');
test.strictEqual(event.remoteEventName, eventName,
test.strictEqual(remoteName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(event.remoteEventArgs, args,
test.strictDeepEqual(remoteArgs, args,
'event arguments must be equal to the passed ones');
test.end();
});
Expand All @@ -80,15 +81,16 @@ test.test('remote proxy must emit an event', (test) => {
(error, connection, api) => {
test.assertNot(error, 'must connect to server');

server.getClients()[0].on('event', (event) => {
test.strictEqual(event.interfaceName, iface,
'event interface must match');
test.strictEqual(event.remoteEventName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(event.remoteEventArgs, args,
'event arguments must be equal to the passed ones');
test.end();
});
server.getClients()[0].on('event',
(interfaceName, remoteName, remoteArgs) => {
test.strictEqual(interfaceName, iface,
'event interface must match');
test.strictEqual(remoteName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(remoteArgs, args,
'event arguments must be equal to the passed ones');
test.end();
});

api.iface.emit(eventName, ...args);
}
Expand Down

0 comments on commit e2a76b0

Please sign in to comment.