Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@Atrovato Fix #728 limit websocket messages #755

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions front/src/components/boxs/user-presence/UserPresence.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,18 @@ const UserPresence = ({ children, ...props }) => (
class UserPresenceComponent extends Component {
componentDidMount() {
this.props.getUsersWithPresence();
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.BACK_HOME, payload =>
this.props.userChanged(payload)
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.BACK_HOME, this.props.userChanged);
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.LEFT_HOME, this.props.userChanged);
}

componentWillUnmount() {
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.BACK_HOME,
this.props.userChanged
);
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.LEFT_HOME, payload =>
this.props.userChanged(payload)
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.USER_PRESENCE.LEFT_HOME,
this.props.userChanged
);
}

Expand Down
5 changes: 3 additions & 2 deletions front/src/routes/chat/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ import { Component } from 'preact';
import { connect } from 'unistore/preact';
import ChatPage from './ChatPage';
import actions from '../../actions/message';
import { WEBSOCKET_MESSAGE_TYPES } from '../../../../server/utils/constants';

@connect('session', actions)
class Chat extends Component {
newChatMessage = payload => this.props.pushMessage(payload);

componentDidMount() {
this.props.getMessages();
this.props.session.dispatcher.addListener('message.new', this.newChatMessage);
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MESSAGE.NEW, this.newChatMessage);
}

componentWillUnmount() {
this.props.session.dispatcher.removeListener('message.new', this.newChatMessage);
this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.MESSAGE.NEW, this.newChatMessage);
}

render({}, {}) {
Expand Down
15 changes: 11 additions & 4 deletions front/src/routes/integration/all/mqtt/setup-page/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ class MqttNodePage extends Component {
componentWillMount() {
this.props.getIntegrationByName('mqtt');
this.props.loadProps();
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MQTT.CONNECTED, () =>
this.props.displayConnectedMessage()
this.props.session.dispatcher.addListener(
WEBSOCKET_MESSAGE_TYPES.MQTT.CONNECTED,
this.props.displayConnectedMessage
);
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MQTT.ERROR, payload =>
this.props.displayMqttError(payload)
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.MQTT.ERROR, this.props.displayMqttError);
}

componentWillUnmount() {
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.MQTT.CONNECTED,
this.props.displayConnectedMessage
);
this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.MQTT.ERROR, this.props.displayMqttError);
}

render(props, {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ class TasmotaIntegration extends Component {
);
}

componentWillUnmount() {
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.TASMOTA.NEW_DEVICE,
this.props.addDiscoveredDevice
);
}

render(props) {
return (
<TasmotaPage user={props.user}>
Expand Down
11 changes: 8 additions & 3 deletions front/src/routes/integration/all/xiaomi/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ class XiaomiPage extends Component {
this.props.getHouses();
this.props.getXiaomiSensors();
this.props.getXiaomiDevices();
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.XIAOMI.NEW_DEVICE, payload => {
this.props.getXiaomiSensors();
});
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.XIAOMI.NEW_DEVICE, this.props.getXiaomiSensors);
}

componentWillUnmount() {
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.XIAOMI.NEW_DEVICE,
this.props.getXiaomiSensors
);
}

render(props, {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ZwaveNodeOperationPage extends Component {
componentWillUnmount() {
this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_ADDED, this.nodeAddedListener);
this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_READY, this.nodeReadyListener);
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_REMOVED, this.nodeRemovedListener);
this.props.session.dispatcher.removeListener(WEBSOCKET_MESSAGE_TYPES.ZWAVE.NODE_REMOVED, this.nodeRemovedListener);
}

render(props, { remainingTimeInSeconds, nodeAdded }) {
Expand Down
13 changes: 7 additions & 6 deletions front/src/routes/scene/edit-scene/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,13 @@ class EditScene extends Component {

componentDidMount() {
this.getSceneBySelector();
this.props.session.dispatcher.addListener('scene.executing-action', payload =>
this.highlighCurrentlyExecutedAction(payload)
);
this.props.session.dispatcher.addListener('scene.finished-executing-action', payload =>
this.removeHighlighAction(payload)
);
this.props.session.dispatcher.addListener('scene.executing-action', this.highlighCurrentlyExecutedAction);
this.props.session.dispatcher.addListener('scene.finished-executing-action', this.removeHighlighAction);
}

componentWillUnmount() {
this.props.session.dispatcher.removeListener('scene.executing-action', this.highlighCurrentlyExecutedAction);
this.props.session.dispatcher.removeListener('scene.finished-executing-action', this.removeHighlighAction);
}

render(props, { saving, error, variables, scene }) {
Expand Down
30 changes: 24 additions & 6 deletions front/src/routes/settings/settings-system/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,32 @@ class SettingsSystem extends Component {
this.props.getDiskSpace();
this.props.getContainers();
this.props.getUpgradeDownloadStatus();
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_PROGRESS, payload =>
this.props.newDownloadProgress(payload)
this.props.session.dispatcher.addListener(
WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_PROGRESS,
this.props.newDownloadProgress
);
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FINISHED, payload =>
this.props.downloadFinished()
this.props.session.dispatcher.addListener(
WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FINISHED,
this.props.downloadFinished
);
this.props.session.dispatcher.addListener(WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FAILED, payload =>
this.props.downloadFailed()
this.props.session.dispatcher.addListener(
WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FAILED,
this.props.downloadFailed
);
}

componentWillUnmount() {
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_PROGRESS,
this.props.newDownloadProgress
);
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FINISHED,
this.props.downloadFinished
);
this.props.session.dispatcher.removeListener(
WEBSOCKET_MESSAGE_TYPES.UPGRADE.DOWNLOAD_FAILED,
this.props.downloadFailed
);
}

Expand Down
27 changes: 27 additions & 0 deletions front/src/utils/Dispatcher.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { WEBSOCKET_MESSAGE_TYPES } from '../../../server/utils/constants';

export class Dispatcher {
constructor() {
this.events = {};
this.ws = null;
}

addListener(event, callback) {
Expand All @@ -24,6 +27,18 @@ export class Dispatcher {
}

this.events[event].listeners.push(callback);

if (this.events[event].listeners.length === 1 && this.ws) {
// Subscribe to websocket topic
this.ws.send(
JSON.stringify({
type: WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.SUBSCRIBE,
payload: {
event
}
})
);
}
}

removeListener(event, callback) {
Expand All @@ -36,6 +51,18 @@ export class Dispatcher {
this.events[event].listeners = this.events[event].listeners.filter(
listener => listener.toString() !== callback.toString()
);

if (this.events[event].listeners.length === 0 && this.ws) {
// Unsubscribe from websocket topic
this.ws.send(
JSON.stringify({
type: WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.UNSUBSCRIBE,
payload: {
event
}
})
);
}
}

dispatch(event, details) {
Expand Down
1 change: 1 addition & 0 deletions front/src/utils/Session.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Session {
const { type, payload } = JSON.parse(data);
this.dispatcher.dispatch(type, payload);
};
this.dispatcher.ws = this.ws;
};
this.ws.onerror = e => {
console.log('Error', e);
Expand Down
65 changes: 63 additions & 2 deletions server/api/websockets/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ function sendMessageUser({ type, payload, userId }) {
return;
}
this.connections[userId].forEach((userConnection) => {
userConnection.client.send(formatWebsocketMessage(type, payload));
if (userConnection.subscriptions[type] > 0) {
userConnection.client.send(formatWebsocketMessage(type, payload));
} else {
logger.trace(`No subscriber on ${type}`);
}
});
}

Expand All @@ -35,7 +39,11 @@ function sendMessageAllUsers({ type, payload }) {
const usersIds = Object.keys(this.connections);
usersIds.forEach((userId) => {
this.connections[userId].forEach((userConnection) => {
userConnection.client.send(formatWebsocketMessage(type, payload));
if (userConnection.subscriptions[type] > 0) {
userConnection.client.send(formatWebsocketMessage(type, payload));
} else {
logger.trace(`No subscriber on ${type}`);
}
});
});
}
Expand All @@ -58,6 +66,7 @@ function userConnected(user, client) {
this.connections[user.id].push({
user,
client,
subscriptions: {},
});
}

Expand Down Expand Up @@ -85,6 +94,50 @@ function userDisconnected(user, client) {
return null;
}

/**
* @description Add subscriber to client.
* @param {string} event - Event to subscribe to.
* @param {Object} client - Websocket client.
* @example
* addSubscriber('device.new', ws);
*/
function addSubscriber(event, client) {
logger.trace(`Websocket subscribed to ${event}`);

Object.values(this.connections).forEach((connection) => {
const connectionIndex = connection.findIndex((elem) => elem.client === client);

if (connectionIndex !== -1) {
const nbSubscribed = connection[connectionIndex].subscriptions[event] || 0;
connection[connectionIndex].subscriptions[event] = nbSubscribed + 1;
}
});

return null;
}

/**
* @description Remove subscriber from client.
* @param {string} event - Event to subscribe to.
* @param {Object} client - Websocket client.
* @example
* removeSubscriber('device.new', ws);
*/
function removeSubscriber(event, client) {
logger.trace(`Websocket unsubscribed from ${event}`);

Object.values(this.connections).forEach((connection) => {
const connectionIndex = connection.findIndex((elem) => elem.client === client);

if (connectionIndex !== -1) {
const nbSubscribed = connection[connectionIndex].subscriptions[event] || 0;
connection[connectionIndex].subscriptions[event] = Math.max(nbSubscribed - 1, 0);
}
});

return null;
}

/**
* @description Init websocket server.
* @example
Expand All @@ -102,6 +155,12 @@ function init() {
ws.on('message', async (data) => {
const parsedMessage = parseWebsocketMessage(data);
switch (parsedMessage.type) {
case WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.SUBSCRIBE:
this.addSubscriber(parsedMessage.payload.event, ws);
break;
case WEBSOCKET_MESSAGE_TYPES.SUBSCRIPTION.UNSUBSCRIBE:
this.removeSubscriber(parsedMessage.payload.event, ws);
break;
case WEBSOCKET_MESSAGE_TYPES.AUTHENTICATION.REQUEST:
try {
// we validate the token
Expand Down Expand Up @@ -134,5 +193,7 @@ WebsocketManager.prototype.userConnected = userConnected;
WebsocketManager.prototype.userDisconnected = userDisconnected;
WebsocketManager.prototype.sendMessageUser = sendMessageUser;
WebsocketManager.prototype.sendMessageAllUsers = sendMessageAllUsers;
WebsocketManager.prototype.addSubscriber = addSubscriber;
WebsocketManager.prototype.removeSubscriber = removeSubscriber;

module.exports = WebsocketManager;
15 changes: 15 additions & 0 deletions server/test/websockets/WebSocketServerMock.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const { fake } = require('sinon');
const EventEmitter = require('events');

const WebSocketServerMock = function WebSocketServerMock() {
this.ws = new EventEmitter();
this.ws.close = fake.returns(null);
this.ws.send = fake.returns(null);
this.ws.terminate = fake.returns(null);
};

WebSocketServerMock.prototype.on = function on(type, cb) {
cb(this.ws);
};

module.exports = WebSocketServerMock;
Loading