diff --git a/src/server/admin-api.ts b/src/server/admin-api.ts index 7116d3a..8a12c77 100644 --- a/src/server/admin-api.ts +++ b/src/server/admin-api.ts @@ -58,7 +58,7 @@ export default (server: Server) => { // TODO: validate JSON payload server.isDebugEnabled = req.body.debug const value = server.isDebugEnabled - server.emit("serverevent", { type: "DEBUG", data: value }) + server.emit("loaderevent", { type: "DEBUG", data: value }) res.send(JSON.stringify({ debug: value })) }) diff --git a/src/server/influxdb.ts b/src/server/influxdb.ts index 56da83f..e54fb11 100644 --- a/src/server/influxdb.ts +++ b/src/server/influxdb.ts @@ -25,7 +25,7 @@ export class InfluxDBBackend { this.server = server this.logger = server.getLogger("influxdb") this.lastWriteTime = Date.now() - this.server.app.on("settingsChanged", () => { this.settingsChanged() }) + this.server.on("settingsChanged", () => { this.settingsChanged() }) this.settingsChanged() } @@ -64,7 +64,7 @@ export class InfluxDBBackend { } } - async store(portalId: string, name: string, instanceNumber: number, measurement: string, value: number) { + async store(portalId: string, name: string, instanceNumber: string, measurement: string, value: number) { if (!this.isConnected || value === undefined || value === null) { return } diff --git a/src/server/loader.cjs b/src/server/loader.cjs deleted file mode 100644 index 59c414a..0000000 --- a/src/server/loader.cjs +++ /dev/null @@ -1,414 +0,0 @@ -const _ = require("lodash") -const mqtt = require("mqtt") -const ignoredMeasurements = require("./ignoredMeasurements.cjs") -const buildVersion = require("../buildInfo.cjs").buildVersion - -const collectStatsInterval = 5 -const keepAliveInterval = 30 - -function Loader(app) { - this.app = app - this.upnpConnections = {} - this.manualConnections = {} - this.vrmConnections = {} - - this.deviceStats = {} - this.deviceMeasurements = {} - this.totalCount = 0 - this.lastIntervalCount = 0 - this.vrmSubscriptions = [] - - this.logger = app.getLogger("loader") -} - -Loader.prototype.start = function () { - this.logger.debug("starting...") - this.app.emit("serverevent", { - type: "LOADERSTATISTICS", - }) - - this.app.on("upnpDiscovered", (info) => { - const upnp = this.app.config.settings.upnp - if (!this.upnpConnections[info.portalId] && upnp.enabledPortalIds.indexOf(info.portalId) !== -1) { - this.connectUPNP(info) - } - }) - - this.app.on("vrmDiscovered", (devices) => { - if (this.app.config.settings.vrm.enabled) { - this.connectVRM(devices) - } - }) - - this.app.on("settingsChanged", this.settingsChanged.bind(this)) - - this.collectInterval = setInterval(this.collectStats.bind(this), collectStatsInterval * 1000) -} - -Loader.prototype.getPortalName = function (client, id) { - if (client.deviceName) { - return client.deviceName - } - if (this.app.config.settings.upnp.enabled) { - const info = this.app.upnpDiscovered[id] - if (info && info.name) { - return info.name - } - } - if (this.app.config.settings.vrm.enabled) { - const info = this.app.vrmDiscovered.find((info) => info.portalId === id) - if (info && info.name) { - return info.name - } - } -} - -Loader.prototype.sendKeepAlive = function (client, portalId, isFirstKeepAliveRequest) { - this.logger.debug( - `sending keep alive for ${portalId}, isFirstKeepAliveRequest: ${isFirstKeepAliveRequest} [${client.venusKeepAlive}]`, - ) - client.publish( - `R/${portalId}/system/0/Serial`, - isFirstKeepAliveRequest ? "" : '{ "keepalive-options" : ["suppress-republish"] }', - ) -} - -Loader.prototype.keepAlive = function (client) { - if (client.portalId) { - this.sendKeepAlive(client, client.portalId, client.isFirstKeepAliveRequest) - client.isFirstKeepAliveRequest = false - } -} - -Loader.prototype.onMessage = function (client, topic, message) { - //console.log(`${topic} : ${message}`) - - if (_.isUndefined(message) || message == null || message.length === 0) { - return - } - - const split = topic.split("/") - const id = split[1] - const instanceNumber = split[3] - - split.splice(0, 2) - split.splice(1, 1) - const measurement = split.join("/") - - if (ignoredMeasurements.find((path) => measurement.startsWith(path))) { - return - } - - try { - const json = JSON.parse(message) - - //console.log(`${id} ${instanceNumber} ${measurement} ${json.value}`) - - if (client.venusNeedsID && measurement === "system/Serial") { - this.logger.info("Detected portalId %s", json.value) - client.subscribe(`N/${json.value}/settings/0/Settings/SystemSetup/SystemName`) - client.subscribe(`N/${json.value}/#`) - client.publish(`R/${json.value}/settings/0/Settings/SystemSetup/SystemName`) - client.publish(`R/${json.value}/system/0/Serial`) - client.venusNeedsID = false - client.portalId = json.value - return - } - - const name = this.getPortalName(client, id) - - let portalStats - let measurements - this.totalCount++ - if (this.deviceStats[id]) { - portalStats = this.deviceStats[id] - measurements = this.deviceMeasurements[id] - } else { - portalStats = { - measurementCount: 0, - measurementRate: 0, - lastIntervalCount: 0, - name: name || id, - } - this.deviceStats[id] = portalStats - measurements = [] - this.deviceMeasurements[id] = measurements - } - - portalStats.lastMeasurement = new Date() - portalStats.measurementCount++ - - if (measurements.indexOf(measurement) === -1) { - this.logger.debug("got measurement %s = %j", measurement, json.value) - measurements.push(measurement) - } - - if (!name && !client.isVrm) { - if (measurement === "settings/Settings/SystemSetup/SystemName") { - if (json.value.length === 0) { - client.deviceName = id - } else { - this.logger.info("Detected name %s : %j", id, json.value) - client.deviceName = json.value - portalStats.name = client.deviceName - } - } - return - } - - this.app.influxdb.store(id, name, instanceNumber, measurement, json.value) - } catch (e) { - this.logger.error(`can't record ${topic}: ${message}`) - this.logger.error(e) - } -} - -Loader.prototype.close = function (connectionInfo) { - this.logger.info("closing connection to %s", connectionInfo.client.portalId || connectionInfo.address) - connectionInfo.client.end(true) -} - -Loader.prototype.settingsChanged = function (settings) { - //close existing connections if upnp disabled or a device is disabled - _.keys(this.upnpConnections).forEach((id) => { - if (!settings.upnp.enabled || settings.upnp.enabledPortalIds.indexOf(id) === -1) { - this.close(this.upnpConnections[id]) - delete this.upnpConnections[id] - } - }) - - // open connections for upnp devices that were previously disabled - if (settings.upnp.enabled) { - _.keys(this.app.upnpDiscovered).forEach((id) => { - if (!this.upnpConnections[id] && settings.upnp.enabledPortalIds.indexOf(id) !== -1) { - this.connectUPNP(this.app.upnpDiscovered[id]) - } - }) - } - - if (_.keys(this.vrmConnections).length > 0) { - _.values(this.vrmConnections).forEach((info) => { - this.logger.info(`closing vrm connection for ${info.portalId}`) - info.client.end(true) - }) - this.vrmConnections = {} - this.app.emit("vrmStatus", { - status: "success", - message: "Connections Closed", - }) - } - if (settings.vrm.enabled) { - this.connectVRM(this.app.vrmDiscovered) - } - - if (settings.manual.enabled) { - settings.manual.hosts.forEach((host) => { - if (host.enabled) { - if (!this.manualConnections[host.hostName]) { - this.connectManual({ address: host.hostName }) - } - } else if (this.manualConnections[host.hostName]) { - this.close(this.manualConnections[host.hostName]) - delete this.manualConnections[host.hostName] - } - }) - _.keys(this.manualConnections).forEach((ip) => { - if (!settings.manual.hosts.find((host) => host.hostName === ip)) { - this.close(this.manualConnections[ip]) - delete this.manualConnections[ip] - } - }) - } else { - _.keys(this.manualConnections).forEach((ip) => { - this.close(this.manualConnections[ip]) - }) - this.manualConnections = {} - } -} - -// for UPNP info contains IP `address` and `portalId` -Loader.prototype.connectUPNP = function (info) { - this.upnpConnections[info.portalId] = { - name: info.name, - address: info.address, - } - - this.connect(info.address, 1883, info) - .then((client) => { - this.upnpConnections[info.portalId].client = client - }) - .catch((err) => { - this.logger.error(err) - }) -} - -// for Manual info contains IP `address` -Loader.prototype.connectManual = function (info) { - this.manualConnections[info.address] = { - name: info.name, - address: info.address, - } - - this.connect(info.address, 1883, info) - .then((client) => { - this.manualConnections[info.address].client = client - }) - .catch((err) => { - this.logger.error(err) - }) -} - -function calculateVRMBrokerURL(portalId) { - let sum = 0 - const lowered = portalId.toLowerCase() - for (let i = 0; i < lowered.length; i++) { - sum = sum + lowered.charCodeAt(i) - } - return `mqtt${sum % 128}.victronenergy.com` -} - -// for VRM portalInfos contains an array of objects with `portalId` and `name` -// belonging to the VRM account -Loader.prototype.connectVRM = function (portalInfos) { - if (this.app.config.secrets.vrmToken) { - const enabled = portalInfos.filter((info) => { - return this.app.config.settings.vrm.enabledPortalIds.indexOf(info.portalId) !== -1 - }) - - enabled.forEach((info) => { - const port = 8883 - const address = calculateVRMBrokerURL(info.portalId) - - if (!this.vrmConnections[info.portalId]) { - this.vrmConnections[info.portalId] = { - address: address, - portalId: info.portalId, - } - - this.connect(address, port, info, true) - .then((client) => { - this.vrmConnections[info.portalId].client = client - }) - .catch((err) => { - delete this.vrmConnections[info.portalId] - this.logger.error(err) - }) - } - }) - } -} - -function formatClientRemoteAddress(client) { - return `${client.options?.host}:${client.options?.port}` -} - -Loader.prototype.setupClient = function (client, info, isVrm) { - client.on("connect", () => { - this.logger.info(`MQTT connected to ${formatClientRemoteAddress(client)}`) - if (info.portalId === undefined) { - // we do not know the portalId yet (manual connection) - this.logger.info("Detecting portalId...") - client.subscribe("N/+/#") - client.venusNeedsID = true - } else { - // we do know the portalId already (vrm + upnp connection) - this.logger.info("Subscribing to portalId %s", info.portalId) - client.subscribe(`N/${info.portalId}/settings/0/Settings/SystemSetup/SystemName`) - client.subscribe(`N/${info.portalId}/#`) - client.publish(`R/${info.portalId}/settings/0/Settings/SystemSetup/SystemName`) - client.publish(`R/${info.portalId}/system/0/Serial`) - client.portalId = info.portalId - client.venusNeedsID = false - } - if (!client.venusKeepAlive) { - client.isFirstKeepAliveRequest = true - client.venusKeepAlive = setInterval(this.keepAlive.bind(this, client), keepAliveInterval * 1000) - this.logger.debug(`starting keep alive timer for ${client.portalId} [${client.venusKeepAlive}]`) - } - }) - - client.on("message", (topic, message) => this.onMessage(client, topic, message)) - - client.on("error", (error) => { - this.logger.error(`MQTT connection to ${formatClientRemoteAddress(client)}, ${error}`) - }) - - client.on("close", () => { - this.logger.debug(`MQTT connection to ${formatClientRemoteAddress(client)} closed`) - - if (client.venusKeepAlive) { - this.logger.debug(`clearing keep alive timer for ${client.portalId} [${client.venusKeepAlive}]`) - clearInterval(client.venusKeepAlive) - delete client.venusKeepAlive - } - - if (isVrm) { - delete this.vrmConnections[client.portalId] - } - }) - client.on("offline", () => { - this.logger.debug(`MQTT connection to ${formatClientRemoteAddress(client)} offline`) - }) - client.on("end", () => { - this.logger.info(`MQTT connection to ${formatClientRemoteAddress(client)} ended`) - }) - client.on("reconnect", () => { - this.logger.debug(`MQTT reconnecting to ${formatClientRemoteAddress(client)}`) - }) -} - -Loader.prototype.connect = function (address, port, info, isVrm = false) { - return new Promise((resolve, _reject) => { - const clientId = Math.random().toString(16).slice(3) - let options - if (isVrm) { - options = { - rejectUnauthorized: false, - username: `${this.app.config.secrets.vrmUsername}`, - password: `Token ${this.app.config.secrets.vrmToken}`, - // use random clientId + vrmTokenId to identify this loader instance - clientId: `venus_influx_loader_${buildVersion}_${clientId}_${this.app.config.secrets.vrmTokenId}`, - reconnectPeriod: 10_000, - } - } else { - options = { - // use random clientId to identify this loader instance - clientId: `venus_influx_loader_${buildVersion}_${clientId}`, - reconnectPeriod: 10_000, - } - } - this.logger.info(`MQTT connecting to ${address}:${port} using clientId: ${options.clientId}`) - const client = mqtt.connect(`${isVrm ? "mqtts" : "mqtt"}:${address}:${port}`, options) - this.setupClient(client, info, isVrm) - resolve(client) - }) -} - -Loader.prototype.collectStats = function () { - //this.logger.debug('collecting stats...') - - let measurementCount = 0 - _.keys(this.deviceStats).forEach((id) => { - const stats = this.deviceStats[id] - stats.measurementRate = (stats.measurementCount - stats.lastIntervalCount) / collectStatsInterval - stats.lastIntervalCount = stats.measurementCount - measurementCount += this.deviceMeasurements[id].length - }) - - const stats = { - measurementRate: (this.totalCount - this.lastIntervalCount) / collectStatsInterval, - measurementCount: measurementCount, - deviceStatistics: this.deviceStats, - } - - this.lastIntervalCount = this.totalCount - - this.app.lastStats = stats - - this.app.emit("serverevent", { - type: "LOADERSTATISTICS", - data: stats, - }) -} - -module.exports = Loader diff --git a/src/server/loader.ts b/src/server/loader.ts new file mode 100644 index 0000000..e40c3bd --- /dev/null +++ b/src/server/loader.ts @@ -0,0 +1,434 @@ +import mqtt, { MqttClient } from "mqtt" +import ignoredMeasurements from "./ignoredMeasurements.js" +// @ts-expect-error +import buildInfo from "../buildInfo.cjs" +const buildVersion = buildInfo.buildVersion +import { Server } from "./server" +import { Logger } from "winston" +import { ConfiguredDevice, DiscoveredDevice, LoaderStatistics } from "../shared/state.js" + +const collectStatsInterval = 5 +const keepAliveInterval = 30 + +export class Loader { + server: Server + logger: Logger + + upnpConnections: { [portalId: string]: VenusMqttClient } = {} + manualConnections: { [address: string]: VenusMqttClient } = {} + vrmConnections: { [portalId: string]: VenusMqttClient } = {} + + loaderStatistics: LoaderStatistics = { distinctMeasurementsCount: 0, measurementRate: 0, deviceStatistics: {} } + lastIntervalCount = 0 + + collectInterval: any = undefined + + constructor(server: Server) { + this.server = server + this.logger = server.getLogger("loader") + } + + async start() { + // whenever settings change, we disconnect/reconnect as needed + this.server.on("settingsChanged", () => { + this.settingsChanged() + }) + + // whenever we discover new UPNP device, we disconnect/reconenct as needed + this.server.on("upnpDiscovered", (_device) => { + this.updateUpnpDeviceConnections() + }) + + // start sending loader statistics + this.server.emit("loaderevent", { + type: "LOADERSTATISTICS", + data: {}, + }) + this.collectInterval = setInterval(() => { + this.collectStats() + }, collectStatsInterval * 1000) + + // initiate connections to configured devices + this.settingsChanged() + } + + collectStats() { + // this.logger.debug("collectStats") + + let distinctMeasurementsCount = 0 + let totalMeasurementsCount = 0 + Object.keys(this.loaderStatistics.deviceStatistics).forEach((key) => { + let stats = this.loaderStatistics.deviceStatistics[key] + stats.measurementRate = (stats.totalMeasurementsCount - stats.lastIntervalCount) / collectStatsInterval + stats.lastIntervalCount = stats.totalMeasurementsCount + distinctMeasurementsCount += stats.distinctMeasurementsCount + totalMeasurementsCount += stats.totalMeasurementsCount + this.loaderStatistics.deviceStatistics[key] = stats + }) + + this.loaderStatistics = { + measurementRate: (totalMeasurementsCount - this.lastIntervalCount) / collectStatsInterval, + distinctMeasurementsCount: distinctMeasurementsCount, + deviceStatistics: this.loaderStatistics.deviceStatistics, + } + + this.lastIntervalCount = totalMeasurementsCount + + this.server.emit("loaderevent", { + type: "LOADERSTATISTICS", + data: this.loaderStatistics, + }) + } + + private settingsChanged() { + this.logger.debug("settingsChanged") + + this.updateUpnpDeviceConnections() + this.updateHostnameDeviceConnections() + this.updateVrmDeviceConnections() + } + + private updateUpnpDeviceConnections() { + // check what devices are enabled + // and compute what devices should be disabled + const config = this.server.config.upnp + const enabled = config.enabled ? config.enabledPortalIds : [] + const disabled = arrayDifference(Object.keys(this.upnpConnections), enabled) + // disconnect from Venus devices that are no longer enabled + disabled.forEach((portalId) => { + this.upnpConnections[portalId].stop() + delete this.upnpConnections[portalId] + }) + // connect to Venus devices that are enabled + enabled.forEach((portalId) => this.initiateUpnpDeviceConnection(this.server.upnpDevices[portalId])) + } + + private updateHostnameDeviceConnections() { + // check what devices are enabled + // and compute what devices should be disabled + const config = this.server.config.manual + const enabled = config.enabled + ? config.hosts.reduce((result: string[], host) => { + if (host.enabled) { + result.push(host.hostName) + } + return result + }, []) + : [] + const disabled = arrayDifference(Object.keys(this.manualConnections), enabled) + // disconnect from Venus devices that are no longer enabled + disabled.forEach((hostName) => { + this.manualConnections[hostName].stop() + delete this.manualConnections[hostName] + }) + // connect to Venus devices that are enabled + enabled.forEach((hostName) => this.initiateHostnameDeviceConnection(hostName)) + } + + private updateVrmDeviceConnections() { + // check what devices are enabled + // and compute what devices should be disabled + const config = this.server.config.vrm + const enabled = config.enabled ? config.enabledPortalIds : [] + const disabled = arrayDifference(Object.keys(this.vrmConnections), enabled) + // disconnect from Venus devices that are no longer enabled + disabled.forEach((portalId) => { + this.vrmConnections[portalId].stop() + delete this.vrmConnections[portalId] + }) + // connect to Venus devices that are enabled + enabled.forEach((portalId) => this.initiateVrmDeviceConnection(portalId)) + } + + private async initiateUpnpDeviceConnection(d: DiscoveredDevice) { + if (d === undefined) return + if (this.upnpConnections[d.portalId]) { + return + } + const device: ConfiguredDevice = { type: "UPNP", address: d.address, portalId: d.portalId } + this.logger.debug(`initiateUpnpDeviceConnection: ${JSON.stringify(device)}`) + const mqttClient = new VenusMqttClient(this, device) + this.upnpConnections[d.portalId] = mqttClient + await mqttClient.start() + } + + private async initiateHostnameDeviceConnection(hostName: string) { + if (hostName === undefined) return + if (this.manualConnections[hostName]) { + return + } + const device: ConfiguredDevice = { type: "IP", address: hostName } + this.logger.debug(`initiateHostnameDeviceConnection: ${JSON.stringify(device)}`) + const mqttClient = new VenusMqttClient(this, device) + this.manualConnections[hostName] = mqttClient + await mqttClient.start() + } + + private async initiateVrmDeviceConnection(portalId: string) { + if (portalId === undefined) return + if (this.vrmConnections[portalId]) { + return + } + const device: ConfiguredDevice = { type: "VRM", portalId: portalId, address: this.calculateVrmBrokerURL(portalId) } + this.logger.debug(`initiateVrmDeviceConnection: ${JSON.stringify(device)}`) + const mqttClient = new VenusMqttClient(this, device, true) + this.vrmConnections[portalId] = mqttClient + await mqttClient.start() + } + + private calculateVrmBrokerURL(portalId: string) { + let sum = 0 + const lowered = portalId.toLowerCase() + for (let i = 0; i < lowered.length; i++) { + sum = sum + lowered.charCodeAt(i) + } + return `mqtt${sum % 128}.victronenergy.com` + } +} + +class VenusMqttClient { + loader: Loader + logger: Logger + device: ConfiguredDevice + client!: MqttClient + address: string + port: number + isVrm: boolean + isFirstKeepAliveRequest: boolean = true + isDetectingPortalId: boolean = true + venusKeepAlive: any + + constructor(loader: Loader, device: ConfiguredDevice, isVrm = false) { + this.loader = loader + this.logger = loader.server.getLogger(`${device.type}:${device.portalId ?? device.address}`) + this.address = device.address!! + this.port = isVrm ? 8883 : 1883 + this.device = device + this.isVrm = isVrm + + this.setupStatistics() + } + + async start() { + this.logger.info("start") + return new Promise((resolve, _reject) => { + const clientId = Math.random().toString(16).slice(3) + let options + if (this.isVrm) { + options = { + rejectUnauthorized: false, + username: `${this.loader.server.secrets.vrmUsername}`, + password: `Token ${this.loader.server.secrets.vrmToken}`, + // use random clientId + vrmTokenId to identify this loader instance + clientId: `venus_influx_loader_${buildVersion}_${clientId}_${this.loader.server.secrets.vrmTokenId}`, + reconnectPeriod: 10_000, + } + } else { + options = { + // use random clientId to identify this loader instance + clientId: `venus_influx_loader_${buildVersion}_${clientId}`, + reconnectPeriod: 10_000, + } + } + this.logger.info(`MQTT connecting to ${this.address}:${this.port} using clientId: ${options.clientId}`) + this.client = mqtt.connect(`${this.isVrm ? "mqtts" : "mqtt"}:${this.address}:${this.port}`, options) + this.setupMqttClient() + resolve(this) + }) + } + + async stop() { + this.logger.info("stop") + this.client.end(true) + } + + async changeExpirationDate() { + // TODO: change expiration date, and if expired, disconnect + } + + private setupMqttClient() { + this.client.on("connect", () => { + this.logger.info(`MQTT connected to ${this.clientRemoteAddress}`) + if (this.device.portalId === undefined) { + // we do not know the portalId yet (manual connection) + this.logger.info("Detecting portalId...") + this.client.subscribe("N/+/#") + this.isDetectingPortalId = true + } else { + // we do know the portalId already (vrm + upnp connection) + this.logger.info("Subscribing to portalId %s", this.device.portalId) + this.client.subscribe(`N/${this.device.portalId}/settings/0/Settings/SystemSetup/SystemName`) + this.client.subscribe(`N/${this.device.portalId}/#`) + this.client.publish(`R/${this.device.portalId}/settings/0/Settings/SystemSetup/SystemName`, "") + this.client.publish(`R/${this.device.portalId}/system/0/Serial`, "") + this.isDetectingPortalId = false + } + if (!this.venusKeepAlive) { + this.isFirstKeepAliveRequest = true + this.venusKeepAlive = setInterval(() => { + this.keepAlive() + }, keepAliveInterval * 1000) + this.logger.debug(`Starting keep alive timer`) + } + }) + + this.client.on("message", (topic, message) => this.onMessage(topic, message)) + + this.client.on("error", (error) => { + this.logger.error(`MQTT connection to ${this.clientRemoteAddress}, ${error}`) + }) + + this.client.on("close", () => { + this.logger.debug(`MQTT connection to ${this.clientRemoteAddress} closed`) + + if (this.venusKeepAlive) { + this.logger.debug(`Clearing keep alive timer`) + clearInterval(this.venusKeepAlive) + this.venusKeepAlive = undefined + } + }) + + this.client.on("offline", () => { + this.logger.debug(`MQTT connection to ${this.clientRemoteAddress} offline`) + + // update stats + if (this.device.portalId) { + this.loader.loaderStatistics.deviceStatistics[this.device.portalId].isConnected = false + } + }) + + this.client.on("end", () => { + this.logger.info(`MQTT connection to ${this.clientRemoteAddress} ended`) + }) + + this.client.on("reconnect", () => { + this.logger.debug(`MQTT reconnecting to ${this.clientRemoteAddress}`) + }) + } + + private onMessage(topic: string, message: Buffer) { + // this.logger.debug(`${topic}: ${message}`) + + if (message === undefined || message == null || message.length === 0) { + return + } + + const split = topic.split("/") + const id = split[1] + const instanceNumber = split[3] + + split.splice(0, 2) + split.splice(1, 1) + const measurement = split.join("/") + + if (ignoredMeasurements.find((path) => measurement.startsWith(path))) { + return + } + + try { + const json = JSON.parse(message.toString("utf-8")) + + //this.logger.debug(`${id} ${instanceNumber} ${measurement} ${json.value}`) + + // detect portalId for manual connections + if (this.isDetectingPortalId && measurement === "system/Serial") { + this.logger.info("Detected portalId %s", json.value) + this.client.subscribe(`N/${json.value}/settings/0/Settings/SystemSetup/SystemName`) + this.client.subscribe(`N/${json.value}/#`) + this.client.publish(`R/${json.value}/settings/0/Settings/SystemSetup/SystemName`, "") + this.client.publish(`R/${json.value}/system/0/Serial`, "") + this.isDetectingPortalId = false + this.device.portalId = json.value + return + } + + // detect portalName for all connections + if (this.device.name === undefined) { + if (measurement === "settings/Settings/SystemSetup/SystemName") { + if (json.value.length === 0) { + this.device.name = id + } else { + this.logger.info("Detected portalName %s", json.value) + this.device.name = json.value + } + } + return + } + + // update stats + this.updateStatistics(measurement) + + this.loader.server.influxdb.store(id, this.device.name!!, instanceNumber, measurement, json.value) + } catch (error) { + this.logger.error(`can't record ${topic}: ${message}`) + this.logger.error(error) + } + } + + private sendKeepAlive(isFirstKeepAliveRequest: boolean) { + this.logger.debug(`sendKeepAlive: isFirstKeepAliveRequest: ${isFirstKeepAliveRequest}`) + this.client.publish( + `R/${this.device.portalId}/system/0/Serial`, + isFirstKeepAliveRequest ? "" : '{ "keepalive-options" : ["suppress-republish"] }', + ) + } + + private keepAlive() { + if (this.device.portalId) { + this.sendKeepAlive(this.isFirstKeepAliveRequest) + this.isFirstKeepAliveRequest = false + } + } + + get clientRemoteAddress() { + return `${this.client.options?.host}:${this.client.options?.port}` + } + + get statisticsKey() { + switch (this.device.type) { + case "UPNP": + return `${this.device.type}:${this.device.address}:${this.device.portalId!!}` + case "VRM": + return `${this.device.type}:${this.device.address}:${this.device.portalId!!}` + case "IP": + default: + return `${this.device.type}:${this.device.address}:` + } + } + + private setupStatistics() { + // prepare empty slot for device statistics + this.loader.loaderStatistics.deviceStatistics[this.statisticsKey] = { + type: this.device.type, + address: this.device.address, + isConnected: false, + name: this.device.name || this.device.portalId!!, + totalMeasurementsCount: 0, + distinctMeasurementsCount: 0, + measurementRate: 0, + lastIntervalCount: 0, + lastMeasurement: undefined, + } + } + + distinctMeasurements: Set = new Set() + + private updateStatistics(measurement: string) { + // no portalId -> wait + if (this.device.portalId === undefined) return + + // remember how many distinct measurements each device receives + this.distinctMeasurements.add(measurement) + + let portalStats = this.loader.loaderStatistics.deviceStatistics[this.statisticsKey]!! + portalStats.isConnected = true + portalStats.name = this.device.name || this.device.portalId!! + portalStats.totalMeasurementsCount++ + portalStats.distinctMeasurementsCount = this.distinctMeasurements.size + portalStats.lastMeasurement = new Date() + } +} + +function arrayDifference(arr1: T[], arr2: T[]): T[] { + return arr1.filter((item) => !arr2.includes(item)) +} diff --git a/src/server/logger.ts b/src/server/logger.ts index 6901551..306991c 100644 --- a/src/server/logger.ts +++ b/src/server/logger.ts @@ -26,7 +26,7 @@ export class LogStorageTransport extends Transport { this.entries.splice(0, this.entries.length - this.size) } - this.server.emit("serverevent", { + this.server.emit("loaderevent", { type: "LOG", data: entry, }) diff --git a/src/server/server.cjs b/src/server/server.cjs deleted file mode 100644 index 4433080..0000000 --- a/src/server/server.cjs +++ /dev/null @@ -1,345 +0,0 @@ -const express = require("express") -const path = require("node:path") -const http = require("node:http") -const _ = require("lodash") -const fs = require("node:fs/promises") -const createRootLogger = require("./logger.cjs") -const Loader = require("./loader.cjs") -const InfluxDB = require("./influxdb.cjs") -const bodyParser = require("body-parser") -const compare = require("tsscmp") -const auth = require("basic-auth") - -const defaultInfluxDBURL = new URL(process.env.VIL_INFLUXDB_URL || "http://influxdb:8086") -const defaultInfluxDBUsername = process.env.VIL_INFLUXDB_USERNAME || "" -const defaultInfluxDBPassword = process.env.VIL_INFLUXDB_PASSWORD || "" -const defaultInfluxDBDatabase = "venus" -const defaultInfluxDBRetention = "30d" - -const defaultAdminUsername = "admin" -const defaultAdminPassword = "admin" - -async function loadSecrets(app) { - try { - const contents = await fs.readFile(app.config.secretsLocation) - app.config.secrets = JSON.parse(contents) - } catch { - app.config.secrets = {} - } -} - -async function loadConfig(app) { - try { - const contents = await fs.readFile(app.config.configLocation) - app.config.settings = JSON.parse(contents) - } catch { - app.config.settings = { - upnp: { - enabled: false, - enabledPortalIds: [], - }, - vrm: { - enabled: false, - enabledPortalIds: [], - }, - manual: { - enabled: false, - hosts: [], - }, - influxdb: { - host: defaultInfluxDBURL.hostname, - port: defaultInfluxDBURL.port, - username: defaultInfluxDBUsername, - password: defaultInfluxDBPassword, - database: defaultInfluxDBDatabase, - retention: defaultInfluxDBRetention, - }, - } - - try { - await fs.writeFile(app.config.configLocation, JSON.stringify(app.config.settings, null, 2)) - } catch (error) { - app.logger.error(error) - } - } -} - -async function saveConfig(app, cb) { - try { - await fs.writeFile(app.config.configLocation, JSON.stringify(app.config.settings, null, 2)) - } catch (error) { - app.logger.error(error) - if (cb) { - cb(error) - } - } -} - -class Server { - constructor(options) { - const app = express() - this.app = app - - app.config = { - configLocation: path.join(options.configPath, "config.json"), - secretsLocation: path.join(options.configPath, "secrets.json"), - } - - app.saveSettings = (cb) => { - saveConfig(app, cb) - } - - app.options = options - app.started = false - } - - async start() { - const self = this - const app = this.app - - createRootLogger(app, "venus-influx-loader", "info") - - await loadSecrets(app) - await loadConfig(app) - - if (app.config.settings.debug) { - app.rootLogger.level = "debug" - } - - app.debug("Settings %j", app.config.settings) - - // TODO: clean this event handling up - app.lastServerEvents = {} - app.on("serverevent", (event) => { - if (event.type) { - app.lastServerEvents[event.type] = event - } - }) - - // TODO: clean upnp event handling up - app.upnpDiscovered = {} - - app.on("upnpDiscoveryDidStart", (_info) => { - app.upnpDiscovered = {} - app.emit("serverevent", { - type: "UPNPDISCOVERY", - data: [], - }) - }) - - app.on("upnpDiscoveryDidStop", (_info) => { - app.upnpDiscovered = {} - app.emit("serverevent", { - type: "UPNPDISCOVERY", - data: [], - }) - }) - - app.on("upnpDiscovered", (info) => { - if (_.isUndefined(app.upnpDiscovered[info.portalId])) { - app.upnpDiscovered[info.portalId] = info - app.info("Found new UPNP device %j", info) - - app.emit("serverevent", { - type: "UPNPDISCOVERY", - data: _.values(app.upnpDiscovered), - }) - } - }) - - // TODO: clean vrm event handling up - app.vrmDiscovered = [] - - app.emit("serverevent", { - type: "VRMDISCOVERY", - data: [], - }) - - app.on("vrmDiscovered", (devices) => { - app.vrmDiscovered = devices - app.emit("vrmDiscoveredChanged", app.vrmDiscovered) - app.debug("Found vrm devices %j", devices) - - app.emit("serverevent", { - type: "VRMDISCOVERY", - data: devices, - }) - }) - - app.emit("serverevent", { - type: "DEBUG", - data: app.logger.level === "debug", - }) - - app.on("vrmStatus", (status) => { - app.emit("serverevent", { - type: "VRMSTATUS", - data: status, - }) - }) - - app.upnp = require("./upnp.cjs")(this.app) - app.upnpLogger = app.upnp.logger - - app.vrm = require("./vrm.cjs")(this.app) - app.loader = new Loader(app) - app.influxdb = new InfluxDB(app) - - app.influxdb.start() - app.loader.start() - app.emit("settingsChanged", app.config.settings) - - // TODO: this is called from many places, clean up and clarify - function settingsChanged(settings) { - if (settings.upnp.enabled && app.upnp.isRunning() === false) { - if (!app.options.discoveryApiEndpoint) { - app.upnp.start() - } - } - if (!settings.upnp.enabled && app.upnp.isRunning()) { - if (!app.options.discoveryApiEndpoint) { - app.upnp.stop() - } - } - - if (settings.vrm.enabled && _.keys(app.vrmDiscovered).length === 0) { - /* - app.vrmDiscovered = [] - app.emit('serverevent', { - type: 'VRMDISCOVERY', - data: [] - }) - */ - app.vrm.loadPortalIDs() - } - if (!settings.vrm.enabled && _.keys(app.vrmDiscovered).length > 0) { - /* - app.vrmDiscovered = [] - app.emit('serverevent', { - type: 'VRMDISCOVERY', - data: [] - }) - */ - } - - app.emit("serverevent", { - type: "SETTINGSCHANGED", - data: settings, - }) - } - - app.on("settingsChanged", settingsChanged) - app.emit("settingsChanged", app.config.settings) - - app.emit("serverevent", { - type: "GRAFANA_URL", - data: app.options.grafanaUrl, - }) - app.emit("serverevent", { - type: "EDIT_SECURITY_SETTINGS_ENABLED", - data: app.options.showEditSecuritySettings, - }) - app.emit("serverevent", { - type: "EDIT_INFLUXDB_SETTINGS_ENABLED", - data: app.options.showEditInfluxDBSettings, - }) - app.emit("serverevent", { - type: "EDIT_DISCOVERY_SETTINGS_ENABLED", - data: app.options.showEditDiscoverySettings, - }) - app.emit("serverevent", { - type: "EDIT_MANUAL_SETTINGS_ENABLED", - data: app.options.showEditManualSettings, - }) - app.emit("serverevent", { - type: "EDIT_VRM_SETTINGS_ENABLED", - data: app.options.showEditVRMSettings, - }) - - return new Promise((resolve, _reject) => { - app.server = http.createServer(app) - - app.use(bodyParser.json()) - - // basic auth - const adminCredentials = (req, res, next) => { - const credentials = auth(req) - let login = app.config.secrets.login - if (!login) { - login = { - username: defaultAdminUsername, - password: defaultAdminPassword, - } - } - - if ( - !credentials || - compare(credentials.name, login.username) === false || - compare(credentials.pass, login.password) === false - ) { - res.statusCode = 401 - res.setHeader("WWW-Authenticate", 'Basic realm="example"') - res.status(401).send() - } else { - next() - } - } - - // setup /admin-api routes and authentication, if enabled - if (app.options.adminApiEndpoint) { - app.logger.info(`setting up ${app.options.adminApiEndpoint} routes`) - - // setup /admin-api basic auth, if enabled - if (app.options.adminApiEndpointAuthEnabled) { - app.use("/admin", adminCredentials) - } - app.use("/admin", express.static(path.join(__dirname, "../client"))) - app.get("/", (req, res) => { - res.redirect("/admin") - }) - - // setup /admin-api basic auth, if enabled - if (app.options.adminApiEndpointAuthEnabled) { - app.use(app.options.adminApiEndpoint, adminCredentials) - } - app.use(app.options.adminApiEndpoint, require("./admin-api.cjs")(app)) - - app.websocket = require("./websocket.cjs")(app) - app.websocket.start() - } - - // setup /discovery-api routes, if enabled - if (app.options.discoveryApiEndpoint) { - app.logger.info(`setting up ${app.options.discoveryApiEndpoint} routes`) - app.use(app.options.discoveryApiEndpoint, require("./discovery-api.cjs")(app)) - } - - // setup /grafana-api routes, if enabled - if (app.options.grafanaApiEndpoint) { - app.logger.info(`setting up ${app.options.grafanaApiEndpoint} routes`) - app.use(app.options.grafanaApiEndpoint, require("./grafana-api.cjs")(app)) - } - - // listen - const primaryPort = app.options.port - app.server.listen(primaryPort, function (_err) { - app.logger.info(`running at 0.0.0.0:${primaryPort}`) - app.started = true - resolve(self) - }) - }) - } - - async stop(cb) { - if (!this.app.started) { - return - } - this.app.debug("Closing server...") - await this.app.server.close() - this.app.debug("Server closed") - this.app.started = false - cb && cb() - } -} - -module.exports = Server diff --git a/src/server/server.ts b/src/server/server.ts new file mode 100644 index 0000000..a72a85e --- /dev/null +++ b/src/server/server.ts @@ -0,0 +1,455 @@ +import express from "express" +import path from "node:path" +import http from "node:http" +import fs from "node:fs/promises" +import { createRootLogger, LogStorageTransport } from "./logger.js" +import { InfluxDBBackend } from "./influxdb" +import { WebSocketChannel } from "./websocket.js" +import bodyParser from "body-parser" +import compare from "tsscmp" +import auth from "basic-auth" +import { AppConfig, AppConfigFiles, AppSecrets, createAppConfig, createAppSecrets, LogLevel } from "../shared/types" +import { LogEntry, Logger } from "winston" +import { UPNP } from "./upnp" +import { VRM } from "./vrm" +import { AppStateActionType, DiscoveredDevice, VRMStatus } from "../shared/state.js" +import { Loader } from "./loader.js" + +const defaultInfluxDBURL = new URL(process.env.VIL_INFLUXDB_URL || "http://influxdb:8086") +const defaultInfluxDBUsername = process.env.VIL_INFLUXDB_USERNAME || "" +const defaultInfluxDBPassword = process.env.VIL_INFLUXDB_PASSWORD || "" +const defaultInfluxDBDatabase = "venus" +const defaultInfluxDBRetention = "30d" + +const defaultAdminUsername = "admin" +const defaultAdminPassword = "admin" + +export interface ServerOptions { + configPath: string + port: number + grafanaUrl: string + discoveryApiEndpoint?: string + adminApiEndpoint?: string + adminApiEndpointAuthEnabled: boolean + grafanaApiEndpoint?: string + showEditDiscoverySettings: boolean + showEditVRMSettings: boolean + showEditManualSettings: boolean + showEditSecuritySettings: boolean + showEditInfluxDBSettings: boolean +} + +export class Server { + private app: express.Express + httpServer!: http.Server + private websocket!: WebSocketChannel + isRunning: boolean = false + private options: ServerOptions + + private configFiles: AppConfigFiles + config!: AppConfig + secrets!: AppSecrets + + private rootLogger: Logger + private logTransport: LogStorageTransport + logger: Logger + + influxdb!: InfluxDBBackend + // TODO: convert to ES class + loader!: any + upnp!: UPNP + vrm!: VRM + + constructor(options: ServerOptions) { + const app = express() + this.app = app + + this.options = options + + this.configFiles = { + configLocation: path.join(options.configPath, "config.json"), + secretsLocation: path.join(options.configPath, "secrets.json"), + } + + const x = createRootLogger(this, "info") + this.rootLogger = x.rootLogger + this.logTransport = x.logTransport + + this.logger = this.getLogger("server") + } + + async start() { + const self = this + const app = this.app + + // TODO: we should probably exit when we can not read secrets/config + // TODO: or shall we create a default config and try starting? + this.secrets = await this.loadSecrets() + this.config = await this.loadConfig() + + // if (this.config.debug) { + this.rootLogger.level = "debug" + // } + + this.logger.debug("Starting server...") + + // setup listeners + this.setupEventListeners() + + // start influxdb writer + this.influxdb = new InfluxDBBackend(this) + this.influxdb.start() + + // start loader + this.loader = new Loader(this) + this.loader.start() + + // prepare upnp browser + this.upnp = new UPNP(this as ServerMock) + + // prepare VRM helper + this.vrm = new VRM(this) + + // emit initial server state + this.emitInitialServerState() + + // create http server + this.httpServer = http.createServer(app) + + app.use(bodyParser.json()) + + // basic auth + const adminCredentials = (req: express.Request, res: express.Response, next: express.NextFunction) => { + const credentials = auth(req) + let login = this.secrets.login + if ( + !credentials || + compare(credentials.name, login?.username ?? defaultAdminUsername) === false || + compare(credentials.pass, login?.password ?? defaultAdminPassword) === false + ) { + res.statusCode = 401 + res.setHeader("WWW-Authenticate", 'Basic realm="venus-influx-loader"') + res.status(401).send() + } else { + next() + } + } + + // setup /admin-api routes and authentication, if enabled + if (this.options.adminApiEndpoint) { + this.logger.info(`Setting up ${this.options.adminApiEndpoint} routes`) + + // setup /admin-api basic auth, if enabled + if (this.options.adminApiEndpointAuthEnabled) { + app.use("/admin", adminCredentials) + } + app.use("/admin", express.static(path.join(__dirname, "../client"))) + app.get("/", (req, res) => { + res.redirect("/admin") + }) + + // setup /admin-api basic auth, if enabled + if (this.options.adminApiEndpointAuthEnabled) { + app.use(this.options.adminApiEndpoint, adminCredentials) + } + + const configureAdminRoutes = (await import("./admin-api")).default + app.use(this.options.adminApiEndpoint, configureAdminRoutes(this)) + + const configureVRMRoutes = (await import("./vrm-api")).default + app.use(this.options.adminApiEndpoint, configureVRMRoutes(this)) + + // prepare websocket channel for communication with Admin UI + this.websocket = new WebSocketChannel(this) + this.websocket.start() + } + + // setup /discovery-api routes, if enabled + if (this.options.discoveryApiEndpoint) { + this.logger.info(`Setting up ${this.options.discoveryApiEndpoint} routes`) + app.use(this.options.discoveryApiEndpoint, require("./discovery-api.cjs")(app)) + } + + // setup /grafana-api routes, if enabled + if (this.options.grafanaApiEndpoint) { + this.logger.info(`Setting up ${this.options.grafanaApiEndpoint} routes`) + const configureGrafanaApiRoutes = (await import("./grafana-api")).default + app.use(this.options.grafanaApiEndpoint, configureGrafanaApiRoutes(this)) + } + + // listen + return new Promise((resolve, _reject) => { + const primaryPort = this.options.port + this.httpServer.listen(primaryPort, () => { + this.logger.info(`Server started, listening at *:${primaryPort}`) + this.logger.debug("Starting server...") + this.isRunning = true + resolve(self) + }) + }) + } + + async stop(cb?: () => void) { + if (!this.isRunning) { + return + } + this.logger.debug("Stopping server...") + this.httpServer.close() + this.logger.debug("Server stopped.") + this.isRunning = false + cb && cb() + } + + get isDebugEnabled(): boolean { + return this.rootLogger.level === "debug" + } + + set isDebugEnabled(value: boolean) { + this.rootLogger.level = value ? "debug" : "info" + this.logger.log(this.rootLogger.level, `Log level changed to: ${this.rootLogger.level}`) + } + + get logEntries(): LogEntry[] { + return this.logTransport.entries + } + + getLogger(label: string): Logger { + return this.rootLogger.child({ label: label }) + } + + loaderState: { [type: string]: any } = {} + + upnpDevices: { [portalId: string]: DiscoveredDevice } = {} + vrmDevices: { [portalId: string]: DiscoveredDevice } = {} + + setupEventListeners() { + this.on("loaderevent", (event) => { + if (event.type) { + this.loaderState[event.type] = event + } + }) + + this.on("upnpDiscoveryDidStart", () => { + this.upnpDevices = {} + this.emit("loaderevent", { + type: "UPNPDISCOVERY", + data: [], + }) + }) + + this.on("upnpDiscoveryDidStop", () => { + this.upnpDevices = {} + this.emit("loaderevent", { + type: "UPNPDISCOVERY", + data: [], + }) + }) + + this.on("upnpDiscovered", (device) => { + if (this.upnpDevices[device.portalId] === undefined) { + this.upnpDevices[device.portalId] = device + // TODO: duplicate log from upnp and here + this.logger.info("Found new UPNP device %j", device) + + this.emit("loaderevent", { + type: "UPNPDISCOVERY", + data: Object.values(this.upnpDevices), + }) + } + }) + + this.on("vrmDiscovered", (devices) => { + let initial: { [portalId: string]: DiscoveredDevice } = {} + this.vrmDevices = devices.reduce((result, device) => { + result[device.portalId] = device + return result + }, initial) + // TODO: duplicate log from vrm and here + this.logger.debug("Found VRM devices %j", devices) + + this.emit("loaderevent", { + type: "VRMDISCOVERY", + data: devices, + }) + }) + + this.on("vrmStatus", (status) => { + this.emit("loaderevent", { + type: "VRMSTATUS", + data: status, + }) + }) + + this.on("settingsChanged", () => { + this.settingsChanged() + }) + } + + emitInitialServerState() { + this.emit("loaderevent", { + type: "DEBUG", + data: this.logger.level === "debug", + }) + + this.emit("settingsChanged", this.config) + + this.emit("loaderevent", { + type: "GRAFANA_URL", + data: this.options.grafanaUrl, + }) + this.emit("loaderevent", { + type: "EDIT_SECURITY_SETTINGS_ENABLED", + data: this.options.showEditSecuritySettings, + }) + this.emit("loaderevent", { + type: "EDIT_INFLUXDB_SETTINGS_ENABLED", + data: this.options.showEditInfluxDBSettings, + }) + this.emit("loaderevent", { + type: "EDIT_DISCOVERY_SETTINGS_ENABLED", + data: this.options.showEditDiscoverySettings, + }) + this.emit("loaderevent", { + type: "EDIT_MANUAL_SETTINGS_ENABLED", + data: this.options.showEditManualSettings, + }) + this.emit("loaderevent", { + type: "EDIT_VRM_SETTINGS_ENABLED", + data: this.options.showEditVRMSettings, + }) + + // TODO: here? + this.emit("loaderevent", { + type: "VRMDISCOVERY", + data: [], + }) + } + + settingsChanged() { + this.logger.debug("Settings changed...") + + // start local upnp browser if enabled and not running + if (this.config.upnp.enabled && !this.upnp.isRunning) { + if (!this.options.discoveryApiEndpoint) { + this.upnp.start() + } + } + + // stop local upnp browser if enabled and running + if (!this.config.upnp.enabled && this.upnp.isRunning) { + if (!this.options.discoveryApiEndpoint) { + this.upnp.stop() + } + } + + // TODO: do we need this? + // // reload VRM portals if vrm enabled and not running + // if (this.config.vrm.enabled && Object.keys(this.vrmDevices).length === 0) { + // this.vrm.refresh() + // } + // // clear VRM discovered + // if (!this.config.vrm.enabled && Object.keys(this.vrmDevices).length > 0) { + // } + + this.emit("loaderevent", { + type: "SETTINGSCHANGED", + data: this.config, + }) + } + + async loadSecrets(): Promise { + const location = this.configFiles.secretsLocation + try { + this.logger.info(`Loading secrets from: ${location}...`) + const contents = await fs.readFile(location, "utf-8") + return JSON.parse(contents) as AppSecrets + } catch (error) { + this.logger.error(`Failed loading secrets from: ${location}, error: ${error}.`) + return createAppSecrets({ + login: { + username: defaultAdminUsername, + password: defaultAdminPassword, + }, + }) + } + } + + async loadConfig(): Promise { + const location = this.configFiles.configLocation + try { + this.logger.info(`Loading config from: ${location}...`) + const contents = await fs.readFile(location, "utf-8") + return JSON.parse(contents) as AppConfig + } catch (error) { + this.logger.error(`Failed loading config from: ${location}, error: ${error}.`) + return createAppConfig({ + influxdb: { + host: defaultInfluxDBURL.hostname, + port: defaultInfluxDBURL.port, + username: defaultInfluxDBUsername, + password: defaultInfluxDBPassword, + database: defaultInfluxDBDatabase, + retention: defaultInfluxDBRetention, + }, + }) + } + } + + async saveConfig() { + const location = this.configFiles.configLocation + try { + this.logger.info(`Saving config to: ${location}...`) + await fs.writeFile(location, JSON.stringify(this.config, null, 2)) + this.emit("settingsChanged", this.config) + } catch (error) { + this.logger.error(`Failed saving config to: ${location}, error: ${error}.`) + throw error + } + } + + async saveSecrets() { + const location = this.configFiles.secretsLocation + try { + this.logger.info(`Saving secrets to: ${location}...`) + await fs.writeFile(location, JSON.stringify(this.secrets, null, 2)) + } catch (error) { + this.logger.error(`Failed saving secrets to: ${location}, error: ${error}.`) + throw error + } + } + + // typed variant of EventEmitter.emit + emit(event: K, data: ServerEvents[K]) { + this.app.emit(event, data) + } + + // typed variant of EventEmitter.on + on(event: K, listener: (_data: ServerEvents[K]) => void) { + // @ts-expect-error + return this.app.on(event, listener) + } + + // typed variant of EventEmitter.removeListener + removeListener(event: K, listener: (_data: ServerEvents[K]) => void) { + this.app.removeListener(event, listener) + } +} + +export interface ServerEvents { + loaderevent: { type: AppStateActionType; data: any } + settingsChanged: AppConfig + upnpDiscoveryDidStart: {} + upnpDiscoveryDidStop: {} + upnpDiscovered: DiscoveredDevice + vrmStatus: VRMStatus + vrmDiscovered: DiscoveredDevice[] +} + +export interface ServerMock { + getLogger: (_label: string) => LoggerMock + emit: (_type: string, _data: any) => void +} + +export interface LoggerMock { + log: (_level: LogLevel, _message: string) => void +} diff --git a/src/server/vrm.ts b/src/server/vrm.ts index 3af784b..9cf8436 100644 --- a/src/server/vrm.ts +++ b/src/server/vrm.ts @@ -158,6 +158,7 @@ export class VRM { interface VRMAPIUsersInstallationsRecord { identifier: string name: string + mqtt_host: string } interface VRMAPIUsersInstallations { @@ -175,7 +176,7 @@ export class VRM { if (res.status === 200) { const devices = response.records.map((record) => { - return { portalId: String(record.identifier), name: record.name } + return { portalId: String(record.identifier), name: record.name, address: record.mqtt_host } }) this.server.emit("vrmDiscovered", devices) this.good("Installations Retrieved") diff --git a/src/server/websocket.ts b/src/server/websocket.ts index 1953301..51ca585 100644 --- a/src/server/websocket.ts +++ b/src/server/websocket.ts @@ -35,14 +35,14 @@ export class WebSocketChannel { spark.write(event) } - this.server.app.on("serverevent", onServerEvent) + this.server.on("loaderevent", onServerEvent) // @ts-ignore spark.onDisconnects.push(() => { - this.server.app.removeListener("serverevent", onServerEvent) + this.server.removeListener("loaderevent", onServerEvent) }) - Object.entries(this.server.lastServerEvents).forEach(([type, event]) => { + Object.entries(this.server.loaderState).forEach(([type, event]) => { if (type !== "LOG") { spark.write(event) } diff --git a/src/shared/state.ts b/src/shared/state.ts index e135bc2..d0bc667 100644 --- a/src/shared/state.ts +++ b/src/shared/state.ts @@ -34,7 +34,15 @@ export interface AppStateWebSocketOpenAction extends AppStateBaseAction { data: WebSocket } -export type DiscoveredDevice = { portalId: string; name?: string; address?: string } +// for discovered device we always know portalId and address, and derive name from MQTT +export type DiscoveredDevice = { portalId: string; name?: string; address: string } +// for manually configured device we know address, and derive name and portalId from MQTT +export type ConfiguredDevice = { + type: "UPNP" | "VRM" | "IP" + portalId?: string + name?: string + address: string +} export interface AppStateUPNPDiscoveryAction extends AppStateBaseAction { type: "UPNPDISCOVERY" @@ -56,21 +64,25 @@ export interface AppStateVRMStatusAction extends AppStateBaseAction { data: VRMStatus } -export interface DeviceDetails { +export interface DeviceStatisticsDetails { + type: "UPNP" | "VRM" | "IP" + address: string name: string + isConnected: boolean measurementRate: number - measurementCount: number + totalMeasurementsCount: number lastIntervalCount: number - lastMeasurement: string // TODO: Date + distinctMeasurementsCount: number + lastMeasurement?: Date } export interface DeviceStatistics { - [key: string]: DeviceDetails + [key: string]: DeviceStatisticsDetails } export interface LoaderStatistics { measurementRate: number - measurementCount: number + distinctMeasurementsCount: number deviceStatistics: DeviceStatistics }