Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: emit events about connection messages #252

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class Connection extends EventEmitter {
if (error) this.emit('error', error);
};

// Defined in constructor to be used as heartbeat message
// in debug mode events
this._heartbeatMessage = {};

transport.on('message', this._processMessage.bind(this));
transport.on('close', this._onSocketClose.bind(this));
transport.on('error', this._onSocketError.bind(this));
Expand Down Expand Up @@ -197,6 +201,10 @@ class Connection extends EventEmitter {
_heartbeatCallback(interval) {
this.transport.send('{}');
this.setTimeout(interval, this._heartbeatCallbackInstance);

if (process.env.NODE_ENV !== 'production') {
this.emit('heartbeat', this._heartbeatMessage);
}
}

// Stop sending heartbeat messages
Expand Down Expand Up @@ -273,6 +281,10 @@ class Connection extends EventEmitter {
_send(message) {
const data = serde.stringify(message);
this.transport.send(data);

if (process.env.NODE_ENV !== 'production') {
this.emit('outgoingMessage', message);
}
}

// Close the connection, optionally sending a final message
Expand All @@ -284,6 +296,10 @@ class Connection extends EventEmitter {
if (message) {
const data = serde.stringify(message);
this.transport.end(data);

if (process.env.NODE_ENV !== 'production') {
this.emit('outgoingMessage', message);
}
} else {
this.transport.end();
}
Expand Down Expand Up @@ -312,7 +328,17 @@ class Connection extends EventEmitter {
//
_processMessage(message) {
const keys = Object.keys(message);
if (keys.length === 0) return; // heartbeat message
if (keys.length === 0) {
// heartbeat message
if (process.env.NODE_ENV !== 'production') {
this.emit('heartbeat', message);
}
return;
}

if (process.env.NODE_ENV !== 'production') {
this.emit('incomingMessage', message);
}

const kind = keys[0];
if (!this.handshakeDone && kind !== 'handshake') {
Expand Down Expand Up @@ -385,6 +411,8 @@ class Connection extends EventEmitter {
this, application, authStrategy, credentials,
this._onSessionCreated.bind(this)
);

this.emit('handshakeRequest', applicationName, authStrategy);
}

// Callback of authentication operation
Expand Down Expand Up @@ -416,6 +444,8 @@ class Connection extends EventEmitter {
const messageId = message.handshake[0];
const callback = this._callbacks[messageId];

this.emit('handshake', message.error, message.ok);

if (!callback) {
this._rejectMessage(message);
}
Expand Down Expand Up @@ -464,6 +494,8 @@ class Connection extends EventEmitter {
return;
}

this.emit('call', interfaceName, methodName, args);

try {
this.application.callMethod(
this, interfaceName, methodName, args, callback
Expand All @@ -481,6 +513,8 @@ class Connection extends EventEmitter {
const messageId = message.callback[0];
const callback = this._callbacks[messageId];

this.emit('callback', message.error, message.ok);

if (callback) {
delete this._callbacks[messageId];

Expand Down Expand Up @@ -519,6 +553,8 @@ class Connection extends EventEmitter {
const messageId = message.inspect[0];
const interfaceName = message.inspect[1];

this.emit('inspect', interfaceName);

const methods = this.application.getMethods(interfaceName);
if (methods) {
this.callback(messageId, null, methods);
Expand Down
210 changes: 210 additions & 0 deletions test/node/connection-emit-actions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
'use strict';

const test = require('tap');
const net = require('net');

const jstp = require('../..');

const app = require('../fixtures/application');

const application = new jstp.Application(app.name, app.interfaces);
const serverConfig = {
applications: [application],
authPolicy: app.authCallback,
};

let server;
let connection;

test.beforeEach((done) => {
server = jstp.net.createServer(serverConfig);
server.listen(0, () => {
const port = server.address().port;
jstp.net.connect(app.name, null, port, 'localhost', (error, conn) => {
test.assertNot(error, 'must connect to server and perform handshake');
connection = conn;
done();
});
});
});

test.afterEach((done) => {
if (connection) {
connection.close();
connection = null;
}
server.close();
done();
});

test.test('must emit server and client events upon anonymous handshake',
(test) => {
test.plan(7);

const client = {
application: new jstp.Application('jstp', {}),
};

server.once('connect', (serverConnection) => {
serverConnection.on('handshakeRequest',
(applicationName, authStrategy) => {
test.equal(applicationName, app.name,
'application name must match');
test.equal(authStrategy, 'anonymous',
'auth strategy must be anonymous by default');
}
);
});

const port = server.address().port;
const socket = net.connect(port);
socket.on('error', () => {
test.fail('must create socket and connect to server');
});
socket.on('connect', () => {
const transport = new jstp.net.Transport(socket);
const connection = new jstp.Connection(transport, null, client);

connection.on('handshake', (error, ok) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(ok, app.sessionId,
'session id must be equal to the one provided by authCallback');
});

connection.handshake(app.name, null, null, (error) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(connection.username, null, 'username must be null');
test.equal(connection.sessionId, app.sessionId,
'session id must be equal to the one provided by authCallback');
connection.close();
});
});
}
);

test.test('must emit server and client events login authentication strategy',
(test) => {
test.plan(7);

const client = {
application: new jstp.Application('jstp', {}),
};

server.once('connect', (serverConnection) => {
serverConnection.on('handshakeRequest',
(applicationName, authStrategy) => {
test.equal(applicationName, app.name,
'application name must match');
test.equal(authStrategy, 'login',
'authentication strategy must be \'login\'');
}
);
});

const port = server.address().port;
const socket = net.connect(port);
socket.on('error', () => {
test.fail('must create socket and connect to server');
});
socket.on('connect', () => {
const transport = new jstp.net.Transport(socket);
const connection = new jstp.Connection(transport, null, client);

connection.on('handshake', (error, ok) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(ok, app.sessionId,
'session id must be equal to the one provided by authCallback');
});

connection.handshake(app.name, app.login, app.password, (error) => {
test.assertNot(error, 'handshake must not return an error');
test.equal(connection.username, app.login, 'username must match');
test.equal(connection.sessionId, app.sessionId,
'session id must be equal to the one provided by authCallback');
connection.close();
});
});
}
);

test.test('must emit event on call without arguments and with a return value',
(test) => {
test.plan(5);

const iface = 'calculator';
const methodName = 'answer';
const args = [];

server.getClients()[0].on('call',
(actualInterfaceName, actualMethodName, actualArgs) => {
test.equal(actualInterfaceName, iface,
'method interface must match');
test.equal(actualMethodName, methodName,
'method name must be equal to the called one');
test.strictSame(actualArgs, args,
'method arguments must be equal to the passed ones');
}
);

connection.on('callback', (error, ok) => {
test.assertNot(error, 'callMethod must not return an error');
test.strictSame(ok, [42], 'ok contents must match');
});

connection.callMethod(iface, methodName, args);
}
);

test.test('must emit event upon inspect message', (test) => {
const expectedInterfaces = Object.keys(app.interfaces);
const expectedTests = expectedInterfaces.length;

test.plan(expectedTests);
server.getClients()[0].on('inspect', (interfaceName) => {
test.assert(expectedInterfaces.includes(interfaceName),
'inspect event interface must be one of expected');
});

expectedInterfaces.forEach((iface) => {
connection.inspectInterface(iface);
});
});

test.test('must emit messages in development mode', (test) => {
test.plan(4);

const clientOutgoingMessage = { call: [1, 'calculator'], answer: [] };
const serverOutgoingMessage = { callback: [1], ok: [42] };

server.getClients()[0].on('outgoingMessage', (message) => {
test.strictSame(message, serverOutgoingMessage,
'server outgoing message must match');
});
server.getClients()[0].on('incomingMessage', (message) => {
test.strictSame(message, clientOutgoingMessage,
'server incoming message must match the one sent from client');
});
connection.on('outgoingMessage', (message) => {
test.strictSame(message, clientOutgoingMessage,
'client outgoing message must match');
});
connection.on('incomingMessage', (message) => {
test.strictSame(message, serverOutgoingMessage,
'client incoming message must match the one sent from server');
});

connection.callMethod('calculator', 'answer', []);
});

test.test('must emit heartbeat messages in development mode', (test) => {
test.plan(2);

server.getClients()[0].on('heartbeat', (message) => {
test.strictSame(message, {}, 'heartbeat message must match on server side');
});
connection.on('heartbeat', (message) => {
test.strictSame(message, {}, 'heartbeat message must match on client side');
});

connection.startHeartbeat(100);
});