Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mds-web-socket] extensible entities #310

Merged
merged 18 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions packages/mds-repository/@types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,7 @@
*/

import { InsertResult, UpdateResult } from 'typeorm'
import { Nullable, AnyFunction } from '@mds-core/mds-types'

// eslint-reason recursive declarations require interfaces
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface JsonArray extends Array<Json> {}

export interface JsonObject {
[property: string]: Json
}

export type JsonValue = string | number | boolean | JsonArray | JsonObject

export type Json = Nullable<JsonValue>
import { AnyFunction } from '@mds-core/mds-types'

export interface InsertReturning<T> extends InsertResult {
raw: T[]
Expand Down
11 changes: 11 additions & 0 deletions packages/mds-types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,17 @@ export interface Stop {
reservation_cost?: Partial<{ [S in VEHICLE_TYPE]: number }> // Cost to reserve a spot per vehicle_type
}

// eslint-reason recursive declarations require interfaces
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface JsonArray extends Array<Json> {}

export interface JsonObject {
[property: string]: Json
}

export type JsonValue = string | number | boolean | JsonArray | JsonObject

export type Json = Nullable<JsonValue>
// eslint-reason Function and constructor inference must use a single rest parameter of type 'any[]'
/* eslint-disable @typescript-eslint/no-explicit-any */
export type AnyFunction<A = any> = (...args: any[]) => A
Expand Down
8 changes: 4 additions & 4 deletions packages/mds-web-sockets/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { VehicleEvent, Telemetry } from '@mds-core/mds-types'
import logger from '@mds-core/mds-logger'
import { setWsHeartbeat, WebSocketBase } from 'ws-heartbeat/client'
import requestPromise from 'request-promise'
import { ENTITY_TYPE } from './types'
import { EntityType } from './types'

const { TOKEN, URL = 'mds-web-sockets:4000' } = process.env

Expand Down Expand Up @@ -44,7 +44,7 @@ async function getClient() {
}

/* Force test event to be send back to client */
async function sendPush(entity: ENTITY_TYPE, data: VehicleEvent | Telemetry) {
async function sendPush(entity: EntityType, data: VehicleEvent | Telemetry) {
try {
const client = await getClient()
return client.send(`PUSH%${entity}%${JSON.stringify(data)}`)
Expand All @@ -54,11 +54,11 @@ async function sendPush(entity: ENTITY_TYPE, data: VehicleEvent | Telemetry) {
}

export function writeTelemetry(telemetries: Telemetry[]) {
return telemetries.map(telemetry => sendPush('TELEMETRIES', telemetry))
return telemetries.map(telemetry => sendPush('telemetry', telemetry))
}

export function writeEvent(event: VehicleEvent) {
return sendPush('EVENTS', event)
return sendPush('event', event)
}

export function shutdown() {
Expand Down
7 changes: 5 additions & 2 deletions packages/mds-web-sockets/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ export class Clients {
return key.publicKey || key.rsaPublicKey
}

public constructor() {
this.subList = { EVENTS: [], TELEMETRIES: [] }
public constructor(supportedEntities: readonly string[]) {
// Initialize subscription list with configured entities
this.subList = Object.fromEntries(supportedEntities.map(e => [e, []]))
this.authenticatedClients = []
this.saveClient = this.saveClient.bind(this)
}
Expand All @@ -49,7 +50,9 @@ export class Clients {
trimmedEntities.map(entity => {
try {
this.subList[entity].push(client)
client.send(`SUB%${JSON.stringify({ status: 'Success' })}`)
twelch marked this conversation as resolved.
Show resolved Hide resolved
} catch {
client.send(`SUB%${JSON.stringify({ status: 'Failure' })}`)
return logger.error(`failed to push ${entity}`)
}
})
Expand Down
64 changes: 25 additions & 39 deletions packages/mds-web-sockets/server.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import logger from '@mds-core/mds-logger'
import { seconds, getEnvVar } from '@mds-core/mds-utils'
import { Json } from '@mds-core/mds-types'
import WebSocket from 'ws'
import { setWsHeartbeat } from 'ws-heartbeat/server'
import { Telemetry, VehicleEvent } from '@mds-core/mds-types'
import { ApiServer, HttpServer } from '@mds-core/mds-api-server'
import { initializeNatsSubscriber } from '@mds-core/mds-stream/nats/nats'
import { Clients } from './clients'
import { ENTITY_TYPE } from './types'

export const WebSocketServer = () => {
import { EntityTypes } from './types'

/**
* Web Socket Server that pas
* @param entityTypes - entities to pass on to clients
*/
avatarneil marked this conversation as resolved.
Show resolved Hide resolved
export const WebSocketServer = <T extends readonly string[]>(entityTypes?: T) => {
const supportedEntities = entityTypes || EntityTypes
twelch marked this conversation as resolved.
Show resolved Hide resolved
const server = HttpServer(ApiServer(app => app))

logger.info('Creating WS server')
Expand All @@ -25,9 +30,13 @@ export const WebSocketServer = () => {
seconds(60)
)

const clients = new Clients()
const clients = new Clients(supportedEntities)

function isSupported(entity: string) {
return supportedEntities.findIndex(e => e === entity) > -1
}

function pushToClients(entity: ENTITY_TYPE, message: string) {
function pushToClients(entity: string, message: string) {
const staleClients: WebSocket[] = []
if (clients.subList[entity]) {
clients.subList[entity].forEach(client => {
Expand All @@ -46,15 +55,6 @@ export const WebSocketServer = () => {
staleClients.forEach(client => client.close())
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
function writeTelemetry(telemetry: Telemetry) {
pushToClients('TELEMETRIES', JSON.stringify(telemetry))
}

function writeEvent(event: VehicleEvent) {
pushToClients('EVENTS', JSON.stringify(event))
}

wss.on('connection', (ws: WebSocket) => {
ws.on('message', async (data: WebSocket.Data) => {
const message = data.toString().trim().split('%')
Expand All @@ -65,19 +65,12 @@ export const WebSocketServer = () => {
if (clients.isAuthenticated(ws)) {
if (args.length === 2) {
const [entity, payload] = args
switch (entity) {
case 'EVENTS': {
const event = JSON.parse(payload)
return writeEvent(event)
}
case 'TELEMETRIES': {
const telemetry = JSON.parse(payload)
return writeTelemetry(telemetry)
}
default: {
return ws.send(`Invalid entity: ${entity}`)
}
// Limit messages to only supported entities
if (isSupported(entity)) {
await pushToClients(entity, payload)
return
}
return ws.send(`Invalid entity: ${entity}`)
}
}
}
Expand Down Expand Up @@ -105,19 +98,12 @@ export const WebSocketServer = () => {
TENANT_ID: 'mds'
})

const processor = async (type: string, data: VehicleEvent | Telemetry) => {
switch (type) {
case 'event': {
await writeEvent(data as VehicleEvent)
return
}
case 'telemetry': {
await writeTelemetry(data as Telemetry)
return
}
default:
logger.error(`Unprocessable entity of type: ${type} and data: ${JSON.stringify(data)}`)
const processor = async (entity: string, data: Json) => {
if (isSupported(entity)) {
await pushToClients(entity, JSON.stringify(data))
return
}
logger.error(`Unprocessable entity of type: ${entity} and data: ${JSON.stringify(data)}`)
}

// eslint-disable-next-line @typescript-eslint/no-floating-promises
Expand Down
26 changes: 26 additions & 0 deletions packages/mds-web-sockets/tests/ws.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,31 @@ describe('Tests MDS-Web-Sockets', () => {
return done(data)
})
})

it('Subscribe and send event', done => {
const client = new WebSocket(`ws://localhost:${process.env.PORT || 4000}`)
client.onopen = () => {
client.send(`AUTH%${ADMIN_AUTH}`)
}

client.on('message', data => {
if (data === 'AUTH%{"status":"Success"}') {
client.send('SUB%event')
return
}

if (data === 'SUB%{"status":"Success"}') {
client.send(`PUSH%event%${JSON.stringify({ foo: 'bar' })}`)
return
}

if (data === 'event%{"foo":"bar"}') {
client.close()
return done()
}

return done
})
})
})
})
4 changes: 2 additions & 2 deletions packages/mds-web-sockets/types.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export const ENTITY_TYPES = ['EVENTS', 'TELEMETRIES'] as const
export type ENTITY_TYPE = typeof ENTITY_TYPES[number]
export const EntityTypes = ['event', 'telemetry'] as const
export type EntityType = typeof EntityTypes[number]
twelch marked this conversation as resolved.
Show resolved Hide resolved