Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add json-query to MQTT monitor type #3857

Merged
merged 6 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions db/knex_migrations/2023-10-08-0000-mqtt-query.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
exports.up = function (knex) {
// Add new column monitor.mqtt_check_type
return knex.schema
.alterTable("monitor", function (table) {
table.string("mqtt_check_type", 255).notNullable().defaultTo("keyword");
});

};

exports.down = function (knex) {
// Drop column monitor.mqtt_check_type
return knex.schema
.alterTable("monitor", function (table) {
table.dropColumn("mqtt_check_type");
});
};
11 changes: 2 additions & 9 deletions server/model/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { Prometheus } = require("../prometheus");
const { log, UP, DOWN, PENDING, MAINTENANCE, flipStatus, MAX_INTERVAL_SECOND, MIN_INTERVAL_SECOND,
SQL_DATETIME_FORMAT, isDev, sleep, getRandomInt
} = require("../../src/util");
const { tcping, ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, mqttAsync, setSetting, httpNtlm, radius, grpcQuery,
const { tcping, ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, setSetting, httpNtlm, radius, grpcQuery,
redisPingAsync, mongodbPing, kafkaProducerAsync, getOidcTokenClientCredentials, rootCertificatesFingerprints, axiosAbortSignal
} = require("../util-server");
const { R } = require("redbean-node");
Expand Down Expand Up @@ -133,6 +133,7 @@ class Monitor extends BeanModel {
maintenance: await Monitor.isUnderMaintenance(this.id),
mqttTopic: this.mqttTopic,
mqttSuccessMessage: this.mqttSuccessMessage,
mqttCheckType: this.mqttCheckType,
databaseQuery: this.databaseQuery,
authMethod: this.authMethod,
grpcUrl: this.grpcUrl,
Expand Down Expand Up @@ -755,14 +756,6 @@ class Monitor extends BeanModel {
} else {
throw Error("Container State is " + res.data.State.Status);
}
} else if (this.type === "mqtt") {
bean.msg = await mqttAsync(this.hostname, this.mqttTopic, this.mqttSuccessMessage, {
port: this.port,
username: this.mqttUsername,
password: this.mqttPassword,
interval: this.interval,
});
bean.status = UP;
} else if (this.type === "sqlserver") {
let startTime = dayjs().valueOf();

Expand Down
121 changes: 121 additions & 0 deletions server/monitor-types/mqtt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
const { MonitorType } = require("./monitor-type");
const { log, UP } = require("../../src/util");
const mqtt = require("mqtt");
const jsonata = require("jsonata");

class MqttMonitorType extends MonitorType {

name = "mqtt";

/**
* Run the monitoring check on the MQTT monitor
* @param {Monitor} monitor Monitor to check
* @param {Heartbeat} heartbeat Monitor heartbeat to update
* @param {UptimeKumaServer} server Uptime Kuma server
* @returns {Promise<void>}
*/
async check(monitor, heartbeat, server) {
const receivedMessage = await this.mqttAsync(monitor.hostname, monitor.mqttTopic, {
port: monitor.port,
username: monitor.mqttUsername,
password: monitor.mqttPassword,
interval: monitor.interval,
});

if (monitor.mqttCheckType == null || monitor.mqttCheckType === "") {
// use old default
monitor.mqttCheckType = "keyword";
}

if (monitor.mqttCheckType === "keyword") {
if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) {
heartbeat.msg = `Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`;
heartbeat.status = UP;
} else {
throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`);
}
} else if (monitor.mqttCheckType === "json-query") {
const parsedMessage = JSON.parse(receivedMessage);

let expression = jsonata(monitor.jsonPath);

let result = await expression.evaluate(parsedMessage);

if (result?.toString() === monitor.expectedValue) {
heartbeat.msg = "Message received, expected value is found";
heartbeat.status = UP;
} else {
throw new Error("Message received but value is not equal to expected value, value was: [" + result + "]");
}
} else {
throw Error("Unknown MQTT Check Type");
}
}

/**
* Connect to MQTT Broker, subscribe to topic and receive message as String
* @param {string} hostname Hostname / address of machine to test
* @param {string} topic MQTT topic
* @param {object} options MQTT options. Contains port, username,
* password and interval (interval defaults to 20)
* @returns {Promise<string>} Received MQTT message
*/
mqttAsync(hostname, topic, options = {}) {
return new Promise((resolve, reject) => {
const { port, username, password, interval = 20 } = options;

// Adds MQTT protocol to the hostname if not already present
if (!/^(?:http|mqtt|ws)s?:\/\//.test(hostname)) {
hostname = "mqtt://" + hostname;
}

const timeoutID = setTimeout(() => {
log.debug("mqtt", "MQTT timeout triggered");
client.end();
reject(new Error("Timeout, Message not received"));
}, interval * 1000 * 0.8);

const mqttUrl = `${hostname}:${port}`;

log.debug("mqtt", `MQTT connecting to ${mqttUrl}`);

let client = mqtt.connect(mqttUrl, {
username,
password
});

client.on("connect", () => {
log.debug("mqtt", "MQTT connected");

try {
client.subscribe(topic, () => {
log.debug("mqtt", "MQTT subscribed to topic");
});
} catch (e) {
client.end();
clearTimeout(timeoutID);
reject(new Error("Cannot subscribe topic"));
}
});

client.on("error", (error) => {
client.end();
clearTimeout(timeoutID);
reject(error);
});

client.on("message", (messageTopic, message) => {
if (messageTopic === topic) {
client.end();
clearTimeout(timeoutID);
resolve(message.toString("utf8"));
}
});

});
}
}

module.exports = {
MqttMonitorType,
};
1 change: 1 addition & 0 deletions server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ let needSetup = false;
bean.mqttPassword = monitor.mqttPassword;
bean.mqttTopic = monitor.mqttTopic;
bean.mqttSuccessMessage = monitor.mqttSuccessMessage;
bean.mqttCheckType = monitor.mqttCheckType;
bean.databaseConnectionString = monitor.databaseConnectionString;
bean.databaseQuery = monitor.databaseQuery;
bean.authMethod = monitor.authMethod;
Expand Down
2 changes: 2 additions & 0 deletions server/uptime-kuma-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
UptimeKumaServer.monitorTypeList["real-browser"] = new RealBrowserMonitorType();
UptimeKumaServer.monitorTypeList["tailscale-ping"] = new TailscalePing();
UptimeKumaServer.monitorTypeList["dns"] = new DnsMonitorType();
UptimeKumaServer.monitorTypeList["mqtt"] = new MqttMonitorType();

this.io = new Server(this.httpServer);
}
Expand Down Expand Up @@ -460,7 +461,7 @@
return await this.startMonitor(monitorID);
}

/**

Check warning on line 464 in server/uptime-kuma-server.js

View workflow job for this annotation

GitHub Actions / check-linters

Missing JSDoc @returns declaration
* Check if monitors are running properly
*/
async checkMonitors() {
Expand Down Expand Up @@ -521,3 +522,4 @@
const { RealBrowserMonitorType } = require("./monitor-types/real-browser-monitor-type");
const { TailscalePing } = require("./monitor-types/tailscale-ping");
const { DnsMonitorType } = require("./monitor-types/dns");
const { MqttMonitorType } = require("./monitor-types/mqtt");
68 changes: 0 additions & 68 deletions server/util-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const { Resolver } = require("dns");
const childProcess = require("child_process");
const iconv = require("iconv-lite");
const chardet = require("chardet");
const mqtt = require("mqtt");
const chroma = require("chroma-js");
const { badgeConstants } = require("./config");
const mssql = require("mssql");
Expand Down Expand Up @@ -173,73 +172,6 @@ exports.pingAsync = function (hostname, ipv6 = false, size = 56) {
});
};

/**
* MQTT Monitor
* @param {string} hostname Hostname / address of machine to test
* @param {string} topic MQTT topic
* @param {string} okMessage Expected result
* @param {object} options MQTT options. Contains port, username,
* password and interval (interval defaults to 20)
* @returns {Promise<string>} Received MQTT message
*/
exports.mqttAsync = function (hostname, topic, okMessage, options = {}) {
return new Promise((resolve, reject) => {
const { port, username, password, interval = 20 } = options;

// Adds MQTT protocol to the hostname if not already present
if (!/^(?:http|mqtt|ws)s?:\/\//.test(hostname)) {
hostname = "mqtt://" + hostname;
}

const timeoutID = setTimeout(() => {
log.debug("mqtt", "MQTT timeout triggered");
client.end();
reject(new Error("Timeout"));
}, interval * 1000 * 0.8);

const mqttUrl = `${hostname}:${port}`;

log.debug("mqtt", `MQTT connecting to ${mqttUrl}`);

let client = mqtt.connect(mqttUrl, {
username,
password
});

client.on("connect", () => {
log.debug("mqtt", "MQTT connected");

try {
log.debug("mqtt", "MQTT subscribe topic");
client.subscribe(topic);
} catch (e) {
client.end();
clearTimeout(timeoutID);
reject(new Error("Cannot subscribe topic"));
}
});

client.on("error", (error) => {
client.end();
clearTimeout(timeoutID);
reject(error);
});

client.on("message", (messageTopic, message) => {
if (messageTopic === topic) {
client.end();
clearTimeout(timeoutID);
if (okMessage != null && okMessage !== "" && message.toString() !== okMessage) {
reject(new Error(`Message Mismatch - Topic: ${messageTopic}; Message: ${message.toString()}`));
} else {
resolve(`Topic: ${messageTopic}; Message: ${message.toString()}`);
}
}
});

});
};

/**
* Monitor Kafka using Producer
* @param {string[]} brokers List of kafka brokers to connect, host and
Expand Down
4 changes: 2 additions & 2 deletions src/lang/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@
"Current User": "Current User",
"topic": "Topic",
"topicExplanation": "MQTT topic to monitor",
"successMessage": "Success Message",
"successMessageExplanation": "MQTT message that will be considered as success",
"successKeyword": "Success Keyword",
"successKeywordExplanation": "MQTT Keyword that will be considered as success",
"recent": "Recent",
"Reset Token": "Reset Token",
"Done": "Done",
Expand Down
29 changes: 26 additions & 3 deletions src/pages/EditMonitor.vue
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,33 @@
</div>

<div class="my-3">
<label for="mqttSuccessMessage" class="form-label">MQTT {{ $t("successMessage") }}</label>
<input id="mqttSuccessMessage" v-model="monitor.mqttSuccessMessage" type="text" class="form-control">
<label for="mqttCheckType" class="form-label">MQTT {{ $t("Check Type") }}</label>
<select id="mqttCheckType" v-model="monitor.mqttCheckType" class="form-select" required>
<option value="keyword">{{ $t("Keyword") }}</option>
<option value="json-query">{{ $t("Json Query") }}</option>
</select>
</div>

<div v-if="monitor.mqttCheckType === 'keyword'" class="my-3">
<label for="mqttSuccessKeyword" class="form-label">MQTT {{ $t("successKeyword") }}</label>
<input id="mqttSuccessKeyword" v-model="monitor.mqttSuccessMessage" type="text" class="form-control">
<div class="form-text">
{{ $t("successMessageExplanation") }}
{{ $t("successKeywordExplanation") }}
</div>
</div>

<!-- Json Query -->
<div v-if="monitor.mqttCheckType === 'json-query'" class="my-3">
<label for="jsonPath" class="form-label">{{ $t("Json Query") }}</label>
<input id="jsonPath" v-model="monitor.jsonPath" type="text" class="form-control" required>

<!-- eslint-disable-next-line vue/no-v-html -->
<div class="form-text" v-html="$t('jsonQueryDescription')">
</div>
<br>

<label for="expectedValue" class="form-label">{{ $t("Expected Value") }}</label>
<input id="expectedValue" v-model="monitor.expectedValue" type="text" class="form-control" required>
</div>
</template>

Expand Down Expand Up @@ -884,6 +906,7 @@ const monitorDefaults = {
mqttPassword: "",
mqttTopic: "",
mqttSuccessMessage: "",
mqttCheckType: "keyword",
authMethod: null,
oauth_auth_method: "client_secret_basic",
httpBodyEncoding: "json",
Expand Down
Loading