Skip to content

Commit

Permalink
Refactor topics using reference list
Browse files Browse the repository at this point in the history
  • Loading branch information
hi-liang committed Jun 18, 2024
1 parent ac38228 commit aa8777f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
19 changes: 13 additions & 6 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const jose = require('jose');

const {runExpress} = require('./express_server');
const {asyncForEach, asyncMapForEach, filterNulls, flatten} = require('./utils');
import TOPICS from './topics';

let jwk;
if (config.jwt_public_keyfile) {
Expand Down Expand Up @@ -83,7 +84,6 @@ async function runMQTT() {
connectOpts.password = config.jwt_service_token;
}
mqttClient = await mqtt.connectAsync(config.mqtt.uri, connectOpts);
const SCENE_TOPIC_BASE = config.mqtt.topic_realm + '/s/#';
console.log('Connected to MQTT');
mqttClient.on('offline', async () => {
if (expireTimer) {
Expand Down Expand Up @@ -119,7 +119,10 @@ async function runMQTT() {
console.log(err);
});
try {
await mqttClient.subscribe(SCENE_TOPIC_BASE, {
await mqttClient.subscribe(TOPICS.SUBSCRIBE.SCENE_PUBLIC.formatStr({
nameSpace: '+',
sceneName: '+',
}), {
qos: 1,
}).then(async () => {
expirations = new Map();
Expand Down Expand Up @@ -371,7 +374,6 @@ async function handleLoadTemplate(arenaObj) {
const createArenaObj = async (
// eslint-disable-next-line camelcase
object_id, type, realm, namespace, sceneId, attributes, persist, ttl) => {
const topic = `realm/s/${namespace}/${sceneId}`;
let expireAt;
const msg = {
// eslint-disable-next-line camelcase
Expand Down Expand Up @@ -409,7 +411,10 @@ const createArenaObj = async (
} catch (err) {
console.log('Error creating arena object', object_id, err);
}
await mqttClient.publish(topic, JSON.stringify(msg));
await mqttClient.publish(TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: namespace,
sceneName: sceneId,
}), JSON.stringify(msg));
};

/**
Expand Down Expand Up @@ -487,12 +492,14 @@ const publishExpires = async () => {
const now = new Date();
await asyncMapForEach(expirations, async (obj, key) => {
if (obj.expireAt < now) {
const topic = `${obj.realm}/s/${obj.namespace}/${obj.sceneId}`;
const msg = {
object_id: obj.object_id,
action: 'delete',
};
await mqttClient.publish(topic, JSON.stringify(msg));
await mqttClient.publish(TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: obj.namespace,
sceneName: obj.sceneId,
}), JSON.stringify(msg));
expirations.delete(key);
persists.delete(key);
await ArenaObject.deleteMany({
Expand Down
6 changes: 6 additions & 0 deletions utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ exports.filterNulls = (obj) => {
}
return [sets, unSets];
};

// eslint-disable-next-line no-extend-native
String.prototype.formatStr = function formatStr(...args) {
const params = arguments.length === 1 && typeof args[0] === 'object' ? args[0] : args;
return this.replace(/\{([^}]+)\}/g, (match, key) => (typeof params[key] !== 'undefined' ? params[key] : match));
};

0 comments on commit aa8777f

Please sign in to comment.