diff --git a/packages/mds-repository/@types/index.ts b/packages/mds-repository/@types/index.ts index 11ceb2fd2..abb1ccd30 100644 --- a/packages/mds-repository/@types/index.ts +++ b/packages/mds-repository/@types/index.ts @@ -15,19 +15,7 @@ */ import { InsertResult, UpdateResult } from 'typeorm' -import { Nullable, AnyFunction } from '@mds-core/mds-types' - -// eslint-reason recursive declarations require interfaces -// eslint-disable-next-line @typescript-eslint/no-empty-interface -export interface JsonArray extends Array {} - -export interface JsonObject { - [property: string]: Json -} - -export type JsonValue = string | number | boolean | JsonArray | JsonObject - -export type Json = Nullable +import { AnyFunction } from '@mds-core/mds-types' export interface InsertReturning extends InsertResult { raw: T[] diff --git a/packages/mds-types/index.ts b/packages/mds-types/index.ts index 12b819f36..9216d33c3 100644 --- a/packages/mds-types/index.ts +++ b/packages/mds-types/index.ts @@ -445,6 +445,17 @@ export interface Stop { reservation_cost?: Partial<{ [S in VEHICLE_TYPE]: number }> // Cost to reserve a spot per vehicle_type } +// eslint-reason recursive declarations require interfaces +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface JsonArray extends Array {} + +export interface JsonObject { + [property: string]: Json +} + +export type JsonValue = string | number | boolean | JsonArray | JsonObject + +export type Json = Nullable // eslint-reason Function and constructor inference must use a single rest parameter of type 'any[]' /* eslint-disable @typescript-eslint/no-explicit-any */ export type AnyFunction = (...args: any[]) => A diff --git a/packages/mds-web-sockets/client.ts b/packages/mds-web-sockets/client.ts index 2a5cddb32..addfaac6c 100644 --- a/packages/mds-web-sockets/client.ts +++ b/packages/mds-web-sockets/client.ts @@ -54,11 +54,11 @@ async function sendPush(entity: ENTITY_TYPE, data: VehicleEvent | Telemetry) { } export function writeTelemetry(telemetries: Telemetry[]) { - return telemetries.map(telemetry => sendPush('TELEMETRIES', telemetry)) + return telemetries.map(telemetry => sendPush('telemetry', telemetry)) } export function writeEvent(event: VehicleEvent) { - return sendPush('EVENTS', event) + return sendPush('event', event) } export function shutdown() { diff --git a/packages/mds-web-sockets/clients.ts b/packages/mds-web-sockets/clients.ts index 05fe6fc3c..e4381267e 100644 --- a/packages/mds-web-sockets/clients.ts +++ b/packages/mds-web-sockets/clients.ts @@ -28,8 +28,9 @@ export class Clients { return key.publicKey || key.rsaPublicKey } - public constructor() { - this.subList = { EVENTS: [], TELEMETRIES: [] } + public constructor(supportedEntities: readonly string[]) { + // Initialize subscription list with configured entities + this.subList = Object.fromEntries(supportedEntities.map(e => [e, []])) this.authenticatedClients = [] this.saveClient = this.saveClient.bind(this) } @@ -49,7 +50,9 @@ export class Clients { trimmedEntities.map(entity => { try { this.subList[entity].push(client) + client.send(`SUB%${JSON.stringify({ status: 'Success' })}`) } catch { + client.send(`SUB%${JSON.stringify({ status: 'Failure' })}`) return logger.error(`failed to push ${entity}`) } }) diff --git a/packages/mds-web-sockets/tests/ws.spec.ts b/packages/mds-web-sockets/tests/ws.spec.ts index 991f20e28..6c4bc4088 100644 --- a/packages/mds-web-sockets/tests/ws.spec.ts +++ b/packages/mds-web-sockets/tests/ws.spec.ts @@ -77,5 +77,31 @@ describe('Tests MDS-Web-Sockets', () => { return done(data) }) }) + + it('Subscribe and send event', done => { + const client = new WebSocket(`ws://localhost:${process.env.PORT || 4000}`) + client.onopen = () => { + client.send(`AUTH%${ADMIN_AUTH}`) + } + + client.on('message', data => { + if (data === 'AUTH%{"status":"Success"}') { + client.send('SUB%event') + return + } + + if (data === 'SUB%{"status":"Success"}') { + client.send(`PUSH%event%${JSON.stringify({ foo: 'bar' })}`) + return + } + + if (data === 'event%{"foo":"bar"}') { + client.close() + return done() + } + + return done + }) + }) }) }) diff --git a/packages/mds-web-sockets/types.ts b/packages/mds-web-sockets/types.ts index 312ed6556..2110f93b3 100644 --- a/packages/mds-web-sockets/types.ts +++ b/packages/mds-web-sockets/types.ts @@ -1,2 +1,2 @@ -export const ENTITY_TYPES = ['EVENTS', 'TELEMETRIES'] as const +export const ENTITY_TYPES = ['event', 'telemetry'] as const export type ENTITY_TYPE = typeof ENTITY_TYPES[number] diff --git a/packages/mds-web-sockets/ws-server.ts b/packages/mds-web-sockets/ws-server.ts index 43861d821..0cb8da80c 100644 --- a/packages/mds-web-sockets/ws-server.ts +++ b/packages/mds-web-sockets/ws-server.ts @@ -2,14 +2,19 @@ import logger from '@mds-core/mds-logger' import { seconds, getEnvVar } from '@mds-core/mds-utils' import WebSocket from 'ws' import { setWsHeartbeat } from 'ws-heartbeat/server' -import { Telemetry, VehicleEvent, Nullable } from '@mds-core/mds-types' +import { Nullable } from '@mds-core/mds-types' import { ApiServer, HttpServer } from '@mds-core/mds-api-server' import stream from '@mds-core/mds-stream' import { NatsError, Msg } from 'ts-nats' +import { ENTITY_TYPES } from './types' import { Clients } from './clients' -import { ENTITY_TYPE } from './types' -export const WebSocketServer = async () => { +/** + * Web Socket Server that autosubscribes to Nats stream and allows socket subscription by entity type + * @param entityTypes - entity names to support + */ +export const WebSocketServer = async (entityTypes?: T) => { + const supportedEntities = entityTypes || ENTITY_TYPES const server = HttpServer(ApiServer(app => app)) logger.info('Creating WS server') @@ -26,9 +31,13 @@ export const WebSocketServer = async () => { seconds(60) ) - const clients = new Clients() + const clients = new Clients(supportedEntities) - function pushToClients(entity: ENTITY_TYPE, message: string) { + function isSupported(entity: string) { + return supportedEntities.some(e => e === entity) + } + + function pushToClients(entity: string, message: string) { const staleClients: WebSocket[] = [] if (clients.subList[entity]) { clients.subList[entity].forEach(client => { @@ -47,15 +56,6 @@ export const WebSocketServer = async () => { staleClients.forEach(client => client.close()) } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - function writeTelemetry(telemetry: Telemetry) { - pushToClients('TELEMETRIES', JSON.stringify(telemetry)) - } - - function writeEvent(event: VehicleEvent) { - pushToClients('EVENTS', JSON.stringify(event)) - } - wss.on('connection', (ws: WebSocket) => { ws.on('message', async (data: WebSocket.Data) => { const message = data.toString().trim().split('%') @@ -66,19 +66,12 @@ export const WebSocketServer = async () => { if (clients.isAuthenticated(ws)) { if (args.length === 2) { const [entity, payload] = args - switch (entity) { - case 'EVENTS': { - const event = JSON.parse(payload) - return writeEvent(event) - } - case 'TELEMETRIES': { - const telemetry = JSON.parse(payload) - return writeTelemetry(telemetry) - } - default: { - return ws.send(`Invalid entity: ${entity}`) - } + // Limit messages to only supported entities + if (isSupported(entity)) { + await pushToClients(entity, payload) + return } + return ws.send(`Invalid entity: ${entity}`) } } } @@ -106,18 +99,12 @@ export const WebSocketServer = async () => { TENANT_ID: 'mds' }) - const eventProcessor = async (err: Nullable, msg: Msg) => { - if (err) logger.error(err) - const data = JSON.parse(msg.data) - await writeEvent(data) - } - - const telemetryProcessor = async (err: Nullable, msg: Msg) => { - if (err) logger.error(err) - const data = JSON.parse(msg.data) - await writeTelemetry(data) + const processor = async (err: Nullable, msg: Msg) => { + const entity = msg.subject.split('.')?.[1] + await pushToClients(entity, JSON.stringify(msg.data)) } - await stream.NatsStreamConsumer(`${TENANT_ID}.event`, eventProcessor).initialize() - await stream.NatsStreamConsumer(`${TENANT_ID}.telemetry`, telemetryProcessor).initialize() + supportedEntities.forEach(async e => { + await stream.NatsStreamConsumer(`${TENANT_ID}.event`, processor).initialize() + }) }