forked from mmagr/iotagent-mosca
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
343 lines (304 loc) · 9.99 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
#!/usr/bin/node
var mosca = require('mosca');
var iotalib = require('@dojot/iotagent-nodejs');
var dojotLogger = require("@dojot/dojot-module-logger");
var logger = dojotLogger.logger;
var config = require('./config');
// Base iot-agent
logger.debug("Initializing IoT agent...");
var iota = new iotalib.IoTAgent();
iota.init();
logger.debug("... IoT agent was initialized");
logger.debug("Initializing configuration endpoints...");
var bodyParser = require("body-parser");
var express = require("express");
var app = express();
app.use(bodyParser.json());
dojotLogger.addLoggerEndpoint(app);
app.listen(10001, () => {
logger.info(`Listening on port 10001.`);
});
logger.debug("... configuration endpoints were initialized");
// Local device cache
//
// Once a MQTT client is authorized by the server,
// its corresponding dojot device is added to the cache
// and kept there while the client is connected.
// The clientId which MUST match the pattern tenant:deviceId
// is used as the cache's key.
const cache = new Map();
// Mosca Settings
var moscaSettings = {};
var mosca_backend = {
type: 'redis',
redis: require('redis'),
db: 12,
port: config.backend_port,
return_buffers: true, // to handle binary payloads
host: config.backend_host
};
// MQTT with TLS and client certificate
if (config.mosca_tls.enabled === 'true') {
moscaSettings = {
backend: mosca_backend,
persistence: {
factory: mosca.persistence.Redis,
host: mosca_backend.host
},
type: "mqtts", // important to only use mqtts, not mqtt
credentials:
{ // contains all security information
keyPath: config.mosca_tls.key,
certPath: config.mosca_tls.cert,
caPaths: [config.mosca_tls.ca],
requestCert: true, // enable requesting certificate from clients
rejectUnauthorized: true // only accept clients with valid certificate
},
secure: {
port: 8883 // 8883 is the standard mqtts port
}
};
}
// MQTT without TLS
// (should only be used for debugging purposes or in a private environment)
else {
moscaSettings = {
port: 1883,
backend: mosca_backend,
persistence: {
factory: mosca.persistence.Redis,
host: mosca_backend.host
}
};
}
var server = new mosca.Server(moscaSettings);
// Fired when mosca server is ready
server.on('ready', () => {
logger.info('Mosca server is up and running');
// callbacks
if (config.mosca_tls.enabled === 'true') {
server.authenticate = authenticate;
}
// Always check whether device is doing the right thing.
server.authorizePublish = authorizePublish;
server.authorizeSubscribe = authorizeSubscribe;
});
// Helper Function to parse MQTT clientId
function parseClientIdOrTopic(clientId, topic) {
if (clientId && (typeof clientId === 'string')) {
let parsedData = clientId.match(/^(\w+):(\w+)$/);
if (parsedData) {
return { tenant: parsedData[1], device: parsedData[2] };
}
}
// If we're here, it means that TLS is not configured
// so fallback to topic-based id scheme
result = topic.match(/^\/([^/]+)\/([^/]+)/)
if (result) {
let exist = false;
logger.debug(`will attempt to use topic as tenant source ${result}`);
exist = iota.messenger.tenants.some((tenant) => {
if (result[1] === tenant) {
return true;
}
});
if (exist) {
return ({ tenant: result[1], device: result[2] });
}
logger.debug(`invalid tenant: ${result[1]}`);
return;
}
return;
}
// Function to authenticate the MQTT client
function authenticate(client, username, password, callback) {
logger.debug('Authenticating MQTT client', client.id);
// Condition 1: client.id follows the pattern tenant:deviceId
// Get tenant and deviceId from client.id
let ids = parseClientIdOrTopic(client.id);
if (!ids) {
//reject client connection
callback(null, false);
logger.warn(`Connection rejected for ${client.id}. Invalid clientId.`);
return;
}
// Condition 2: Client certificate belongs to the
// device identified in the clientId
// TODO: the clientId must contain the tenant too!
if (config.mosca_tls.enabled === 'true') {
clientCertificate = client.connection.stream.getPeerCertificate();
if (!clientCertificate.hasOwnProperty('subject') ||
!clientCertificate.subject.hasOwnProperty('CN') ||
clientCertificate.subject.CN !== ids.device) {
//reject client connection
callback(null, false);
logger.warn(`Connection rejected for ${client.id}. Invalid client certificate.`);
return;
}
}
// Condition 3: Device exists in dojot
iota.getDevice(ids.device, ids.tenant).then((device) => {
// add device to cache
cache.set(client.id, { client });
//authorize client connection
callback(null, true);
logger.debug('Connection authorized for', client.id);
}).catch((error) => {
//reject client connection
callback(null, false);
logger.warn(`Connection rejected for ${client.id}. Device doesn't exist in dojot.`);
})
}
// Function to authourize client to publish to
// topic: {tenant}/{deviceId}/attrs
function authorizePublish(client, topic, payload, callback) {
logger.debug(`Authorizing MQTT client ${client.id} to publish to ${topic}`);
let ids = parseClientIdOrTopic(client.id, topic);
if (!ids) {
callback(null, false);
logger.warn(`Rejected client ${client.id} to publish to topic ${topic}`);
return;
}
let expectedTopic = `/${ids.tenant}/${ids.device}/attrs`;
logger.debug(`Expected topic is ${expectedTopic}`);
logger.debug(`Device published on topic ${topic}`);
if (topic === expectedTopic) {
// authorize
callback(null, true);
logger.debug(`Authorized client ${client.id} to publish to topic ${topic}`);
return;
}
//reject
callback(null, false);
logger.warn(`Rejected client ${client.id} to publish to topic ${topic}`);
}
// Function to authorize client to subscribe to
// topic: {tenant}/{deviceId}/config
function authorizeSubscribe(client, topic, callback) {
logger.debug(`Authorizing client ${client.id} to subscribe to ${topic}`);
let ids = parseClientIdOrTopic(client.id, topic);
if (!ids) {
//reject client connection
callback(null, false);
logger.warn(`Connection rejected for ${client.id}. Invalid clientId.`);
return;
}
let expectedTopic = `/${ids.tenant}/${ids.device}/config`;
if (topic === expectedTopic) {
// authorize
callback(null, true);
logger.debug(`Authorized client ${client.id} to subscribe to topic ${topic}`);
return;
}
//reject
callback(null, false);
logger.warn(`Rejected client ${client.id} to subscribe to topic ${topic}`);
}
// Fired when a client connects to mosca server
server.on('clientConnected', function (client) {
logger.info('client up', client.id);
// TODO: notify dojot that device is online?
});
// Fired when a client disconnects from mosca server
server.on('clientDisconnected', function (client) {
logger.info('client down', client.id);
// delete device from cache
cache.delete(client.id);
});
// Fired when a message is received by mosca server
// (from device to dojot)
server.on('published', function (packet, client) {
// ignore meta (internal) topics
if ((packet.topic.split('/')[0] == '$SYS') ||
(client === undefined) || (client === null)) {
logger.debug('ignoring internal message', packet.topic, client);
return;
}
// handle packet
let data;
try {
data = JSON.parse(packet.payload.toString());
}
catch (e) {
logger.warn('Payload is not valid JSON. Ignoring.', packet.payload.toString(), e);
return;
}
logger.debug('Published', packet.topic, data, client.id);
//TODO: support only ISO string???
let metadata = {};
if ("timestamp" in data) {
metadata = { timestamp: 0 };
// If it is a number, just copy it. Probably Unix time.
if (typeof data.timestamp === "number") {
if (!isNaN(data.timestamp)) {
metadata.timestamp = data.timestamp;
}
else {
logger.warn("Received an invalid timestamp (NaN)");
metadata = {};
}
}
else {
// If it is a ISO string...
const parsed = Date.parse(data.timestamp);
if (!isNaN(parsed)) {
metadata.timestamp = parsed;
}
else {
// Invalid timestamp.
metadata = {};
}
}
}
//send data to dojot broker
let ids = parseClientIdOrTopic(client.id, packet.topic);
iota.updateAttrs(ids.device, ids.tenant, data, metadata);
});
// Fired when a device.configure event is received
// (from dojot to device)
iota.messenger.on('iotagent.device', 'device.configure', (tenant, event) => {
logger.debug('Got configure event from Device Manager', event)
// device id
let deviceId = event.data.id;
delete event.data.id;
// topic
// For now, we are still using slashes at the beginning. In the future,
// this will be removed (and topics will look like 'admin/efac/config')
// let topic = `${tenant}/${deviceId}/config`;
let topic = `/${tenant}/${deviceId}/config`;
// device
// let cacheEntry = cache.get(`${tenant}:${deviceId}`);
// if (cacheEntry) {
let message = {
'topic': topic,
'payload': JSON.stringify(event.data.attrs),
'qos': 0,
'retain': false
};
// send data to device
logger.debug('Publishing', message)
server.publish(message, () => { logger.debug('Message out!!') });
// TODO: send message/state(=sent) to history
// }
// else {
// logger.debug(`Discading event because device is disconnected`);
// // TODO: send message/state(=discarded) to history
// }
});
const deleteAndDisconnectCacheDevice = (event) => {
const id = event.data.id;
const tenant = event.meta.service;
let cacheEntry = cache.get(`${tenant}:${id}`);
if (cacheEntry) {
let { client } = cache.get(`${tenant}:${id}`);
if (client) {
client.close();
}
cache.delete(`${tenant}:${id}`);
}
}
// // Fired when a device.remove event is received
iota.messenger.on('iotagent.device', 'device.remove', (tenant, event) => {
logger.debug('Got device.remove event from Device Manager', tenant);
deleteAndDisconnectCacheDevice(event);
});