Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: optimize connection events #222

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 11 additions & 86 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ function Connection(transport, server, client) {

this._heartbeatCallbackInstance = null;

// Defined in constructor to be used as default callback in callMethod
// without binding it.
this._emitError = (error) => {
if (error) this.emit('error', error);
};

transport.on('packet', this._processPacket.bind(this));
transport.on('close', this._onSocketClose.bind(this));
transport.on('error', this._onSocketError.bind(this));
Expand All @@ -77,9 +83,7 @@ Connection.prototype.callMethod = function(
) {
const packet = this.createPacket('call', interfaceName, methodName, args);
const packetId = packet.call[0];
this._callbacks[packetId] = callback || ((error) => {
if (error) this.emit('error', error);
});
this._callbacks[packetId] = callback || this._emitError;
this._send(packet);
};

Expand Down Expand Up @@ -115,17 +119,6 @@ Connection.prototype.emitRemoteEvent = function(
this._send(packet);
};

// Send a state synchronization packet over the connection
// path - path to a value in the application state
// verb - operation with this value (inc, dec, let, delete, push, pop, shift,
// unshift)
// value - a value to modify the current value with
//
Connection.prototype.notifyStateChange = function(path, verb, value) {
const packet = this.createPacket('state', path, verb, value);
this._send(packet);
};

// Send a handshake packet over the connection
// appName - name of an application to connect to
// login - user name (optional)
Expand Down Expand Up @@ -393,12 +386,6 @@ Connection.prototype._processHandshakeRequest = function(packet, keys) {
const credentials = authStrategy && packet[authStrategy];
authStrategy = authStrategy || 'anonymous';

this._emitPacketEvent('handshakeRequest', packet, packet.handshake[0], {
packetType: 'handshake',
handshakeRequest: true,
authStrategy
});

this.server.startSession(this, application, authStrategy, credentials,
this._onSessionCreated.bind(this));
};
Expand Down Expand Up @@ -442,10 +429,6 @@ Connection.prototype._processHandshakeResponse = function(packet) {
this.handshakeDone = true;
this.application = this.client.application;

this._emitPacketEvent('handshake', packet, packetId, {
sessionId: packet.ok
});

callback(null, packet.ok);
} else if (packet.error) {
delete this._callbacks[packetId];
Expand Down Expand Up @@ -475,11 +458,6 @@ Connection.prototype._processCallPacket = function(packet, keys) {
const methodName = keys[1];
const args = packet[methodName];

this._emitPacketEvent('call', packet, packetId, {
interfaceName,
methodName
});

const callback = this._remoteCallbackWrapper.bind(this, packetId);

if (!args) {
Expand All @@ -506,20 +484,12 @@ Connection.prototype._processCallbackPacket = function(packet) {
delete this._callbacks[packetId];

if (packet.ok) {
callback(null, ...packet.ok);
return callback(null, ...packet.ok);
} else if (packet.error) {
callback(errors.RemoteError.fromJstpArray(packet.error));
} else {
this._rejectPacket(packet);
return callback(errors.RemoteError.fromJstpArray(packet.error));
}
}

const eventArgs = callback ? null : { sourcePacketUnknown: true };
this._emitPacketEvent('callback', packet, packetId, eventArgs);

if (!callback) {
this.emit('callbackForUnknownPacket', packetId, packet, this);
}
this._rejectPacket(packet);
};

// Process incoming event packet
Expand All @@ -531,11 +501,7 @@ Connection.prototype._processEventPacket = function(packet, keys) {
const eventName = keys[1];
const eventArgs = packet[eventName];

this._emitPacketEvent('event', packet, packet.event[0], {
interfaceName,
remoteEventName: eventName,
remoteEventArgs: eventArgs
});
this.emit('event', interfaceName, eventName, eventArgs);

const remoteProxy = this.remoteProxies[interfaceName];
if (remoteProxy) {
Expand All @@ -550,8 +516,6 @@ Connection.prototype._processInspectPacket = function(packet) {
const packetId = packet.inspect[0];
const interfaceName = packet.inspect[1];

this._emitPacketEvent('inspect', packet, packetId, { interfaceName });

const methods = this.application.getMethods(interfaceName);
if (methods) {
this.callback(packetId, null, methods);
Expand All @@ -560,19 +524,6 @@ Connection.prototype._processInspectPacket = function(packet) {
}
};

// Process incoming state packet
// packet - parsed packet
//
Connection.prototype._processStatePacket = function(packet, keys) {
const path = packet.state[1];
const verb = keys[1];
const value = packet[verb];

this._emitPacketEvent('state', packet, packet.state[0], {
path, verb, value
});
};

// Process incoming ping packet
// packet - parsed packet
//
Expand Down Expand Up @@ -604,38 +555,12 @@ Connection.prototype._remoteCallbackWrapper = function(
this.callback(packetId, error, result);
};

// Emit an event notifying about incoming packet. The event payload is an
// object that contains information about the connection, application, packet,
// packet type, packet ID and any additional data that you pass to this
// function.
// kind - packet type and event name
// packet - parsed packet
// packetId - packet ID
// args - additional event arguments (optional)
//
Connection.prototype._emitPacketEvent = function(kind, packet, packetId, args) {
const eventArgs = {
connection: this,
packetType: kind,
packet,
packetId,
application: this.application
};

if (args) {
Object.assign(eventArgs, args);
}

this.emit(kind, eventArgs);
};

PACKET_HANDLERS = {
handshake: Connection.prototype._processHandshakePacket,
call: Connection.prototype._processCallPacket,
callback: Connection.prototype._processCallbackPacket,
event: Connection.prototype._processEventPacket,
inspect: Connection.prototype._processInspectPacket,
state: Connection.prototype._processStatePacket,
ping: Connection.prototype._processPingPacket,
pong: Connection.prototype._processPongPacket
};
46 changes: 24 additions & 22 deletions test/node/connection-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ test.test('server must process an event', (test) => {
client.connectAndHandshake(app.name, null, null, (error, connection) => {
test.assertNot(error, 'must connect to server');

server.getClients()[0].on('event', (event) => {
test.strictEqual(event.interfaceName, iface,
'event interface must match');
test.strictEqual(event.remoteEventName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(event.remoteEventArgs, args,
'event arguments must be equal to the passed ones');
server.getClients()[0].on('event',
(interfaceName, remoteName, remoteArgs) => {
test.strictEqual(interfaceName, iface,
'event interface must match');
test.strictEqual(remoteName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(remoteArgs, args,
'event arguments must be equal to the passed ones');

test.end();
});
test.end();
});

connection.emitRemoteEvent(iface, eventName, args);
});
Expand All @@ -61,12 +62,12 @@ test.test('client must process an event', (test) => {
client.connectAndHandshake(app.name, null, null, (error, connection) => {
test.assertNot(error, 'must connect to server');

connection.on('event', (event) => {
test.strictEqual(event.interfaceName, iface,
connection.on('event', (interfaceName, remoteName, remoteArgs) => {
test.strictEqual(interfaceName, iface,
'event interface must match');
test.strictEqual(event.remoteEventName, eventName,
test.strictEqual(remoteName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(event.remoteEventArgs, args,
test.strictDeepEqual(remoteArgs, args,
'event arguments must be equal to the passed ones');
test.end();
});
Expand All @@ -80,15 +81,16 @@ test.test('remote proxy must emit an event', (test) => {
(error, connection, api) => {
test.assertNot(error, 'must connect to server');

server.getClients()[0].on('event', (event) => {
test.strictEqual(event.interfaceName, iface,
'event interface must match');
test.strictEqual(event.remoteEventName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(event.remoteEventArgs, args,
'event arguments must be equal to the passed ones');
test.end();
});
server.getClients()[0].on('event',
(interfaceName, remoteName, remoteArgs) => {
test.strictEqual(interfaceName, iface,
'event interface must match');
test.strictEqual(remoteName, eventName,
'event name must be equal to the emitted one');
test.strictDeepEqual(remoteArgs, args,
'event arguments must be equal to the passed ones');
test.end();
});

api.iface.emit(eventName, ...args);
}
Expand Down