From 8bb03ed3503f8f23ccabedb831dfc8ed15c59887 Mon Sep 17 00:00:00 2001 From: Mykola Bilochub Date: Thu, 27 Jul 2017 22:23:56 +0300 Subject: [PATCH 1/4] lib: implement sessions --- jstp.js | 5 +- lib/applications.js | 11 +- lib/connection.js | 184 ++++++++++++++++++------- lib/net.js | 13 +- lib/server.js | 104 ++++++++++++-- lib/session.js | 101 ++++++++++++++ lib/simple-auth-policy.js | 35 ----- lib/simple-connect-policy.js | 26 +++- lib/simple-session-storage-provider.js | 82 +++++++++++ lib/tls.js | 11 +- lib/transport-common.js | 23 ++-- lib/ws.js | 5 +- lib/wss.js | 5 +- test/fixtures/application.js | 8 +- test/node/connection-emit-actions.js | 14 +- test/node/connection-handshake.js | 22 +-- test/node/session.js | 79 +++++++++++ test/utils/session/client.js | 57 ++++++++ 18 files changed, 636 insertions(+), 149 deletions(-) create mode 100644 lib/session.js delete mode 100644 lib/simple-auth-policy.js create mode 100644 lib/simple-session-storage-provider.js create mode 100644 test/node/session.js create mode 100644 test/utils/session/client.js diff --git a/jstp.js b/jstp.js index 9f4221c2..82036df6 100644 --- a/jstp.js +++ b/jstp.js @@ -12,11 +12,14 @@ Object.assign(jstp, jstp.RemoteProxy = require('./lib/remote-proxy'); jstp.Connection = require('./lib/connection'); jstp.Server = require('./lib/server'); +jstp.Session = require('./lib/session'); jstp.net = require('./lib/net'); jstp.tls = require('./lib/tls'); jstp.ws = require('./lib/ws'); jstp.wss = require('./lib/wss'); -jstp.SimpleAuthPolicy = require('./lib/simple-auth-policy'); jstp.SimpleConnectPolicy = require('./lib/simple-connect-policy'); +jstp.SimpleSessionStorageProvider = require( + './lib/simple-session-storage-provider' +); diff --git a/lib/applications.js b/lib/applications.js index 57711f59..8ebf74b9 100644 --- a/lib/applications.js +++ b/lib/applications.js @@ -20,8 +20,16 @@ const errors = require('./errors'); // version - application version of 'name' application (optional). // If a version is not provided either here or in `name`, // '1.0.0' will be used +// sessionStorageProvider - provider for session storage (optional). +// If provided, it will be used to store sessions +// independently of other applications +// class Application { - constructor(name, api, eventHandlers = {}, version) { + constructor(name, api, eventHandlers = {}, version, sessionStorageProvider) { + if (sessionStorageProvider === undefined && typeof version === 'object') { + sessionStorageProvider = version; + version = null; + } [this.name, this.version] = common.rsplit(name, '@'); const providedVersion = this.version || version; if (providedVersion && !semver.valid(providedVersion)) { @@ -30,6 +38,7 @@ class Application { this.version = providedVersion || '1.0.0'; this.api = api; this.eventHandlers = eventHandlers; + this.sessionsStorage = sessionStorageProvider; } // Call application method diff --git a/lib/connection.js b/lib/connection.js index 430d4976..207f502e 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -8,6 +8,7 @@ const common = require('./common'); const serde = require('./serde'); const errors = require('./errors'); const RemoteProxy = require('./remote-proxy'); +const Session = require('./session'); let nextConnectionId = 0; @@ -47,7 +48,7 @@ class Connection extends EventEmitter { this.handshakeDone = false; this.username = null; - this.sessionId = null; + this.session = null; this.application = null; this.remoteProxies = {}; @@ -82,7 +83,7 @@ class Connection extends EventEmitter { ); const messageId = message.call[0]; this._callbacks[messageId] = callback || this._emitError; - this._send(message); + this._sendWithBuffering(message, messageId); } // Send a callback message over the connection @@ -102,7 +103,7 @@ class Connection extends EventEmitter { message.callback[0] = messageId; - this._send(message); + this._send(this._prepareMessage(message), message); } // Send an event message over the connection @@ -114,14 +115,15 @@ class Connection extends EventEmitter { const message = this._createMessage( 'event', interfaceName, eventName, args ); - this._send(message); + this._sendWithBuffering(message, message.event[0]); } // Send a handshake message over the connection // app - string or object, application to connect to as 'name' or // 'name@version' or { name, version }, where version // must be a valid semver range - // login - user name (optional) + // login - user name or Session object (password should be omitted + // in this case) (optional) // password - user password (optional) // callback - callback function to invoke after the handshake is completed // @@ -145,32 +147,57 @@ class Connection extends EventEmitter { } let message; + let isNewSession = true; + let handshakeStrategy, handshakeCredentials; + + if (login) { + if (!password) { + this.session = login; + handshakeStrategy = 'session'; + handshakeCredentials = [this.session.id, this.session.receivedCount]; + isNewSession = false; + } else { + handshakeStrategy = 'login'; + handshakeCredentials = [login, password]; + } + } + if (version) { - message = login && password ? - this._createMessageWithArray( - 'handshake', [name, version], 'login', [login, password] - ) : - this._createMessageWithArray( - 'handshake', [name, version] - ); + message = this._createMessageWithArray( + 'handshake', [name, version], handshakeStrategy, handshakeCredentials + ); } else { - message = login && password ? - this._createMessage('handshake', name, 'login', [login, password]) : - this._createMessage('handshake', name); + message = this._createMessage( + 'handshake', name, handshakeStrategy, handshakeCredentials + ); } const messageId = message.handshake[0]; - this._callbacks[messageId] = (error, sessionId) => { - if (login && password && !error) { - this.username = login; - } - this.sessionId = sessionId; - if (callback) { - callback(error, sessionId); - } - }; + if (isNewSession) { + this._callbacks[messageId] = (error, sessionId) => { + if (!error) { + if (login && password) { + this.username = login; + } + this.session = new Session(this, this.username, sessionId); + } + if (callback) { + callback(error, this.session); + } + }; + } else { + this._callbacks[messageId] = (error, receivedCount) => { + if (!error) { + this.session._restore(this, receivedCount); + this.session._resendBufferedMessages(); + } + if (callback) { + callback(error); + } + }; + } - this._send(message); + this._send(this._prepareMessage(message), message); } // Send an inspect message over the connection @@ -200,7 +227,7 @@ class Connection extends EventEmitter { } }; - this._send(message); + this._sendWithBuffering(message, messageId); } // Send a ping message @@ -209,14 +236,14 @@ class Connection extends EventEmitter { const message = this._createMessage('ping'); const messageId = message.ping[0]; this._callbacks[messageId] = callback || common.doNothing; - this._send(message); + this._sendWithBuffering(message, messageId); } // Send a pong message // pong(messageId) { const message = { pong: [messageId] }; - this._send(message); + this._send(this._prepareMessage(message), message); } // Start sending heartbeat messages @@ -328,15 +355,35 @@ class Connection extends EventEmitter { this.emit('_timeout'); } + // Prepare a JSTP message to be sent over this connection + // message - a message to prepare + // + _prepareMessage(message) { + return serde.stringify(message); + } + + _restorePreparedMessage(preparedMessage) { + return serde.parse(preparedMessage); + } + + _sendWithBuffering(message, messageId) { + const preparedMessage = this._prepareMessage(message); + this._send(preparedMessage, message); + this.session._bufferMessage(messageId, preparedMessage); + } + // Send a JSTP message over this connection - // message - a message to send + // preparedMessage - a prepared message to send + // message - a message object (used for development events thus optional) // - _send(message) { - const data = serde.stringify(message); - this.transport.send(data); + _send(preparedMessage, message) { + this.transport.send(preparedMessage); if (process.env.NODE_ENV !== 'production') { - this.emit('outgoingMessage', message); + this.emit( + 'outgoingMessage', + message || this._restorePreparedMessage(preparedMessage) + ); } } @@ -467,34 +514,66 @@ class Connection extends EventEmitter { const credentials = authStrategy && message[authStrategy]; authStrategy = authStrategy || 'anonymous'; - this.server.startSession( - this, application, authStrategy, credentials, - this._onSessionCreated.bind(this) - ); - + if (authStrategy !== 'session') { + this.server.startSession( + this, application, authStrategy, credentials, + this._onSessionCreated.bind(this) + ); + } else { + this.server.restoreSession( + this, application, credentials, + this._onSessionRestored.bind(this) + ); + } this.server.emit('handshakeRequest', this, applicationName, authStrategy); } - // Callback of authentication operation + // Callback of session creation operation + // error - error that has occurred or null + // username - user login or null + // session - session object + // + _onSessionCreated(error, username, session) { + if (error) { + this._handshakeError(error); + return; + } + + this.username = username; + this.session = session; + this.handshakeDone = true; + + this.emit('client', session, this); + this.server.emit('connect', this); + + const message = this._createMessage('handshake', null, 'ok', session.id); + this._send(this._prepareMessage(message), message); + } + + // Callback of session restoration operation // error - error that has occured or null // username - user login or null - // sessionId - session id + // session - session object // - _onSessionCreated(error, username, sessionId) { + _onSessionRestored(error, username, session) { if (error) { - this._handshakeError(errors.ERR_AUTH_FAILED); + this._handshakeError(error); return; } this.username = username; + this.session = session; this.handshakeDone = true; - this.sessionId = sessionId; - this.emit('client', sessionId, this); + this.emit('client', session, this); this.server.emit('connect', this); - const message = this._createMessage('handshake', null, 'ok', sessionId); - this._send(message); + const message = this._createMessage( + 'handshake', null, 'ok', session.receivedCount + ); + this._send(this._prepareMessage(message), message); + this.session._resendBufferedMessages(); + this._nextMessageId = this.session.latestBufferedMessageId + 1; } // Process incoming handshake message which is a handshake response @@ -510,7 +589,7 @@ class Connection extends EventEmitter { this._rejectMessage(message); } - if (message.ok) { + if (message.ok !== undefined) { delete this._callbacks[messageId]; this.handshakeDone = true; @@ -549,6 +628,8 @@ class Connection extends EventEmitter { const callback = this._remoteCallbackWrapper.bind(this, messageId); + this.session._onMessageReceived(messageId); + if (!args) { callback(errors.ERR_INVALID_SIGNATURE); return; @@ -574,6 +655,7 @@ class Connection extends EventEmitter { const callback = this._callbacks[messageId]; this.emit('callback', message.error, message.ok); + this.session._onCallbackMessageReceived(messageId); if (callback) { delete this._callbacks[messageId]; @@ -599,6 +681,7 @@ class Connection extends EventEmitter { const eventArgs = message[eventName]; this.emit('event', interfaceName, eventName, eventArgs); + this.session._onMessageReceived(message.event[0]); const remoteProxy = this.remoteProxies[interfaceName]; if (remoteProxy) { @@ -618,6 +701,7 @@ class Connection extends EventEmitter { const interfaceName = message.inspect[1]; this.emit('inspect', interfaceName); + this.session._onMessageReceived(messageId); const methods = this.application.getMethods(interfaceName); if (methods) { @@ -631,7 +715,9 @@ class Connection extends EventEmitter { // message - parsed message // _processPingMessage(message) { - this.pong(message.ping[0]); + const messageId = message.ping[0]; + this.pong(messageId); + this.session._onMessageReceived(messageId); } // Process incoming pong message @@ -639,6 +725,8 @@ class Connection extends EventEmitter { // _processPongMessage(message) { const messageId = message.pong[0]; + this.session._onCallbackMessageReceived(messageId); + const callback = this._callbacks[messageId]; if (callback) { delete this._callbacks[messageId]; diff --git a/lib/net.js b/lib/net.js index cee15bfd..5416b75d 100644 --- a/lib/net.js +++ b/lib/net.js @@ -10,7 +10,8 @@ const common = require('./common'); class Server extends net.Server { // Constructs JSTP Server bound to node net.Server. // options - an object that contains applications array or - // index and can optionally contain authPolicy and + // index and can optionally contain authPolicy, + // sessionStorageProvider and // heartbeatInterval (see jstp.Server). // Also it will be passed directly to node net.Server. // listener - jstp server connection listener that will be registered on @@ -20,8 +21,14 @@ class Server extends net.Server { // constructor(options, listener) { super(options); - jstpServer.initServer.call(this, options.applications, - options.authPolicy, options.heartbeatInterval, listener); + jstpServer.initServer.call( + this, + options.applications, + options.authPolicy, + options.sessionStorageProvider, + options.heartbeatInterval, + listener + ); this.on('connection', (socket) => { this._onRawConnection(socket); }); diff --git a/lib/server.js b/lib/server.js index e7e2bcb6..41120660 100644 --- a/lib/server.js +++ b/lib/server.js @@ -4,7 +4,11 @@ const semver = require('semver'); const apps = require('./applications'); const Connection = require('./connection'); -const SimpleAuthPolicy = require('./simple-auth-policy'); +const errors = require('./errors'); +const Session = require('./session'); +const SimpleSessionStorageProvider = require( + './simple-session-storage-provider' +); const HANDSHAKE_TIMEOUT = 3000; @@ -32,34 +36,35 @@ const prepareApplications = function(applications) { // authPolicy - authentication policy is a function or an object with method // startSession (optional). // see jstp.SimpleAuthPolicy. +// sessionStorageProvider - provider for session storage (optional). +// If provided, it will be used to store sessions +// for applications that do not provide storage +// provider // heartbeatInterval - heartbeat interval, if heartbeat should be used // (optional). // listener - jstp connection listener that will be registered on // server 'connect' event (optional). // const initServer = function( - applications, authPolicy, heartbeatInterval, listener + applications, authPolicy, sessionStorageProvider, heartbeatInterval, listener ) { ({ cachedVersions: this._cachedVersions, applications: this.applications, } = prepareApplications(applications)); - if (typeof(authPolicy) === 'number') { - heartbeatInterval = authPolicy; - authPolicy = null; - } - this.heartbeatInterval = heartbeatInterval; - if (typeof authPolicy === 'function') { - this.startSession = authPolicy; - } else { - if (!authPolicy) authPolicy = new SimpleAuthPolicy(); - this.startSession = authPolicy.startSession.bind(authPolicy); + if (typeof authPolicy === 'object') { + authPolicy = authPolicy.authenticate.bind(authPolicy); } + this.authenticate = authPolicy; this.clients = new Map(); + this._clientsBySessionId = new Map(); + + this.sessionsStorage = sessionStorageProvider || + new SimpleSessionStorageProvider(); this.on('connect', this._onClientConnect.bind(this)); this.on('disconnect', this._onClientDisconnect.bind(this)); @@ -118,6 +123,78 @@ class Server { }); } + _createSession(username, connection, sessionStorageProvider, callback) { + const session = new Session(connection, username); + this._clientsBySessionId.set(session.id, connection); + connection.once('close', () => { + if (connection.id === session.connection.id) { + sessionStorageProvider.set(session.id, session); + if (sessionStorageProvider.setInactive) { + sessionStorageProvider.setInactive(session.id); + } + } + }); + callback(null, username, session); + } + + startSession(connection, application, strategy, credentials, callback) { + const sessionStorageProvider = application.sessionsStorage || + this.sessionsStorage; + + if (strategy === 'anonymous') { + this._createSession(null, connection, sessionStorageProvider, callback); + return; + } + + if (!this.authenticate) { + callback(errors.ERR_AUTH_FAILED); + return; + } + this.authenticate( + connection, application, strategy, credentials, + (error, username) => { + if (error || !username) { + callback(errors.ERR_AUTH_FAILED); + return; + } + this._createSession( + username, connection, sessionStorageProvider, callback + ); + } + ); + } + + restoreSession(connection, application, credentials, callback) { + const [sessionId, receivedCount] = credentials; + const existingConnection = this._clientsBySessionId.get(sessionId); + const sessionStorageProvider = application.sessionsStorage || + this.sessionsStorage; + let session; + if (existingConnection) { + session = existingConnection.session; + } else { + session = sessionStorageProvider.get(sessionId); + } + + if (!session) { + callback(errors.ERR_AUTH_FAILED); + return; + } + + this._clientsBySessionId.set(sessionId, connection); + session._restore(connection, receivedCount); + connection.once('close', () => { + if (connection.id === session.connection.id) { + sessionStorageProvider.set(session.id, session); + if (sessionStorageProvider.setInactive) { + sessionStorageProvider.setInactive(session.id); + } + } + }); + this.emit('restoreSession', session, connection, application); + callback(null, session.username, session); + } + // Handler of a new connection event emitter from the underlying server. // socket - a lower-level socket or connection // @@ -179,6 +256,9 @@ class Server { // _onClientDisconnect(connection) { this.clients.delete(connection.id); + if (connection.session) { + this._clientsBySessionId.set(connection.session.id, null); + } } } diff --git a/lib/session.js b/lib/session.js new file mode 100644 index 00000000..d36fe56d --- /dev/null +++ b/lib/session.js @@ -0,0 +1,101 @@ +'use strict'; + +const uuid4 = require('uuid/v4'); + +// JSTP Session class used to buffer and resend the messages on unexpected +// connection closes. +// Extends Map class and thus can be used to store the current session state +// independently of connection. +// +class Session extends Map { + // connection - Connection class object + // username - user's username obtained during authentication + // sessionId - used on client to represent session with corresponding id + // which exists on the server + // state - Map object that can be provided to initialize session state + // + constructor(connection, username, sessionId, state) { + super(state); + + this.id = sessionId || uuid4(); + this.connection = connection; + this.username = username; + + this.guaranteedDeliveredCount = 0; + this.buffer = new Map(); + + this.receivedCount = 0; + this.latestBufferedMessageId = 0; + + Object.preventExtensions(this); + } + + // Convert Session object to string. + // Must be used by implementers of external storage provider + // for Session objects when exporting the Session objects. + // + toString() { + const copy = Object.assign({}, this); + copy.storage = Array.from(this); + function replacer(key, value) { + switch (key) { + case 'connection': + return undefined; + case 'buffer': + return Array.from(value); + default: + return value; + } + } + return JSON.stringify(copy, replacer); + } + + // Restore Session object from string created by toString method. + // Must be used by implementers of external storage provider + // for Session objects when importing the Session objects. + // sessionString - session object stringified by toString method + // + static fromString(sessionString) { + const reviver = (key, value) => ( + key === 'buffer' || key === 'storage' ? new Map(value) : value + ); + const session = JSON.parse(sessionString, reviver); + const result = new Session(null, null, null, session.storage); + delete session.storage; + return Object.assign(result, session); + } + + _bufferMessage(id, message) { + this.buffer.set(Math.abs(id), message); + this.latestBufferedMessageId = id; + } + + _onMessageReceived(messageId) { + this.receivedCount = Math.abs(messageId); + } + + _onCallbackMessageReceived(messageId) { + messageId = Math.abs(messageId); + for (let i = this.guaranteedDeliveredCount + 1; i <= messageId; i++) { + this.buffer.delete(i); + } + this.guaranteedDeliveredCount = messageId; + } + + _restore(newConnection, receivedCount) { + this.connection.close(); + this.connection = newConnection; + for (let i = this.guaranteedDeliveredCount + 1; i <= receivedCount; i++) { + this.buffer.delete(i); + } + this.guaranteedDeliveredCount = receivedCount; + } + + _resendBufferedMessages() { + this.buffer.forEach((message) => { + this.connection._send(message); + }); + } +} + +module.exports = Session; diff --git a/lib/simple-auth-policy.js b/lib/simple-auth-policy.js deleted file mode 100644 index 893ccdbc..00000000 --- a/lib/simple-auth-policy.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict'; - -const EventEmitter = require('events').EventEmitter; -const uuid4 = require('uuid/v4'); - -const errors = require('./errors'); - -// Simple generic authentication provider. You are free to implement -// whatever suits your needs instead. -// -module.exports = class SimpleAuthPolicy extends EventEmitter { - constructor() { - super(); - } - - // Start session. Only anonymous handshakes are allowed. - // connection - JSTP connection - // application - application instance - // strategy - authentication strategy (only 'anonymous' is supported) - // credentials - authentication credentials - // callback - callback function that has signature - // (error, username, sessionId) - // - startSession(connection, application, strategy, credentials, callback) { - if (strategy !== 'anonymous') { - callback(errors.ERR_AUTH_FAILED); - return; - } - - const sessionId = uuid4(); - this.emit('session', sessionId, connection, application); - - callback(null, null, sessionId); - } -}; diff --git a/lib/simple-connect-policy.js b/lib/simple-connect-policy.js index aacdda4f..d02bd258 100644 --- a/lib/simple-connect-policy.js +++ b/lib/simple-connect-policy.js @@ -6,6 +6,9 @@ // anonymous handshake. // module.exports = class SimpleConnectPolicy { + // login - user name (optional) + // password - user password (optional, but must be provided if login + // is provided) constructor(login, password) { this.login = login; this.password = password; @@ -18,14 +21,25 @@ module.exports = class SimpleConnectPolicy { // 'name@version' or { name, version }, where version // must be a valid semver range // connection - JSTP connection + // session - Session object to reconnect to existing session (optional) // callback - callback function that has signature // (error, connection) // - connect(app, connection, callback) { - connection.handshake( - app, this.login, this.password, - (error) => { - callback(error, connection); - }); + connect(app, connection, session, callback) { + if (!session) { + connection.handshake( + app, this.login, this.password, + (error, session) => { + callback(error, connection, session); + } + ); + } else { + connection.handshake( + app, session, null, + (error) => { + callback(error, connection); + } + ); + } } }; diff --git a/lib/simple-session-storage-provider.js b/lib/simple-session-storage-provider.js new file mode 100644 index 00000000..66ec6bd7 --- /dev/null +++ b/lib/simple-session-storage-provider.js @@ -0,0 +1,82 @@ +'use strict'; + +// Simple default storage provider for Session objects. +// Used on server by default, can be also used by Application +// objects if you need to store clients' sessions +// separately for this application. +// You are free to substitute it with other class with +// the corresponding interface. +class SimpleSessionStorageProvider extends Map { + // Constructor arguments can be used to change the default + // purging options. + // inactiveSessionLifetime - determines the minimal lifetime of + // the session which was marked as inactive + // purgeInterval - determines the interval at which session + // purging occurs + // + constructor( + inactiveSessionLifetime = 24 * 60 * 60 * 1000, + purgeInterval = 60 * 60 * 1000 + ) { + super(); + this._purgeInterval = purgeInterval; + this._inactiveSessionLifetime = inactiveSessionLifetime; + setInterval(() => { + this._purgeSessions(); + }, purgeInterval).unref(); + } + + // Must return the Session object with the corresponding session id + // or undefined if the session cannot be found. + // This method is called at most once for each connection + // on handshake, in case of session restoring when there is no connection + // associated with the session in memory. + // sessionId - id of the requested session + // + get(sessionId) { + const storedSession = super.get(sessionId); + if (!storedSession) return storedSession; + storedSession.lastActive = null; + return storedSession.value; + } + + // Must save the Session object with the corresponding session id. + // This method is called once on session creation and every time + // when connection associated with session is being closed. + // sessionId - id of the session to be added + // session - Session object to be added + // + set(sessionId, session) { + const storedSession = { + value: session, + lastActive: null, + }; + super.set(sessionId, storedSession); + return this; + } + + // Optional method, can be omitted if this functionality is not + // required. + // If provided, must mark the session as inactive. + // Called whenever the connection associated with the session + // is being closed. + // sessionId - id of the session to mark inactive + // + setInactive(sessionId) { + const session = super.get(sessionId); + session.lastActive = Date.now(); + } + + _purgeSessions() { + const purgeStartTime = Date.now(); + this.forEach((storedSession, sessionId) => { + if (!storedSession.lastActive) return; + const sessionInactivityTime = purgeStartTime - storedSession.lastActive; + if (sessionInactivityTime >= this._inactiveSessionLifetime) { + super.delete(sessionId); + } + }); + } +} + +module.exports = SimpleSessionStorageProvider; diff --git a/lib/tls.js b/lib/tls.js index 6d8e1a0d..9a8e38f1 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -10,7 +10,8 @@ const common = require('./common'); class Server extends tls.Server { // Constructs JSTP Server bound to node tls.Server. // options - an object that contains applications array or - // index and can optionally contain authPolicy and + // index and can optionally contain authPolicy, + // sessionStorageProvider and // heartbeatInterval (see jstp.Server). // Also it will be passed directly to node tls.Server. // listener - jstp server connection listener that will be registered on @@ -21,8 +22,12 @@ class Server extends tls.Server { constructor(options, listener) { super(options); jstpServer.initServer.call( - this, options.applications, - options.authPolicy, options.heartbeatInterval, listener + this, + options.applications, + options.authPolicy, + options.sessionStorageProvider, + options.heartbeatInterval, + listener ); this.on('secureConnection', (socket) => { this._onRawConnection(socket); diff --git a/lib/transport-common.js b/lib/transport-common.js index 73620726..8171be7c 100644 --- a/lib/transport-common.js +++ b/lib/transport-common.js @@ -22,6 +22,8 @@ const common = require('./common'); // object ('connect' function will be extracted as bound), // optional (see connect in jstp.SimpleConnectPolicy) // * heartbeatInterval - number, optional +// * session - jstp.Session object, if provided, will try +// to reconnect to the existing session (optional) // options - will be destructured and passed directly to connFactory. // The last argument of options is optional callback // that will be called when connection is established @@ -53,16 +55,19 @@ const newConnectFn = ( client.connectPolicy.connect.bind(client.connectPolicy); } const connection = new Connection(transport, null, client); - client.connectPolicy(app, connection, (error, connection) => { - if (error) { - callback(error, connection); - return; + client.connectPolicy( + app, connection, client.session, + (error, connection, session) => { + if (error) { + callback(error, connection); + return; + } + if (client.heartbeatInterval) { + connection.startHeartbeat(client.heartbeatInterval); + } + callback(null, connection, session); } - if (client.heartbeatInterval) { - connection.startHeartbeat(client.heartbeatInterval); - } - callback(null, connection); - }); + ); }); }; diff --git a/lib/ws.js b/lib/ws.js index 14c2b1d6..e77fbde9 100644 --- a/lib/ws.js +++ b/lib/ws.js @@ -13,7 +13,8 @@ class Server extends http.Server { // Constructs JSTP Server bound to WebSocket Server based on // node http.Server. // options - an object that contains applications array or - // index and can optionally contain authPolicy and + // index and can optionally contain authPolicy, + // sessionStorageProvider and // heartbeatInterval (see jstp.Server). // // webSocketOptions - can contain originCheckStrategy or default @@ -31,7 +32,7 @@ class Server extends http.Server { super(); jstpServer.initServer.call( this, options.applications, options.authPolicy, - options.heartbeatInterval, listener + options.sessionStorageProvider, options.heartbeatInterval, listener ); webSocket.initServer(webSocketOptions, this); } diff --git a/lib/wss.js b/lib/wss.js index 57913770..998779c5 100644 --- a/lib/wss.js +++ b/lib/wss.js @@ -13,7 +13,8 @@ class Server extends https.Server { // Constructs JSTP Server bound to WebSocket Server based on // node https.Server. // options - an object that contains applications array or - // index and can optionally contain authPolicy and + // index and can optionally contain authPolicy, + // sessionStorageProvider and // heartbeatInterval (see jstp.Server). // Also it will be passed directly to node https.Server. // @@ -32,7 +33,7 @@ class Server extends https.Server { super(options); jstpServer.initServer.call( this, options.applications, options.authPolicy, - options.heartbeatInterval, listener + options.sessionStorageProvider, options.heartbeatInterval, listener ); webSocket.initServer(webSocketOptions, this); } diff --git a/test/fixtures/application.js b/test/fixtures/application.js index 971757b6..5ffe6788 100644 --- a/test/fixtures/application.js +++ b/test/fixtures/application.js @@ -5,7 +5,6 @@ const jstp = require('../..'); const name = 'testApp'; const login = 'login'; const password = 'password'; -const sessionId = '12892e85-5bd7-4c77-a0c5-a0aecfcbc93a'; const expectedErrorMessage = 'Zero division'; @@ -53,10 +52,6 @@ const authCallback = ( let username = null; let success = false; - if (strategy === 'anonymous') { - success = true; - } - if (strategy === 'login' && credentials[0] === login && credentials[1] === password) { @@ -67,7 +62,7 @@ const authCallback = ( if (!success) { callback(new jstp.RemoteError(jstp.ERR_AUTH_FAILED)); } - callback(null, username, sessionId); + callback(null, username); }; module.exports = { @@ -75,7 +70,6 @@ module.exports = { interfaces, login, password, - sessionId, authCallback, expectedErrorMessage, }; diff --git a/test/node/connection-emit-actions.js b/test/node/connection-emit-actions.js index b7a7a234..816eb319 100644 --- a/test/node/connection-emit-actions.js +++ b/test/node/connection-emit-actions.js @@ -39,7 +39,7 @@ test.afterEach((done) => { test.test('must emit server and client events upon anonymous handshake', (test) => { - test.plan(8); + test.plan(7); const client = { application: new jstp.Application('jstp', {}), @@ -67,15 +67,12 @@ test.test('must emit server and client events upon anonymous handshake', connection.on('handshake', (error, ok) => { test.assertNot(error, 'handshake must not return an error'); - test.equal(ok, app.sessionId, - 'session id must be equal to the one provided by authCallback'); + test.assert(ok, 'handshake must return ok'); }); connection.handshake(app.name, null, null, (error) => { test.assertNot(error, 'handshake must not return an error'); test.equal(connection.username, null, 'username must be null'); - test.equal(connection.sessionId, app.sessionId, - 'session id must be equal to the one provided by authCallback'); connection.close(); }); }); @@ -84,7 +81,7 @@ test.test('must emit server and client events upon anonymous handshake', test.test('must emit server and client events login authentication strategy', (test) => { - test.plan(8); + test.plan(7); const client = { application: new jstp.Application('jstp', {}), @@ -112,15 +109,12 @@ test.test('must emit server and client events login authentication strategy', connection.on('handshake', (error, ok) => { test.assertNot(error, 'handshake must not return an error'); - test.equal(ok, app.sessionId, - 'session id must be equal to the one provided by authCallback'); + test.assert(ok, 'handshake must return ok'); }); connection.handshake(app.name, app.login, app.password, (error) => { test.assertNot(error, 'handshake must not return an error'); test.equal(connection.username, app.login, 'username must match'); - test.equal(connection.sessionId, app.sessionId, - 'session id must be equal to the one provided by authCallback'); connection.close(); }); }); diff --git a/test/node/connection-handshake.js b/test/node/connection-handshake.js index f116e14b..6d6f384a 100644 --- a/test/node/connection-handshake.js +++ b/test/node/connection-handshake.js @@ -30,7 +30,7 @@ test.afterEach((done) => { done(); }); -test.test('must perform an anonymous handshake', (test) => { +test.test('must perform an anonymous handshake manually', (test) => { const client = { application: new jstp.Application('jstp', {}), }; @@ -41,11 +41,11 @@ test.test('must perform an anonymous handshake', (test) => { socket.on('connect', () => { const transport = new jstp.net.Transport(socket); connection = new jstp.Connection(transport, null, client); - connection.handshake(app.name, null, null, (error) => { + connection.handshake(app.name, null, null, (error, session) => { test.assertNot(error, 'handshake must not return an error'); test.equal(connection.username, null, 'username must be null'); - test.equal(connection.sessionId, app.sessionId, - 'session id must be equal to the one provided by authCallback'); + test.assert(session instanceof jstp.Session, + 'session must be an instance of jstp.Session'); test.end(); }); }); @@ -53,12 +53,12 @@ test.test('must perform an anonymous handshake', (test) => { test.test('must perform an anonymous handshake', (test) => { const port = server.address().port; - jstp.net.connect(app.name, null, port, (error, conn) => { + jstp.net.connect(app.name, null, port, (error, conn, session) => { connection = conn; test.assertNot(error, 'handshake must not return an error'); test.equal(connection.username, null, 'username must be null'); - test.equal(connection.sessionId, app.sessionId, - 'session id must be equal to the one provided by authCallback'); + test.assert(session instanceof jstp.Session, + 'session must be an instance of jstp.Session'); test.end(); }); }); @@ -69,13 +69,15 @@ test.test('must perform a handshake with credentials', (test) => { connectPolicy: new jstp.SimpleConnectPolicy(app.login, app.password), }; const port = server.address().port; - jstp.net.connect(app.name, client, port, (error, conn) => { + jstp.net.connect(app.name, client, port, (error, conn, session) => { connection = conn; test.assertNot(error, 'handshake must not return an error'); test.equal(connection.username, app.login, 'username must be same as the one passed with handshake'); - test.equal(connection.sessionId, app.sessionId, - 'session id must be equal to the one provided by authCallback'); + test.assert(session instanceof jstp.Session, + 'session must be an instance of jstp.Session'); + test.equal(session.username, app.login, + 'session username must be same as the one passed with handshake'); test.end(); }); }); diff --git a/test/node/session.js b/test/node/session.js new file mode 100644 index 00000000..0b9caae6 --- /dev/null +++ b/test/node/session.js @@ -0,0 +1,79 @@ +'use strict'; + +const test = require('tap'); + +const jstp = require('../..'); + +const app = require('../fixtures/application'); +const application = new jstp.Application(app.name, app.interfaces); + +const serverConfig = { applications: [application] }; + +let server; +let connection; +let port; + +test.afterEach((done) => { + if (connection) { + connection.close(); + connection = null; + } + server.close(); + done(); +}); + +test.test('must reconnect to existing session', (test) => { + server = jstp.net.createServer(serverConfig); + server.listen(0, () => { + port = server.address().port; + jstp.net.connect(app.name, null, port, (error, conn, session) => { + test.assertNot(error, 'handshake must not return an error'); + test.equal(conn.username, null, 'username must be null'); + test.assert(session instanceof jstp.Session, + 'session must be an instance of jstp.Session'); + const client = { session }; + jstp.net.connect(app.name, client, port, (error, conn, session) => { + connection = conn; + test.assertNot(error, + 'must successfully reconnect to existing session'); + test.assertNot(session, 'must not return Session object'); + test.end(); + }); + }); + }); +}); + +test.test('must not resend messages received by other side', (test) => { + test.plan(7); + server = jstp.net.createServer({ + applications: [ + new jstp.Application('testApp', { + calculator: { + doNothing(connection, callback) { + test.pass('method must only be called once'); + callback(null); + }, + }, + }), + ], + }); + server.listen(0, () => { + port = server.address().port; + jstp.net.connect(app.name, null, port, (error, conn, session) => { + test.assertNot(error, 'handshake must not return an error'); + test.equal(conn.username, null, 'username must be null'); + test.assert(session instanceof jstp.Session, + 'session must be an instance of jstp.Session'); + const client = { session }; + conn.callMethod('calculator', 'doNothing', [], (error) => { + test.assertNot(error, 'call must not return an error'); + }); + jstp.net.connect(app.name, client, port, (error, conn, session) => { + connection = conn; + test.assertNot(error, + 'must successfully reconnect to existing session'); + test.assertNot(session, 'must not return Session object'); + }); + }); + }); +}); diff --git a/test/utils/session/client.js b/test/utils/session/client.js new file mode 100644 index 00000000..1011757f --- /dev/null +++ b/test/utils/session/client.js @@ -0,0 +1,57 @@ +'use strict'; + +const jstp = require('../../..'); + +const appName = 'testApp'; +const iface = 'iface'; +const method = 'method'; + +let connection = null; +const client = { session: null }; + +const connect = (port) => { + jstp.net.connect(appName, null, port, (error, conn, session) => { + if (error) { + process.send(['error', error]); + } + connection = conn; + client.session = session; + connection.callMethod(iface, method, [], (error) => { + if (error) { + process.send(['error', error]); + } + }); + }); +}; + +const sendSession = () => { + process.send(['session', client.session.toString()]); +}; + +const reconnect = (port, serializedSession) => { + client.session = jstp.Session.fromString(serializedSession); + jstp.net.connect(appName, client, port, (error, conn, session) => { + if (error) { + process.send(['error', error]); + } + connection = conn; + client.session = session; + connection.on('event', (iface, event) => { + process.send(['event', iface, event]); + }); + }); +}; + +process.on('message', ([message, ...args]) => { + switch (message) { + case 'connect': + connect(...args); + break; + case 'sendSession': + sendSession(); + break; + case 'reconnect': + reconnect(...args); + break; + } +}); From 6193013e6296240da5ff30043526d364e6fe4fe2 Mon Sep 17 00:00:00 2001 From: Dmytro Nechai Date: Wed, 23 Aug 2017 20:27:05 +0300 Subject: [PATCH 2/4] test: add test for resending on connection drop --- ...session-resend-event-on-connection-drop.js | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 test/node/session-resend-event-on-connection-drop.js diff --git a/test/node/session-resend-event-on-connection-drop.js b/test/node/session-resend-event-on-connection-drop.js new file mode 100644 index 00000000..0e708e6e --- /dev/null +++ b/test/node/session-resend-event-on-connection-drop.js @@ -0,0 +1,70 @@ +'use strict'; + +const test = require('tap'); + +const cp = require('child_process'); +const path = require('path'); + +const jstp = require('../..'); + +let client = createClient(); + +const iface = 'iface'; +const event = 'testEvent'; +const ifaces = { + iface: { + method: (connection, callback) => { + client.send(['sendSession']); + callback(null); + }, + }, +}; + +const application = new jstp.Application('testApp', ifaces); +const serverConfig = { applications: [application] }; +const server = jstp.net.createServer(serverConfig); + +client.on('message', ([message, ...args]) => { + switch (message) { + case 'error': + console.error(args[0].message); + test.fail('must not encounter an error'); + break; + case 'session': + session(...args); + break; + } +}); + +server.listen(0, () => { + client.send(['connect', server.address().port]); +}); + +function createClient() { + return cp.fork(path.join(__dirname, '../utils/session/client')); +} + +function session(serializedSession) { + client.kill('SIGKILL'); + server.getClientsArray()[0].emitRemoteEvent(iface, event, []); + client = createClient(); + client.on('message', ([message, ...args]) => { + switch (message) { + case 'error': + console.error(args[0].message); + test.fail('must not encounter an error'); + break; + case 'event': + handleEvent(...args); + break; + } + }); + client.send(['reconnect', server.address().port, serializedSession]); +} + +function handleEvent(ifaceName, eventName) { + test.equals(ifaceName, iface, 'interface name must match'); + test.equals(eventName, event, 'event name must match'); + client.kill(); + server.close(); +} From 2ab7872d5e9a7461984ae9787e71dc34a54c6454 Mon Sep 17 00:00:00 2001 From: Dmytro Nechai Date: Wed, 23 Aug 2017 20:28:23 +0300 Subject: [PATCH 3/4] lib: fix connection restore --- lib/session.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/session.js b/lib/session.js index d36fe56d..c10a3ab4 100644 --- a/lib/session.js +++ b/lib/session.js @@ -83,7 +83,9 @@ class Session extends Map { } _restore(newConnection, receivedCount) { - this.connection.close(); + if (this.connection) { + this.connection.close(); + } this.connection = newConnection; for (let i = this.guaranteedDeliveredCount + 1; i <= receivedCount; i++) { this.buffer.delete(i); From 141bede4bc367fbf4f5a23ea5e68d82a87079e7b Mon Sep 17 00:00:00 2001 From: Dmytro Nechai Date: Wed, 22 Nov 2017 16:57:24 +0200 Subject: [PATCH 4/4] connection: use ping-pong instead of heartbeat * Send Ping-Pong instead of Heartbeat messages. * Use setInterval() instead of timers.enroll(). * Rewrite tests. Fixes: https://github.com/metarhia/jstp/issues/217 PR-URL: https://github.com/metarhia/jstp/pull/303 Reviewed-By: Alexey Orlenko Reviewed-By: Mykola Bilochub --- lib/connection.js | 64 +++++++++------------------- lib/server.js | 4 +- test/node/connection-emit-actions.js | 35 ++++++++++++--- test/node/regress-gh-283.js | 6 ++- 4 files changed, 54 insertions(+), 55 deletions(-) diff --git a/lib/connection.js b/lib/connection.js index 207f502e..5a3da7c7 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -2,7 +2,6 @@ const { EventEmitter } = require('events'); const semver = require('semver'); -const timers = require('timers'); const common = require('./common'); const serde = require('./serde'); @@ -53,7 +52,8 @@ class Connection extends EventEmitter { this.application = null; this.remoteProxies = {}; - this._heartbeatCallbackInstance = null; + this._heartbeatTimer = null; + this._closed = false; // Defined in constructor to be used as default callback in callMethod // without binding it. @@ -61,10 +61,6 @@ class Connection extends EventEmitter { if (error) this.emit('error', error); }; - // Defined in constructor to be used as heartbeat message - // in debug mode events - this._heartbeatMessage = {}; - transport.on('message', this._processMessage.bind(this)); transport.on('close', this._onSocketClose.bind(this)); transport.on('error', this._onSocketError.bind(this)); @@ -241,34 +237,33 @@ class Connection extends EventEmitter { // Send a pong message // - pong(messageId) { + _pong(messageId) { + if (this._closed) { + return; + } const message = { pong: [messageId] }; this._send(this._prepareMessage(message), message); } - // Start sending heartbeat messages + // Start sending ping messages // interval - heartbeat interval in milliseconds // startHeartbeat(interval) { - const callback = () => { - this.transport.send('{}'); - this.setTimeout(interval, this._heartbeatCallbackInstance); - - if (process.env.NODE_ENV !== 'production') { - this.emit('heartbeat', this._heartbeatMessage); + const heartbeat = () => { + if (!this._closed) { + this.ping(); } }; - this._heartbeatCallbackInstance = callback; - callback(); + this._heartbeatTimer = setInterval(heartbeat, interval); } - // Stop sending heartbeat messages + // Stop sending ping messages // stopHeartbeat() { - if (this._heartbeatCallbackInstance) { - this.clearTimeout(this._heartbeatCallbackInstance); - this._heartbeatCallbackInstance = null; + if (this._heartbeatTimer) { + clearTimeout(this._heartbeatTimer); + this._heartbeatTimer = null; } } @@ -321,40 +316,17 @@ class Connection extends EventEmitter { // Close the connection // close() { + this._closed = true; this.stopHeartbeat(); this.transport.end(); } - // Set a timeout using timers.enroll() - // milliseconds - amount of milliseconds - // callback - callback function - // - setTimeout(milliseconds, callback) { - timers.enroll(this, milliseconds); - timers._unrefActive(this); - this.once('_timeout', callback); - } - - // Clear a timeout set with Connection#setTimeout - // handler - timer callback to remove - // - clearTimeout(handler) { - timers.unenroll(this); - this.removeListener('_timeout', handler); - } - // Returns underlying transport // getTransport() { return this.transport.getRawTransport(); } - // timers.enroll() timeout handler - // - _onTimeout() { - this.emit('_timeout'); - } - // Prepare a JSTP message to be sent over this connection // message - a message to prepare // @@ -391,6 +363,7 @@ class Connection extends EventEmitter { // message - a message to send (optional) // _end(message) { + this._closed = true; this.stopHeartbeat(); if (message) { @@ -408,6 +381,7 @@ class Connection extends EventEmitter { // Closed socket event handler // _onSocketClose() { + this._closed = true; this.stopHeartbeat(); this.emit('close', this); if (this.server) { @@ -716,7 +690,7 @@ class Connection extends EventEmitter { // _processPingMessage(message) { const messageId = message.ping[0]; - this.pong(messageId); + this._pong(messageId); this.session._onMessageReceived(messageId); } diff --git a/lib/server.js b/lib/server.js index 41120660..9160780e 100644 --- a/lib/server.js +++ b/lib/server.js @@ -212,10 +212,10 @@ class Server { } }; - connection.setTimeout(HANDSHAKE_TIMEOUT, handleTimeout); + const handshakeTimeout = setTimeout(handleTimeout, HANDSHAKE_TIMEOUT); connection.on('client', () => { - connection.clearTimeout(handleTimeout); + clearTimeout(handshakeTimeout); if (this.heartbeatInterval) { connection.startHeartbeat(this.heartbeatInterval); } diff --git a/test/node/connection-emit-actions.js b/test/node/connection-emit-actions.js index 816eb319..82e3824b 100644 --- a/test/node/connection-emit-actions.js +++ b/test/node/connection-emit-actions.js @@ -7,10 +7,13 @@ const jstp = require('../..'); const app = require('../fixtures/application'); +const HEARTBEAT_INTERVAL = 100; + const application = new jstp.Application(app.name, app.interfaces); const serverConfig = { applications: [application], authPolicy: app.authCallback, + heartbeatInterval: HEARTBEAT_INTERVAL, }; let server; @@ -191,14 +194,34 @@ test.test('must emit messages in development mode', (test) => { }); test.test('must emit heartbeat messages in development mode', (test) => { - test.plan(2); + test.plan(4); + const received = { + serverPing: false, + serverPong: false, + clientPing: false, + clientPong: false, + }; - server.getClientsArray()[0].on('heartbeat', (message) => { - test.strictSame(message, {}, 'heartbeat message must match on server side'); + server.getClientsArray()[0].on('incomingMessage', (message) => { + if (message.ping !== undefined) { + received.serverPing = true; + } else if (message.pong !== undefined) { + received.serverPong = true; + } }); - connection.on('heartbeat', (message) => { - test.strictSame(message, {}, 'heartbeat message must match on client side'); + connection.on('incomingMessage', (message) => { + if (message.ping !== undefined) { + received.clientPing = true; + } else if (message.pong !== undefined) { + received.clientPong = true; + } }); - connection.startHeartbeat(100); + connection.startHeartbeat(HEARTBEAT_INTERVAL); + setTimeout(() => { + test.assert(received.serverPing, 'server must receive ping message'); + test.assert(received.serverPong, 'server must receive pong message'); + test.assert(received.clientPing, 'client must receive ping message'); + test.assert(received.clientPing, 'client must receive pong message'); + }, 2 * HEARTBEAT_INTERVAL); }); diff --git a/test/node/regress-gh-283.js b/test/node/regress-gh-283.js index 941fadf3..a7ed1c90 100644 --- a/test/node/regress-gh-283.js +++ b/test/node/regress-gh-283.js @@ -23,8 +23,10 @@ server.listen(() => { let heartbeatsCount = 0; - connection.on('heartbeat', () => { - heartbeatsCount++; + connection.on('incomingMessage', (message) => { + if (message.ping !== undefined) { + heartbeatsCount++; + } }); setTimeout(() => {