Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mds-web-socket] extensible entities #310

Merged
merged 18 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions packages/mds-repository/@types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,7 @@
*/

import { InsertResult, UpdateResult } from 'typeorm'
import { Nullable, AnyFunction } from '@mds-core/mds-types'

// eslint-reason recursive declarations require interfaces
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface JsonArray extends Array<Json> {}

export interface JsonObject {
[property: string]: Json
}

export type JsonValue = string | number | boolean | JsonArray | JsonObject

export type Json = Nullable<JsonValue>
import { AnyFunction } from '@mds-core/mds-types'

export interface InsertReturning<T> extends InsertResult {
raw: T[]
Expand Down
11 changes: 11 additions & 0 deletions packages/mds-types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,17 @@ export interface Stop {
reservation_cost?: Partial<{ [S in VEHICLE_TYPE]: number }> // Cost to reserve a spot per vehicle_type
}

// eslint-reason recursive declarations require interfaces
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface JsonArray extends Array<Json> {}

export interface JsonObject {
[property: string]: Json
}

export type JsonValue = string | number | boolean | JsonArray | JsonObject

export type Json = Nullable<JsonValue>
// eslint-reason Function and constructor inference must use a single rest parameter of type 'any[]'
/* eslint-disable @typescript-eslint/no-explicit-any */
export type AnyFunction<A = any> = (...args: any[]) => A
Expand Down
4 changes: 2 additions & 2 deletions packages/mds-web-sockets/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ async function sendPush(entity: ENTITY_TYPE, data: VehicleEvent | Telemetry) {
}

export function writeTelemetry(telemetries: Telemetry[]) {
return telemetries.map(telemetry => sendPush('TELEMETRIES', telemetry))
return telemetries.map(telemetry => sendPush('telemetry', telemetry))
}

export function writeEvent(event: VehicleEvent) {
return sendPush('EVENTS', event)
return sendPush('event', event)
}

export function shutdown() {
Expand Down
7 changes: 5 additions & 2 deletions packages/mds-web-sockets/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ export class Clients {
return key.publicKey || key.rsaPublicKey
}

public constructor() {
this.subList = { EVENTS: [], TELEMETRIES: [] }
public constructor(supportedEntities: readonly string[]) {
// Initialize subscription list with configured entities
this.subList = Object.fromEntries(supportedEntities.map(e => [e, []]))
this.authenticatedClients = []
this.saveClient = this.saveClient.bind(this)
}
Expand All @@ -49,7 +50,9 @@ export class Clients {
trimmedEntities.map(entity => {
try {
this.subList[entity].push(client)
client.send(`SUB%${JSON.stringify({ status: 'Success' })}`)
twelch marked this conversation as resolved.
Show resolved Hide resolved
} catch {
client.send(`SUB%${JSON.stringify({ status: 'Failure' })}`)
return logger.error(`failed to push ${entity}`)
}
})
Expand Down
26 changes: 26 additions & 0 deletions packages/mds-web-sockets/tests/ws.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,31 @@ describe('Tests MDS-Web-Sockets', () => {
return done(data)
})
})

it('Subscribe and send event', done => {
const client = new WebSocket(`ws://localhost:${process.env.PORT || 4000}`)
client.onopen = () => {
client.send(`AUTH%${ADMIN_AUTH}`)
}

client.on('message', data => {
if (data === 'AUTH%{"status":"Success"}') {
client.send('SUB%event')
return
}

if (data === 'SUB%{"status":"Success"}') {
client.send(`PUSH%event%${JSON.stringify({ foo: 'bar' })}`)
return
}

if (data === 'event%{"foo":"bar"}') {
client.close()
return done()
}

return done
})
})
})
})
2 changes: 1 addition & 1 deletion packages/mds-web-sockets/types.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export const ENTITY_TYPES = ['EVENTS', 'TELEMETRIES'] as const
export const ENTITY_TYPES = ['event', 'telemetry'] as const
export type ENTITY_TYPE = typeof ENTITY_TYPES[number]
63 changes: 25 additions & 38 deletions packages/mds-web-sockets/ws-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ import logger from '@mds-core/mds-logger'
import { seconds, getEnvVar } from '@mds-core/mds-utils'
import WebSocket from 'ws'
import { setWsHeartbeat } from 'ws-heartbeat/server'
import { Telemetry, VehicleEvent, Nullable } from '@mds-core/mds-types'
import { Nullable } from '@mds-core/mds-types'
import { ApiServer, HttpServer } from '@mds-core/mds-api-server'
import stream from '@mds-core/mds-stream'
import { NatsError, Msg } from 'ts-nats'
import { ENTITY_TYPES } from './types'
import { Clients } from './clients'
import { ENTITY_TYPE } from './types'

export const WebSocketServer = async () => {
/**
* Web Socket Server that autosubscribes to Nats stream and allows socket subscription by entity type
* @param entityTypes - entity names to support
*/
export const WebSocketServer = async <T extends readonly string[]>(entityTypes?: T) => {
const supportedEntities = entityTypes || ENTITY_TYPES
const server = HttpServer(ApiServer(app => app))

logger.info('Creating WS server')
Expand All @@ -26,9 +31,13 @@ export const WebSocketServer = async () => {
seconds(60)
)

const clients = new Clients()
const clients = new Clients(supportedEntities)

function pushToClients(entity: ENTITY_TYPE, message: string) {
function isSupported(entity: string) {
return supportedEntities.some(e => e === entity)
}

function pushToClients(entity: string, message: string) {
const staleClients: WebSocket[] = []
if (clients.subList[entity]) {
clients.subList[entity].forEach(client => {
Expand All @@ -47,15 +56,6 @@ export const WebSocketServer = async () => {
staleClients.forEach(client => client.close())
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
function writeTelemetry(telemetry: Telemetry) {
pushToClients('TELEMETRIES', JSON.stringify(telemetry))
}

function writeEvent(event: VehicleEvent) {
pushToClients('EVENTS', JSON.stringify(event))
}

wss.on('connection', (ws: WebSocket) => {
ws.on('message', async (data: WebSocket.Data) => {
const message = data.toString().trim().split('%')
Expand All @@ -66,19 +66,12 @@ export const WebSocketServer = async () => {
if (clients.isAuthenticated(ws)) {
if (args.length === 2) {
const [entity, payload] = args
switch (entity) {
case 'EVENTS': {
const event = JSON.parse(payload)
return writeEvent(event)
}
case 'TELEMETRIES': {
const telemetry = JSON.parse(payload)
return writeTelemetry(telemetry)
}
default: {
return ws.send(`Invalid entity: ${entity}`)
}
// Limit messages to only supported entities
if (isSupported(entity)) {
await pushToClients(entity, payload)
return
}
return ws.send(`Invalid entity: ${entity}`)
}
}
}
Expand Down Expand Up @@ -106,18 +99,12 @@ export const WebSocketServer = async () => {
TENANT_ID: 'mds'
})

const eventProcessor = async (err: Nullable<NatsError>, msg: Msg) => {
if (err) logger.error(err)
const data = JSON.parse(msg.data)
await writeEvent(data)
}

const telemetryProcessor = async (err: Nullable<NatsError>, msg: Msg) => {
if (err) logger.error(err)
const data = JSON.parse(msg.data)
await writeTelemetry(data)
const processor = async (err: Nullable<NatsError>, msg: Msg) => {
const entity = msg.subject.split('.')?.[1]
await pushToClients(entity, JSON.stringify(msg.data))
}

await stream.NatsStreamConsumer(`${TENANT_ID}.event`, eventProcessor).initialize()
await stream.NatsStreamConsumer(`${TENANT_ID}.telemetry`, telemetryProcessor).initialize()
supportedEntities.forEach(async e => {
await stream.NatsStreamConsumer(`${TENANT_ID}.event`, processor).initialize()
})
twelch marked this conversation as resolved.
Show resolved Hide resolved
}