Skip to content

Commit

Permalink
feat!: Refactored topic structure for more granular flow and access (#26
Browse files Browse the repository at this point in the history
)

* Refactor topics using reference list

* Validate topic and obj payload ids

This is a core of our topic-based ACLs

* hotfix runtime errors

* Use defined topic positional tokens

* Update to latest core topics

- None of these affect persist, which only needs public objs

* Update to latest core topics (again)

* Add `private` `program_id` to schema

* Update device pub/sub topics

* update JWT checks for new topic schema

* Use formatStr for MQTT topics

* fix persist build error

* subscribe to new objects-only topic

* ttl seconds is expressed as float

---------

Co-authored-by: Ivan Liang <r33t@cmu.edu>
  • Loading branch information
mwfarb and hi-liang authored Oct 14, 2024
1 parent f331680 commit 6f2a5ed
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 22 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ persisted objects to load upon entering any scene.
- A persisted ARENA object can be updated if it has `action: update` and `persist: true` set in its MQTT message. The
properties in its `data` will be merged on top of the previously saved `data`.
- If an `update` message contains an explicit `overwrite: true`, then the `data` therein will **replace** what is saved in persistence.
- If an `update` message contains an explicit `persist: false`, then the `data` therein will not be updated to persistence.
- If an `update` message contains an explicit `persist: false`, then the `data` therein will not be updated to persistence.

### TTL
Adding a `ttl` (int seconds) to the top level MQTT message for any `create` action signals that the object
Adding a `ttl` (float seconds) to the top level MQTT message for any `create` action signals that the object
will be automatically deleted from peristence after set duration, as well as a correspdoning `delete` action message
sent over pubsub. `ttl` implies that `persist` is `true`.

### Templates

Any scene can be loaded as a **template** into another scene. This effectively clones all objects from the
source scene into the destination scene.
Any scene can be loaded as a **template** into another scene. This effectively clones all objects from the
source scene into the destination scene.

When a template is loaded, a parent container is first created in the target scene. This parent container follows the
object id naming scheme: `templateNamespace|templateSceneId::instanceId`, e.g. `public|lobby::instance_0`.

Then every object inside the designated @template scene is replicated as descendents of the parent container. In this
way, the parent can be repositioned, rotated, or scaled to adjust the template all at once. The objects within
the template follow the naming scheme `templateNamespace|templateSceneId::instanceId::objectId`, e.g. `public|lobby::instance_0::cube1`.
Expand All @@ -45,7 +45,7 @@ To clone an instance of a scene, send a POST request to `/persist/:targetNamespa
{
action: "clone",
namespace: <string>, // name of source scene namespace
sceneId: <string>, // name of source scene sceneId
sceneId: <string>, // name of source scene sceneId
allowNonEmptyTarget: <bool>, // (optional) - set to `true` allow templating into a non-empty destination scene
}
```
Expand All @@ -55,4 +55,4 @@ After the template load, all objects behave as typical in any scene.
*Notes:*

- If a template source scene is empty with no objects, or the instanceid already exists within a target scene, the template
load will fail.
load will fail.
36 changes: 30 additions & 6 deletions express_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const jose = require('jose');

const express = require('express');
const cookieParser = require('cookie-parser');
const {TOPICS} = require('./topics');

// TODO: Does any of this need to be parameterized?
const VERIFY_OPTIONS = {
Expand Down Expand Up @@ -92,15 +93,24 @@ exports.runExpress = async ({
});
checkJWTSubs = (req, res, next) => {
const {sceneId, namespace} = req.params;
const topic = `realm/s/${namespace}/${sceneId}`;
// This specific PUBLISH topic matches for objects
const topic = TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: namespace,
sceneName: sceneId,
objectId: '+',
});
if (!matchJWT(topic, req.jwtPayload.subs)) {
return tokenSubError(res);
}
next();
};
checkJWTPubs = (req, res, next) => {
const {sceneId, namespace} = req.params;
const topic = `realm/s/${namespace}/${sceneId}`;
const topic = TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: namespace,
sceneName: sceneId,
objectId: '+',
});
if (!matchJWT(topic, req.jwtPayload.publ)) {
return tokenPubError(res);
}
Expand All @@ -111,7 +121,12 @@ exports.runExpress = async ({
app.use(express.json());

app.get('/persist/!allscenes', (req, res) => {
if (jwk && !req.jwtPayload.subs.includes('realm/s/#')) { // Must have sub-all rights
const globalTopic = TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: '+',
sceneName: '+',
objectId: '+',
});
if (jwk && !matchJWT(globalTopic, req.jwtPayload.subs)) { // Must have sub-all rights
return tokenSubError(res);
}
ArenaObject.aggregate([
Expand All @@ -137,7 +152,12 @@ exports.runExpress = async ({

app.get('/persist/:namespace/!allscenes', (req, res) => {
const {namespace} = req.params;
if (jwk && !matchJWT(`realm/s/${namespace}/#`, req.jwtPayload.subs)) { // Must have sub-all public rights
const namespaceTopic = TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: namespace,
sceneName: '+',
objectId: '+', // arbitrary object
});
if (jwk && !matchJWT(namespaceTopic, req.jwtPayload.subs)) { // Must have sub-all public rights
return tokenSubError(res);
}
ArenaObject.aggregate([
Expand Down Expand Up @@ -181,8 +201,12 @@ exports.runExpress = async ({
res.status(400);
return res.json('No namespace or sceneId specified');
}
if (!matchJWT(`realm/s/${sourceNamespace}/${sourceSceneId}`,
req.jwtPayload.subs)) {
const srcTopic = TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: sourceNamespace,
sceneName: sourceSceneId,
objectId: '+',
});
if (!matchJWT(srcTopic, req.jwtPayload.subs)) {
return tokenSubError(res);
}
const sourceObjectCount = await ArenaObject.countDocuments(
Expand Down
42 changes: 33 additions & 9 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const jose = require('jose');

const {runExpress} = require('./express_server');
const {asyncForEach, asyncMapForEach, filterNulls, flatten} = require('./utils');
const {TOPICS} = require('./topics');

let jwk;
if (config.jwt_public_keyfile) {
Expand All @@ -32,6 +33,8 @@ const arenaSchema = new mongoose.Schema({
realm: {type: String, required: true, index: true},
namespace: {type: String, required: true, index: true, default: 'public'},
sceneId: {type: String, required: true, index: true},
private: {type: Boolean},
program_id: {type: String},
}, {
timestamps: true,
minimize: false, // Try to enforce attributes being valid object for $set and $unset
Expand Down Expand Up @@ -83,7 +86,6 @@ async function runMQTT() {
connectOpts.password = config.jwt_service_token;
}
mqttClient = await mqtt.connectAsync(config.mqtt.uri, connectOpts);
const SCENE_TOPIC_BASE = config.mqtt.topic_realm + '/s/#';
console.log('Connected to MQTT');
mqttClient.on('offline', async () => {
if (expireTimer) {
Expand Down Expand Up @@ -119,7 +121,11 @@ async function runMQTT() {
console.log(err);
});
try {
await mqttClient.subscribe(SCENE_TOPIC_BASE, {
await mqttClient.subscribe(TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: '+',
sceneName: '+',
objectId: '+',
}), {
qos: 1,
}).then(async () => {
expirations = new Map();
Expand Down Expand Up @@ -150,20 +156,30 @@ async function arenaMsgHandler(topic, message) {
- 1: type [s, n, r, topology, flows]
- 2: namespace
- 3: sceneId
- 4: sceneMsg type
- 5: object_id
- 6: toUid (not relevant for persist)
*/
let msgJSON;
let arenaObj;
const now = new Date();
try {
msgJSON = JSON.parse(message.toString());

// Verify topicObjId is same as json payload id
const topicObjId = topicSplit[TOPICS.TOKENS.UUID];
if (msgJSON.object_id !== topicObjId) {
return;
}

arenaObj = new ArenaObject({
object_id: msgJSON.object_id,
attributes: msgJSON.data,
expireAt: undefined,
type: msgJSON.type,
realm: topicSplit[0],
namespace: topicSplit[2],
sceneId: topicSplit[3],
realm: topicSplit[TOPICS.TOKENS.REALM],
namespace: topicSplit[TOPICS.TOKENS.NAMESPACE],
sceneId: topicSplit[TOPICS.TOKENS.SCENENAME],
});
if (msgJSON.ttl) {
if (msgJSON.persist && msgJSON.persist !== false) {
Expand Down Expand Up @@ -371,7 +387,6 @@ async function handleLoadTemplate(arenaObj) {
const createArenaObj = async (
// eslint-disable-next-line camelcase
object_id, type, realm, namespace, sceneId, attributes, persist, ttl) => {
const topic = `realm/s/${namespace}/${sceneId}`;
let expireAt;
const msg = {
// eslint-disable-next-line camelcase
Expand Down Expand Up @@ -409,7 +424,12 @@ const createArenaObj = async (
} catch (err) {
console.log('Error creating arena object', object_id, err);
}
await mqttClient.publish(topic, JSON.stringify(msg));
await mqttClient.publish(TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: namespace,
sceneName: sceneId,
// eslint-disable-next-line camelcase
objectId: object_id,
}), JSON.stringify(msg));
};

/**
Expand Down Expand Up @@ -487,12 +507,16 @@ const publishExpires = async () => {
const now = new Date();
await asyncMapForEach(expirations, async (obj, key) => {
if (obj.expireAt < now) {
const topic = `${obj.realm}/s/${obj.namespace}/${obj.sceneId}`;
const msg = {
object_id: obj.object_id,
action: 'delete',
};
await mqttClient.publish(topic, JSON.stringify(msg));
await mqttClient.publish(TOPICS.PUBLISH.SCENE_OBJECTS.formatStr({
nameSpace: obj.namespace,
sceneName: obj.sceneId,
// eslint-disable-next-line camelcase
objectId: obj.object_id,
}), JSON.stringify(msg));
expirations.delete(key);
persists.delete(key);
await ArenaObject.deleteMany({
Expand Down
77 changes: 77 additions & 0 deletions topics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* @fileoverview Topic names for ARENA pubsub messages.
*
* Open source software under the terms in /LICENSE
* Copyright (c) 2024 ARENAXR. All rights reserved.
* @date 2024
*/

const config = require('./config.json');

/**
* ARENA pubsub topic variables
* - nameSpace - namespace of the scene
* - sceneName - name of the scene
* - userName - name of the user per arena-auth (e.g. jdoe)
* - idTag - username prefixed with a uuid (e.g. 1448081341_jdoe)
* - camName - idTag prefixed with camera_ (e.g. camera_1448081341_jdoe)
*/

const REALM = config.mqtt.topic_realm;

/* eslint-disable key-spacing */
// prettier-ignore
exports.TOPICS = Object.freeze({
TOKENS: {
REALM: 0,
TYPE: 1,
NAMESPACE: 2,
SCENENAME: 3,
SCENE_MSGTYPE: 4,
UUID: 5,
TO_UID: 6,
},
SCENE_MSGTYPES: {
PRESENCE: 'x',
CHAT: 'c',
USER: 'u',
OBJECTS: 'o',
RENDER: 'r',
ENV: 'e',
PROGRAM: 'p',
DEBUG: 'd',
},
SUBSCRIBE: {
NETWORK: '$NETWORK',
DEVICE: `${REALM}/d/{deviceName}/#`, // All client placeholder
PROC_REG: `${REALM}/proc/reg`,
PROC_CTL: `${REALM}/proc/control/{uuid}/#`,
PROC_DBG: `${REALM}/proc/debug/{uuid}`,
SCENE_PUBLIC: `${REALM}/s/{nameSpace}/{sceneName}/+/+`,
SCENE_PRIVATE: `${REALM}/s/{nameSpace}/{sceneName}/+/+/{idTag}/#`,
},
PUBLISH: {
NETWORK_LATENCY: '$NETWORK/latency',
DEVICE: `${REALM}/d/{deviceName}/{idTag}`,
PROC_REG: `${REALM}/proc/reg`,
PROC_CTL: `${REALM}/proc/control`,
PROC_DBG: `${REALM}/proc/debug/{uuid}`,
SCENE_PRESENCE: `${REALM}/s/{nameSpace}/{sceneName}/x/{idTag}`,
SCENE_PRESENCE_PRIVATE:`${REALM}/s/{nameSpace}/{sceneName}/x/{idTag}/{toUid}`,
SCENE_CHAT: `${REALM}/s/{nameSpace}/{sceneName}/c/{idTag}`,
SCENE_CHAT_PRIVATE: `${REALM}/s/{nameSpace}/{sceneName}/c/{idTag}/{toUid}`,
SCENE_USER: `${REALM}/s/{nameSpace}/{sceneName}/u/{userObj}`,
SCENE_USER_PRIVATE: `${REALM}/s/{nameSpace}/{sceneName}/u/{userObj}/{toUid}`, // Need to add face_ privs
SCENE_OBJECTS: `${REALM}/s/{nameSpace}/{sceneName}/o/{objectId}`, // All client placeholder
SCENE_OBJECTS_PRIVATE: `${REALM}/s/{nameSpace}/{sceneName}/o/{objectId}/{toUid}`,
SCENE_RENDER: `${REALM}/s/{nameSpace}/{sceneName}/r/{idTag}`,
SCENE_RENDER_PRIVATE: `${REALM}/s/{nameSpace}/{sceneName}/r/{idTag}/-`, // To avoid unpriv sub
SCENE_ENV: `${REALM}/s/{nameSpace}/{sceneName}/e/{idTag}`,
SCENE_ENV_PRIVATE: `${REALM}/s/{nameSpace}/{sceneName}/e/{idTag}/-`, // To avoid unpriv sub
SCENE_PROGRAM: `${REALM}/s/{nameSpace}/{sceneName}/p/{idTag}`,
SCENE_PROGRAM_PRIVATE: `${REALM}/s/{nameSpace}/{sceneName}/p/{idTag}/{toUid}`,
SCENE_DEBUG: `${REALM}/s/{nameSpace}/{sceneName}/d/{idTag}/-`, // To avoid unpriv sub
},
});

// export default TOPICS;
6 changes: 6 additions & 0 deletions utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ exports.filterNulls = (obj) => {
}
return [sets, unSets];
};

// eslint-disable-next-line no-extend-native
String.prototype.formatStr = function formatStr(...args) {
const params = arguments.length === 1 && typeof args[0] === 'object' ? args[0] : args;
return this.replace(/\{([^}]+)\}/g, (match, key) => (typeof params[key] !== 'undefined' ? params[key] : match));
};

0 comments on commit 6f2a5ed

Please sign in to comment.