diff --git a/front/src/components/boxs/user-presence/UserPresence.jsx b/front/src/components/boxs/user-presence/UserPresence.jsx index 8751e3e76f..3ffb1638d3 100644 --- a/front/src/components/boxs/user-presence/UserPresence.jsx +++ b/front/src/components/boxs/user-presence/UserPresence.jsx @@ -97,11 +97,18 @@ const UserPresence = ({ children, ...props }) => ( class UserPresenceComponent extends Component { componentDidMount() { this.props.getUsersWithPresence(); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.BACK_HOME, payload => - this.props.userChanged(payload) + this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.BACK_HOME, this.props.userChanged); + this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.LEFT_HOME, this.props.userChanged); + } + + componentWillUnmount() { + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.BACK_HOME, + this.props.userChanged ); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.LEFT_HOME, payload => - this.props.userChanged(payload) + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.LEFT_HOME, + this.props.userChanged ); } diff --git a/front/src/routes/chat/index.js b/front/src/routes/chat/index.js index 946676a219..b765d8d544 100644 --- a/front/src/routes/chat/index.js +++ b/front/src/routes/chat/index.js @@ -2,6 +2,7 @@ import { Component } from 'preact'; import { connect } from 'unistore/preact'; import ChatPage from './ChatPage'; import actions from '../../actions/message'; +import { WEBSOCKET_MESSAGE_TYPES } from '../../../../server/utils/constants'; @connect('session', actions) class Chat extends Component { @@ -9,11 +10,11 @@ class Chat extends Component { componentDidMount() { this.props.getMessages(); - this.props.session.dispatcher.addListener('message.new', this.newChatMessage); + this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MESSAGE.NEW, this.newChatMessage); } componentWillUnmount() { - this.props.session.dispatcher.removeListener('message.new', this.newChatMessage); + this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.MESSAGE.NEW, this.newChatMessage); } render({}, {}) { diff --git a/front/src/routes/integration/all/mqtt/setup-page/index.js b/front/src/routes/integration/all/mqtt/setup-page/index.js index ae4ae6cf42..89fbddac68 100644 --- a/front/src/routes/integration/all/mqtt/setup-page/index.js +++ b/front/src/routes/integration/all/mqtt/setup-page/index.js @@ -10,12 +10,19 @@ class MqttNodePage extends Component { componentWillMount() { this.props.getIntegrationByName('mqtt'); this.props.loadProps(); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MQTT.CONNECTED, () => - this.props.displayConnectedMessage() + this.props.session.dispatcher.addListener( + WEBSOCKET_MESSAGE_TYPES.MQTT.CONNECTED, + this.props.displayConnectedMessage ); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MQTT.ERROR, payload => - this.props.displayMqttError(payload) + this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MQTT.ERROR, this.props.displayMqttError); + } + + componentWillUnmount() { + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.MQTT.CONNECTED, + this.props.displayConnectedMessage ); + this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.MQTT.ERROR, this.props.displayMqttError); } render(props, {}) { diff --git a/front/src/routes/integration/all/tasmota/discover-page/index.js b/front/src/routes/integration/all/tasmota/discover-page/index.js index 7f05a9b770..0a782a9f8d 100644 --- a/front/src/routes/integration/all/tasmota/discover-page/index.js +++ b/front/src/routes/integration/all/tasmota/discover-page/index.js @@ -18,6 +18,13 @@ class TasmotaIntegration extends Component { ); } + componentWillUnmount() { + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.TASMOTA.NEW_DEVICE, + this.props.addDiscoveredDevice + ); + } + render(props) { return ( diff --git a/front/src/routes/integration/all/xiaomi/index.js b/front/src/routes/integration/all/xiaomi/index.js index f743a1a8db..9ca72ccde4 100644 --- a/front/src/routes/integration/all/xiaomi/index.js +++ b/front/src/routes/integration/all/xiaomi/index.js @@ -12,9 +12,14 @@ class XiaomiPage extends Component { this.props.getHouses(); this.props.getXiaomiSensors(); this.props.getXiaomiDevices(); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.XIAOMI.NEW_DEVICE, payload => { - this.props.getXiaomiSensors(); - }); + this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.XIAOMI.NEW_DEVICE, this.props.getXiaomiSensors); + } + + componentWillUnmount() { + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.XIAOMI.NEW_DEVICE, + this.props.getXiaomiSensors + ); } render(props, {}) { diff --git a/front/src/routes/integration/all/zwave/node-operation-page/index.js b/front/src/routes/integration/all/zwave/node-operation-page/index.js index 3dbf862c5a..a71939281b 100644 --- a/front/src/routes/integration/all/zwave/node-operation-page/index.js +++ b/front/src/routes/integration/all/zwave/node-operation-page/index.js @@ -75,7 +75,7 @@ class ZwaveNodeOperationPage extends Component { componentWillUnmount() { this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_ADDED, this.nodeAddedListener); this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_READY, this.nodeReadyListener); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_REMOVED, this.nodeRemovedListener); + this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_REMOVED, this.nodeRemovedListener); } render(props, { remainingTimeInSeconds, nodeAdded }) { diff --git a/front/src/routes/scene/edit-scene/index.js b/front/src/routes/scene/edit-scene/index.js index 622f73dca2..ce6d876bc9 100644 --- a/front/src/routes/scene/edit-scene/index.js +++ b/front/src/routes/scene/edit-scene/index.js @@ -241,12 +241,13 @@ class EditScene extends Component { componentDidMount() { this.getSceneBySelector(); - this.props.session.dispatcher.addListener('scene.executing-action', payload => - this.highlighCurrentlyExecutedAction(payload) - ); - this.props.session.dispatcher.addListener('scene.finished-executing-action', payload => - this.removeHighlighAction(payload) - ); + this.props.session.dispatcher.addListener('scene.executing-action', this.highlighCurrentlyExecutedAction); + this.props.session.dispatcher.addListener('scene.finished-executing-action', this.removeHighlighAction); + } + + componentWillUnmount() { + this.props.session.dispatcher.removeListener('scene.executing-action', this.highlighCurrentlyExecutedAction); + this.props.session.dispatcher.removeListener('scene.finished-executing-action', this.removeHighlighAction); } render(props, { saving, error, variables, scene }) { diff --git a/front/src/routes/settings/settings-system/index.js b/front/src/routes/settings/settings-system/index.js index 13c223b0a4..c83d304c15 100644 --- a/front/src/routes/settings/settings-system/index.js +++ b/front/src/routes/settings/settings-system/index.js @@ -17,14 +17,32 @@ class SettingsSystem extends Component { this.props.getDiskSpace(); this.props.getContainers(); this.props.getUpgradeDownloadStatus(); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_PROGRESS, payload => - this.props.newDownloadProgress(payload) + this.props.session.dispatcher.addListener( + WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_PROGRESS, + this.props.newDownloadProgress ); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FINISHED, payload => - this.props.downloadFinished() + this.props.session.dispatcher.addListener( + WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FINISHED, + this.props.downloadFinished ); - this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FAILED, payload => - this.props.downloadFailed() + this.props.session.dispatcher.addListener( + WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FAILED, + this.props.downloadFailed + ); + } + + componentWillUnmount() { + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_PROGRESS, + this.props.newDownloadProgress + ); + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FINISHED, + this.props.downloadFinished + ); + this.props.session.dispatcher.removeListener( + WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FAILED, + this.props.downloadFailed ); } diff --git a/front/src/utils/Dispatcher.js b/front/src/utils/Dispatcher.js index c34dc12815..665a8a335f 100644 --- a/front/src/utils/Dispatcher.js +++ b/front/src/utils/Dispatcher.js @@ -1,6 +1,9 @@ +import { WEBSOCKET_MESSAGE_TYPES } from '../../../server/utils/constants'; + export class Dispatcher { constructor() { this.events = {}; + this.ws = null; } addListener(event, callback) { @@ -24,6 +27,18 @@ export class Dispatcher { } this.events[event].listeners.push(callback); + + if (this.events[event].listeners.length === 1 && this.ws) { + // Subscribe to websocket topic + this.ws.send( + JSON.stringify({ + type: WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.SUBSCRIBE, + payload: { + event + } + }) + ); + } } removeListener(event, callback) { @@ -36,6 +51,18 @@ export class Dispatcher { this.events[event].listeners = this.events[event].listeners.filter( listener => listener.toString() !== callback.toString() ); + + if (this.events[event].listeners.length === 0 && this.ws) { + // Unsubscribe from websocket topic + this.ws.send( + JSON.stringify({ + type: WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.UNSUBSCRIBE, + payload: { + event + } + }) + ); + } } dispatch(event, details) { diff --git a/front/src/utils/Session.js b/front/src/utils/Session.js index 8fbcd92895..27e31f5d6f 100644 --- a/front/src/utils/Session.js +++ b/front/src/utils/Session.js @@ -54,6 +54,7 @@ class Session { const { type, payload } = JSON.parse(data); this.dispatcher.dispatch(type, payload); }; + this.dispatcher.ws = this.ws; }; this.ws.onerror = e => { console.log('Error', e); diff --git a/server/api/websockets/index.js b/server/api/websockets/index.js index 0e85cf8b26..6dc70b6a59 100644 --- a/server/api/websockets/index.js +++ b/server/api/websockets/index.js @@ -21,7 +21,11 @@ function sendMessageUser({ type, payload, userId }) { return; } this.connections[userId].forEach((userConnection) => { - userConnection.client.send(formatWebsocketMessage(type, payload)); + if (userConnection.subscriptions[type] > 0) { + userConnection.client.send(formatWebsocketMessage(type, payload)); + } else { + logger.trace(`No subscriber on ${type}`); + } }); } @@ -35,7 +39,11 @@ function sendMessageAllUsers({ type, payload }) { const usersIds = Object.keys(this.connections); usersIds.forEach((userId) => { this.connections[userId].forEach((userConnection) => { - userConnection.client.send(formatWebsocketMessage(type, payload)); + if (userConnection.subscriptions[type] > 0) { + userConnection.client.send(formatWebsocketMessage(type, payload)); + } else { + logger.trace(`No subscriber on ${type}`); + } }); }); } @@ -58,6 +66,7 @@ function userConnected(user, client) { this.connections[user.id].push({ user, client, + subscriptions: {}, }); } @@ -85,6 +94,50 @@ function userDisconnected(user, client) { return null; } +/** + * @description Add subscriber to client. + * @param {string} event - Event to subscribe to. + * @param {Object} client - Websocket client. + * @example + * addSubscriber('device.new', ws); + */ +function addSubscriber(event, client) { + logger.trace(`Websocket subscribed to ${event}`); + + Object.values(this.connections).forEach((connection) => { + const connectionIndex = connection.findIndex((elem) => elem.client === client); + + if (connectionIndex !== -1) { + const nbSubscribed = connection[connectionIndex].subscriptions[event] || 0; + connection[connectionIndex].subscriptions[event] = nbSubscribed + 1; + } + }); + + return null; +} + +/** + * @description Remove subscriber from client. + * @param {string} event - Event to subscribe to. + * @param {Object} client - Websocket client. + * @example + * removeSubscriber('device.new', ws); + */ +function removeSubscriber(event, client) { + logger.trace(`Websocket unsubscribed from ${event}`); + + Object.values(this.connections).forEach((connection) => { + const connectionIndex = connection.findIndex((elem) => elem.client === client); + + if (connectionIndex !== -1) { + const nbSubscribed = connection[connectionIndex].subscriptions[event] || 0; + connection[connectionIndex].subscriptions[event] = Math.max(nbSubscribed - 1, 0); + } + }); + + return null; +} + /** * @description Init websocket server. * @example @@ -102,6 +155,12 @@ function init() { ws.on('message', async (data) => { const parsedMessage = parseWebsocketMessage(data); switch (parsedMessage.type) { + case WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.SUBSCRIBE: + this.addSubscriber(parsedMessage.payload.event, ws); + break; + case WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.UNSUBSCRIBE: + this.removeSubscriber(parsedMessage.payload.event, ws); + break; case WEBSOCKET_MESSAGE_TYPES.AUTHENTICATION.REQUEST: try { // we validate the token @@ -134,5 +193,7 @@ WebsocketManager.prototype.userConnected = userConnected; WebsocketManager.prototype.userDisconnected = userDisconnected; WebsocketManager.prototype.sendMessageUser = sendMessageUser; WebsocketManager.prototype.sendMessageAllUsers = sendMessageAllUsers; +WebsocketManager.prototype.addSubscriber = addSubscriber; +WebsocketManager.prototype.removeSubscriber = removeSubscriber; module.exports = WebsocketManager; diff --git a/server/test/websockets/WebSocketServerMock.test.js b/server/test/websockets/WebSocketServerMock.test.js new file mode 100644 index 0000000000..c7d09302fa --- /dev/null +++ b/server/test/websockets/WebSocketServerMock.test.js @@ -0,0 +1,15 @@ +const { fake } = require('sinon'); +const EventEmitter = require('events'); + +const WebSocketServerMock = function WebSocketServerMock() { + this.ws = new EventEmitter(); + this.ws.close = fake.returns(null); + this.ws.send = fake.returns(null); + this.ws.terminate = fake.returns(null); +}; + +WebSocketServerMock.prototype.on = function on(type, cb) { + cb(this.ws); +}; + +module.exports = WebSocketServerMock; diff --git a/server/test/websockets/websockets.test.js b/server/test/websockets/websockets.test.js new file mode 100644 index 0000000000..cd86306c88 --- /dev/null +++ b/server/test/websockets/websockets.test.js @@ -0,0 +1,260 @@ +const sinon = require('sinon'); +const { expect } = require('chai'); + +const { fake, assert } = sinon; +const WebSocketServerMock = require('./WebSocketServerMock.test'); +const WebsocketManager = require('../../api/websockets'); +const { WEBSOCKET_MESSAGE_TYPES } = require('../../utils/constants'); + +const wss = new WebSocketServerMock(); +const userId = 'USER_ID'; +const user = { id: userId, firstname: 'Tony' }; + +const gladys = { + event: { + on: fake.returns(null), + }, + session: { + validateAccessToken: fake.returns(user), + }, + user: { + getById: fake.returns({ id: userId }), + }, +}; + +describe('websocket lib', () => { + let websocketManager; + + beforeEach(() => { + sinon.reset(); + websocketManager = new WebsocketManager(wss, gladys); + websocketManager.init(); + }); + + afterEach(() => { + wss.ws.removeAllListeners(); + }); + + it('message type not handled', () => { + const message = {}; + wss.ws.emit('message', JSON.stringify(message)); + + assert.notCalled(wss.ws.close); + assert.notCalled(gladys.session.validateAccessToken); + assert.notCalled(gladys.user.getById); + }); + + it('handle empty authentication request', () => { + const message = { + type: WEBSOCKET_MESSAGE_TYPES.AUTHENTICATION.REQUEST, + }; + wss.ws.emit('message', JSON.stringify(message)); + + assert.called(wss.ws.close); + assert.notCalled(gladys.session.validateAccessToken); + assert.notCalled(gladys.user.getById); + }); + + it('handle authentication request', () => { + const message = { + type: WEBSOCKET_MESSAGE_TYPES.AUTHENTICATION.REQUEST, + payload: { + accessToken: 'accessToken', + }, + }; + wss.ws.emit('message', JSON.stringify(message)); + + assert.notCalled(wss.ws.close); + assert.called(gladys.session.validateAccessToken); + assert.called(gladys.user.getById); + }); + + it('userConnected: add new connection', () => { + const client = 'CLIENT'; + + // First add + websocketManager.userConnected(user, client); + + expect(websocketManager.connections).to.deep.eq({ + [userId]: [ + { + user, + client, + subscriptions: {}, + }, + ], + }); + + // Second add (same) + websocketManager.userConnected(user, client); + + expect(websocketManager.connections).to.deep.eq({ + [userId]: [ + { + user, + client, + subscriptions: {}, + }, + ], + }); + + // Third add (a new one) + const client2 = 'CLIENT2'; + websocketManager.userConnected(user, client2); + + expect(websocketManager.connections).to.deep.eq({ + [userId]: [ + { + user, + client, + subscriptions: {}, + }, + { + user, + client: client2, + subscriptions: {}, + }, + ], + }); + }); + + it('userDisconnected: remove connection', () => { + const client = 'CLIENT'; + websocketManager.userConnected(user, client); + const client2 = 'CLIENT2'; + websocketManager.userConnected(user, client2); + + websocketManager.userDisconnected(user, client2); + + expect(websocketManager.connections).to.deep.eq({ + [userId]: [ + { + user, + client, + subscriptions: {}, + }, + ], + }); + }); + + it('userDisconnected: remove not existing connection', () => { + const client2 = 'CLIENT2'; + websocketManager.userDisconnected(user, client2); + + expect(websocketManager.connections).to.deep.eq({ + [userId]: [], + }); + }); + + it('addSubscriber: add subscription', () => { + const client = 'CLIENT'; + websocketManager.userConnected(user, client); + + const event = 'event'; + websocketManager.addSubscriber(event, client); + websocketManager.addSubscriber(event, client); + + expect(websocketManager.connections).to.deep.eq({ + [userId]: [ + { + user, + client, + subscriptions: { event: 2 }, + }, + ], + }); + }); + + it('addSubscriber: add subscription to missing client', () => { + const client = 'CLIENT'; + const event = 'event'; + websocketManager.addSubscriber(event, client); + + expect(websocketManager.connections).to.deep.eq({}); + }); + + it('removeSubscriber: remove subscription', () => { + const client = 'CLIENT'; + websocketManager.userConnected(user, client); + + const event = 'event'; + websocketManager.addSubscriber(event, client); + websocketManager.removeSubscriber(event, client); + + expect(websocketManager.connections).to.deep.eq({ + [userId]: [ + { + user, + client, + subscriptions: { event: 0 }, + }, + ], + }); + }); + + it('removeSubscriber: remove subscription to missing client', () => { + const client = 'CLIENT'; + const event = 'event'; + websocketManager.removeSubscriber(event, client); + + expect(websocketManager.connections).to.deep.eq({}); + }); + + it('sendMessageUser: to unknown user', () => { + const message = { type: 'type', payload: 'payload', userId }; + websocketManager.sendMessageUser(message); + + assert.notCalled(wss.ws.send); + }); + + it('sendMessageUser: to known user, but no subscription', () => { + const client = 'CLIENT'; + websocketManager.userConnected(user, client); + + const message = { type: 'type', payload: 'payload', userId }; + websocketManager.sendMessageUser(message); + + assert.notCalled(wss.ws.send); + }); + + it('sendMessageUser: success', () => { + const type = 'event'; + websocketManager.userConnected(user, wss.ws); + + const message = { type, payload: 'payload', userId }; + websocketManager.addSubscriber(type, wss.ws); + + websocketManager.sendMessageUser(message); + + assert.called(wss.ws.send); + }); + + it('sendMessageAllUsers: to unknown user', () => { + const message = { type: 'type', payload: 'payload' }; + websocketManager.sendMessageUser(message); + + assert.notCalled(wss.ws.send); + }); + + it('sendMessageAllUsers: to known user, but no subscription', () => { + const client = 'CLIENT'; + websocketManager.userConnected(user, client); + + const message = { type: 'type', payload: 'payload' }; + websocketManager.sendMessageAllUsers(message); + + assert.notCalled(wss.ws.send); + }); + + it('sendMessageAllUsers: success', () => { + const type = 'event'; + websocketManager.userConnected(user, wss.ws); + + const message = { type, payload: 'payload' }; + websocketManager.addSubscriber(type, wss.ws); + + websocketManager.sendMessageAllUsers(message); + + assert.called(wss.ws.send); + }); +}); diff --git a/server/utils/constants.js b/server/utils/constants.js index 98bc4b941a..1f563ebde3 100644 --- a/server/utils/constants.js +++ b/server/utils/constants.js @@ -338,6 +338,10 @@ const WEBSOCKET_MESSAGE_TYPES = { AUTHENTICATION: { REQUEST: 'authenticate.request', }, + SUBSCRIPTION: { + SUBSCRIBE: 'subscription.subscribe', + UNSUBSCRIBE: 'subscription.unsubscribe', + }, GATEWAY: { BACKUP_UPLOAD_PROGRESS: 'gateway.backup-upload-progress', BACKUP_DOWNLOAD_PROGRESS: 'gateway.backup-download-progress',