Skip to content

Commit

Permalink
lib: emit events about connection messages
Browse files Browse the repository at this point in the history
Fixes: #250
PR-URL: #252
Reviewed-By: Alexey Orlenko <eaglexrlnk@gmail.com>
Reviewed-By: Dmytro Nechai <nechaido@gmail.com>
Reviewed-By: Timur Shemsedinov <timur.shemsedinov@gmail.com>
Reviewed-By: Mykola Bilochub <nbelochub@gmail.com>
  • Loading branch information
lundibundi authored and belochub committed Jul 29, 2017
1 parent 5bf3944 commit 04ed72e
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 1 deletion.
38 changes: 37 additions & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class Connection extends EventEmitter {
if (error) this.emit('error', error);
};

// Defined in constructor to be used as heartbeat message
// in debug mode events
this._heartbeatMessage = {};

transport.on('message', this._processMessage.bind(this));
transport.on('close', this._onSocketClose.bind(this));
transport.on('error', this._onSocketError.bind(this));
Expand Down Expand Up @@ -197,6 +201,10 @@ class Connection extends EventEmitter {
_heartbeatCallback(interval) {
this.transport.send('{}');
this.setTimeout(interval, this._heartbeatCallbackInstance);

if (process.env.NODE_ENV !== 'production') {
this.emit('heartbeat', this._heartbeatMessage);
}
}

// Stop sending heartbeat messages
Expand Down Expand Up @@ -273,6 +281,10 @@ class Connection extends EventEmitter {
_send(message) {
const data = serde.stringify(message);
this.transport.send(data);

if (process.env.NODE_ENV !== 'production') {
this.emit('outgoingMessage', message);
}
}

// Close the connection, optionally sending a final message
Expand All @@ -284,6 +296,10 @@ class Connection extends EventEmitter {
if (message) {
const data = serde.stringify(message);
this.transport.end(data);

if (process.env.NODE_ENV !== 'production') {
this.emit('outgoingMessage', message);
}
} else {
this.transport.end();
}
Expand Down Expand Up @@ -312,7 +328,17 @@ class Connection extends EventEmitter {
//
_processMessage(message) {
const keys = Object.keys(message);
if (keys.length === 0) return; // heartbeat message
if (keys.length === 0) {
// heartbeat message
if (process.env.NODE_ENV !== 'production') {
this.emit('heartbeat', message);
}
return;
}

if (process.env.NODE_ENV !== 'production') {
this.emit('incomingMessage', message);
}

const kind = keys[0];
if (!this.handshakeDone && kind !== 'handshake') {
Expand Down Expand Up @@ -385,6 +411,8 @@ class Connection extends EventEmitter {
this, application, authStrategy, credentials,
this._onSessionCreated.bind(this)
);

this.emit('handshakeRequest', applicationName, authStrategy);
}

// Callback of authentication operation
Expand Down Expand Up @@ -416,6 +444,8 @@ class Connection extends EventEmitter {
const messageId = message.handshake[0];
const callback = this._callbacks[messageId];

this.emit('handshake', message.error, message.ok);

if (!callback) {
this._rejectMessage(message);
}
Expand Down Expand Up @@ -464,6 +494,8 @@ class Connection extends EventEmitter {
return;
}

this.emit('call', interfaceName, methodName, args);

try {
this.application.callMethod(
this, interfaceName, methodName, args, callback
Expand All @@ -481,6 +513,8 @@ class Connection extends EventEmitter {
const messageId = message.callback[0];
const callback = this._callbacks[messageId];

this.emit('callback', message.error, message.ok);

if (callback) {
delete this._callbacks[messageId];

Expand Down Expand Up @@ -519,6 +553,8 @@ class Connection extends EventEmitter {
const messageId = message.inspect[0];
const interfaceName = message.inspect[1];

this.emit('inspect', interfaceName);

const methods = this.application.getMethods(interfaceName);
if (methods) {
this.callback(messageId, null, methods);
Expand Down
210 changes: 210 additions & 0 deletions test/node/connection-emit-actions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
'use strict';

const test = require('tap');
const net = require('net');

const jstp = require('../..');

const app = require('../fixtures/application');

const application = new jstp.Application(app.name, app.interfaces);
const serverConfig = {
applications: [application],
authPolicy: app.authCallback,
};

let server;
let connection;

test.beforeEach((done) => {
server = jstp.net.createServer(serverConfig);
server.listen(0, () => {
const port = server.address().port;
jstp.net.connect(app.name, null, port, 'localhost', (error, conn) => {
test.assertNot(error, 'must connect to server and perform handshake');
connection = conn;
done();
});
});
});

test.afterEach((done) => {
if (connection) {
connection.close();
connection = null;
}
server.close();
done();
});

test.test('must emit server and client events upon anonymous handshake',
(test) => {
test.plan(7);

const client = {
application: new jstp.Application('jstp', {}),
};

server.once('connect', (serverConnection) => {
serverConnection.on('handshakeRequest',
(applicationName, authStrategy) => {
test.equal(applicationName, app.name,
'application name must match');
test.equal(authStrategy, 'anonymous',
'auth strategy must be anonymous by default');
}
);
});

const port = server.address().port;
const socket = net.connect(port);
socket.on('error', () => {
test.fail('must create socket and connect to server');
});
socket.on('connect', () => {
const transport = new jstp.net.Transport(socket);
const connection = new jstp.Connection(transport, null, client);

connection.on('handshake', (error, ok) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(ok, app.sessionId,
'session id must be equal to the one provided by authCallback');
});

connection.handshake(app.name, null, null, (error) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(connection.username, null, 'username must be null');
test.equal(connection.sessionId, app.sessionId,
'session id must be equal to the one provided by authCallback');
connection.close();
});
});
}
);

test.test('must emit server and client events login authentication strategy',
(test) => {
test.plan(7);

const client = {
application: new jstp.Application('jstp', {}),
};

server.once('connect', (serverConnection) => {
serverConnection.on('handshakeRequest',
(applicationName, authStrategy) => {
test.equal(applicationName, app.name,
'application name must match');
test.equal(authStrategy, 'login',
'authentication strategy must be \'login\'');
}
);
});

const port = server.address().port;
const socket = net.connect(port);
socket.on('error', () => {
test.fail('must create socket and connect to server');
});
socket.on('connect', () => {
const transport = new jstp.net.Transport(socket);
const connection = new jstp.Connection(transport, null, client);

connection.on('handshake', (error, ok) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(ok, app.sessionId,
'session id must be equal to the one provided by authCallback');
});

connection.handshake(app.name, app.login, app.password, (error) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(connection.username, app.login, 'username must match');
test.equal(connection.sessionId, app.sessionId,
'session id must be equal to the one provided by authCallback');
connection.close();
});
});
}
);

test.test('must emit event on call without arguments and with a return value',
(test) => {
test.plan(5);

const iface = 'calculator';
const methodName = 'answer';
const args = [];

server.getClients()[0].on('call',
(actualInterfaceName, actualMethodName, actualArgs) => {
test.equal(actualInterfaceName, iface,
'method interface must match');
test.equal(actualMethodName, methodName,
'method name must be equal to the called one');
test.strictSame(actualArgs, args,
'method arguments must be equal to the passed ones');
}
);

connection.on('callback', (error, ok) => {
test.assertNot(error, 'callMethod must not return an error');
test.strictSame(ok, [42], 'ok contents must match');
});

connection.callMethod(iface, methodName, args);
}
);

test.test('must emit event upon inspect message', (test) => {
const expectedInterfaces = Object.keys(app.interfaces);
const expectedTests = expectedInterfaces.length;

test.plan(expectedTests);
server.getClients()[0].on('inspect', (interfaceName) => {
test.assert(expectedInterfaces.includes(interfaceName),
'inspect event interface must be one of expected');
});

expectedInterfaces.forEach((iface) => {
connection.inspectInterface(iface);
});
});

test.test('must emit messages in development mode', (test) => {
test.plan(4);

const clientOutgoingMessage = { call: [1, 'calculator'], answer: [] };
const serverOutgoingMessage = { callback: [1], ok: [42] };

server.getClients()[0].on('outgoingMessage', (message) => {
test.strictSame(message, serverOutgoingMessage,
'server outgoing message must match');
});
server.getClients()[0].on('incomingMessage', (message) => {
test.strictSame(message, clientOutgoingMessage,
'server incoming message must match the one sent from client');
});
connection.on('outgoingMessage', (message) => {
test.strictSame(message, clientOutgoingMessage,
'client outgoing message must match');
});
connection.on('incomingMessage', (message) => {
test.strictSame(message, serverOutgoingMessage,
'client incoming message must match the one sent from server');
});

connection.callMethod('calculator', 'answer', []);
});

test.test('must emit heartbeat messages in development mode', (test) => {
test.plan(2);

server.getClients()[0].on('heartbeat', (message) => {
test.strictSame(message, {}, 'heartbeat message must match on server side');
});
connection.on('heartbeat', (message) => {
test.strictSame(message, {}, 'heartbeat message must match on client side');
});

connection.startHeartbeat(100);
});

0 comments on commit 04ed72e

Please sign in to comment.