diff --git a/db/patch-added-kafka-producer.sql b/db/patch-added-kafka-producer.sql new file mode 100644 index 0000000000..933d30b8f1 --- /dev/null +++ b/db/patch-added-kafka-producer.sql @@ -0,0 +1,22 @@ +-- You should not modify if this have pushed to Github, unless it does serious wrong with the db. +BEGIN TRANSACTION; + +ALTER TABLE monitor + ADD kafka_producer_topic VARCHAR(255); + +ALTER TABLE monitor + ADD kafka_producer_brokers TEXT; + +ALTER TABLE monitor + ADD kafka_producer_ssl INTEGER; + +ALTER TABLE monitor + ADD kafka_producer_allow_auto_topic_creation VARCHAR(255); + +ALTER TABLE monitor + ADD kafka_producer_sasl_options TEXT; + +ALTER TABLE monitor + ADD kafka_producer_message TEXT; + +COMMIT; diff --git a/package-lock.json b/package-lock.json index 9623da63d0..9b2c464413 100644 --- a/package-lock.json +++ b/package-lock.json @@ -41,6 +41,7 @@ "jsonata": "^2.0.3", "jsonwebtoken": "~9.0.0", "jwt-decode": "~3.1.2", + "kafkajs": "^2.2.4", "limiter": "~2.1.0", "liquidjs": "^10.7.0", "mongodb": "~4.14.0", @@ -13009,6 +13010,14 @@ "resolved": "https://registry.npmjs.org/jwt-decode/-/jwt-decode-3.1.2.tgz", "integrity": "sha512-UfpWE/VZn0iP50d8cz9NrZLM9lSWhcJ+0Gt/nm4by88UL+J1SiKN8/5dkjMmbEzwL2CAe+67GsegCbIKtbp75A==" }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.2", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.2.tgz", diff --git a/package.json b/package.json index 9f3737704e..4d5e58b330 100644 --- a/package.json +++ b/package.json @@ -101,6 +101,7 @@ "jsonata": "^2.0.3", "jsonwebtoken": "~9.0.0", "jwt-decode": "~3.1.2", + "kafkajs": "^2.2.4", "limiter": "~2.1.0", "liquidjs": "^10.7.0", "mongodb": "~4.14.0", diff --git a/server/database.js b/server/database.js index 3e2d0350de..0f4f783252 100644 --- a/server/database.js +++ b/server/database.js @@ -73,6 +73,7 @@ class Database { "patch-add-parent-monitor.sql": true, "patch-add-invert-keyword.sql": true, "patch-added-json-query.sql": true, + "patch-added-kafka-producer.sql": true, }; /** diff --git a/server/model/monitor.js b/server/model/monitor.js index 26da3f34c1..06f6a7d5ea 100644 --- a/server/model/monitor.js +++ b/server/model/monitor.js @@ -6,7 +6,7 @@ const { log, UP, DOWN, PENDING, MAINTENANCE, flipStatus, TimeLogger, MAX_INTERVA SQL_DATETIME_FORMAT } = require("../../src/util"); const { tcping, ping, dnsResolve, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, mqttAsync, setSetting, httpNtlm, radius, grpcQuery, - redisPingAsync, mongodbPing, + redisPingAsync, mongodbPing, kafkaProducerAsync } = require("../util-server"); const { R } = require("redbean-node"); const { BeanModel } = require("redbean-node/dist/bean-model"); @@ -129,6 +129,11 @@ class Monitor extends BeanModel { httpBodyEncoding: this.httpBodyEncoding, jsonPath: this.jsonPath, expectedValue: this.expectedValue, + kafkaProducerTopic: this.kafkaProducerTopic, + kafkaProducerBrokers: JSON.parse(this.kafkaProducerBrokers), + kafkaProducerSsl: this.kafkaProducerSsl === "1" && true || false, + kafkaProducerAllowAutoTopicCreation: this.kafkaProducerAllowAutoTopicCreation === "1" && true || false, + kafkaProducerMessage: this.kafkaProducerMessage, screenshot, }; @@ -153,6 +158,7 @@ class Monitor extends BeanModel { tlsCa: this.tlsCa, tlsCert: this.tlsCert, tlsKey: this.tlsKey, + kafkaProducerSaslOptions: JSON.parse(this.kafkaProducerSaslOptions), }; } @@ -792,6 +798,24 @@ class Monitor extends BeanModel { bean.ping = dayjs().valueOf() - startTime; } + } else if (this.type === "kafka-producer") { + let startTime = dayjs().valueOf(); + + bean.msg = await kafkaProducerAsync( + JSON.parse(this.kafkaProducerBrokers), + this.kafkaProducerTopic, + this.kafkaProducerMessage, + { + allowAutoTopicCreation: this.kafkaProducerAllowAutoTopicCreation, + ssl: this.kafkaProducerSsl, + clientId: `Uptime-Kuma/${version}`, + interval: this.interval, + }, + JSON.parse(this.kafkaProducerSaslOptions), + ); + bean.status = UP; + bean.ping = dayjs().valueOf() - startTime; + } else { throw new Error("Unknown Monitor Type"); } diff --git a/server/server.js b/server/server.js index d83b7da5a3..5f4ccc4682 100644 --- a/server/server.js +++ b/server/server.js @@ -643,6 +643,9 @@ let needSetup = false; monitor.accepted_statuscodes_json = JSON.stringify(monitor.accepted_statuscodes); delete monitor.accepted_statuscodes; + monitor.kafkaProducerBrokers = JSON.stringify(monitor.kafkaProducerBrokers); + monitor.kafkaProducerSaslOptions = JSON.stringify(monitor.kafkaProducerSaslOptions); + bean.import(monitor); bean.user_id = socket.userID; @@ -757,6 +760,11 @@ let needSetup = false; bean.httpBodyEncoding = monitor.httpBodyEncoding; bean.expectedValue = monitor.expectedValue; bean.jsonPath = monitor.jsonPath; + bean.kafkaProducerTopic = monitor.kafkaProducerTopic; + bean.kafkaProducerBrokers = JSON.stringify(monitor.kafkaProducerBrokers); + bean.kafkaProducerAllowAutoTopicCreation = monitor.kafkaProducerAllowAutoTopicCreation; + bean.kafkaProducerSaslOptions = JSON.stringify(monitor.kafkaProducerSaslOptions); + bean.kafkaProducerMessage = monitor.kafkaProducerMessage; bean.validate(); diff --git a/server/util-server.js b/server/util-server.js index 4ddb6ce35e..031d8b6723 100644 --- a/server/util-server.js +++ b/server/util-server.js @@ -28,8 +28,11 @@ const { } = require("node-radius-utils"); const dayjs = require("dayjs"); -const isWindows = process.platform === /^win/.test(process.platform); +// SASLOptions used in JSDoc +// eslint-disable-next-line no-unused-vars +const { Kafka, SASLOptions } = require("kafkajs"); +const isWindows = process.platform === /^win/.test(process.platform); /** * Init or reset JWT secret * @returns {Promise} @@ -196,6 +199,94 @@ exports.mqttAsync = function (hostname, topic, okMessage, options = {}) { }); }; +/** + * Monitor Kafka using Producer + * @param {string} topic Topic name to produce into + * @param {string} message Message to produce + * @param {Object} [options={interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma"}] + * Kafka client options. Contains ssl, clientId, allowAutoTopicCreation and + * interval (interval defaults to 20, allowAutoTopicCreation defaults to false, clientId defaults to "Uptime-Kuma" + * and ssl defaults to false) + * @param {string[]} brokers List of kafka brokers to connect, host and port joined by ':' + * @param {SASLOptions} [saslOptions={}] Options for kafka client Authentication (SASL) (defaults to + * {}) + * @returns {Promise} + */ +exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, saslOptions = {}) { + return new Promise((resolve, reject) => { + const { interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma" } = options; + + let connectedToKafka = false; + + const timeoutID = setTimeout(() => { + log.debug("kafkaProducer", "KafkaProducer timeout triggered"); + connectedToKafka = true; + reject(new Error("Timeout")); + }, interval * 1000 * 0.8); + + if (saslOptions.mechanism === "None") { + saslOptions = undefined; + } + + let client = new Kafka({ + brokers: brokers, + clientId: clientId, + sasl: saslOptions, + retry: { + retries: 0, + }, + ssl: ssl, + }); + + let producer = client.producer({ + allowAutoTopicCreation: allowAutoTopicCreation, + retry: { + retries: 0, + } + }); + + producer.connect().then( + () => { + try { + producer.send({ + topic: topic, + messages: [{ + value: message, + }], + }); + connectedToKafka = true; + clearTimeout(timeoutID); + resolve("Message sent successfully"); + } catch (e) { + connectedToKafka = true; + producer.disconnect(); + clearTimeout(timeoutID); + reject(new Error("Error sending message: " + e.message)); + } + } + ).catch( + (e) => { + connectedToKafka = true; + producer.disconnect(); + clearTimeout(timeoutID); + reject(new Error("Error in producer connection: " + e.message)); + } + ); + + producer.on("producer.network.request_timeout", (_) => { + clearTimeout(timeoutID); + reject(new Error("producer.network.request_timeout")); + }); + + producer.on("producer.disconnect", (_) => { + if (!connectedToKafka) { + clearTimeout(timeoutID); + reject(new Error("producer.disconnect")); + } + }); + }); +}; + /** * Use NTLM Auth for a http request. * @param {Object} options The http request options diff --git a/src/lang/en.json b/src/lang/en.json index ceafe06cd8..2766591f21 100644 --- a/src/lang/en.json +++ b/src/lang/en.json @@ -768,6 +768,20 @@ "Badge URL": "Badge URL", "Group": "Group", "Monitor Group": "Monitor Group", + "Kafka Brokers": "Kafka Brokers", + "Enter the list of brokers": "Enter the list of brokers", + "Press Enter to add broker": "Press Enter to add broker", + "Kafka Topic Name": "Kafka Topic Name", + "Kafka Producer Message": "Kafka Producer Message", + "Enable Kafka SSL": "Enable Kafka SSL", + "Enable Kafka Producer Auto Topic Creation": "Enable Kafka Producer Auto Topic Creation", + "Kafka SASL Options": "Kafka SASL Options", + "Mechanism": "Mechanism", + "Pick a SASL Mechanism...": "Pick a SASL Mechanism...", + "Authorization Identity": "Authorization Identity", + "AccessKey Id": "AccessKey Id", + "Secret AccessKey": "Secret AccessKey", + "Session Token": "Session Token", "noGroupMonitorMsg": "Not Available. Create a Group Monitor First.", "Close": "Close", "Request Body": "Request Body" diff --git a/src/pages/EditMonitor.vue b/src/pages/EditMonitor.vue index 1ce6227932..0ffef8fe10 100644 --- a/src/pages/EditMonitor.vue +++ b/src/pages/EditMonitor.vue @@ -61,6 +61,9 @@ + @@ -166,6 +169,57 @@ + +
@@ -512,6 +566,56 @@
+ + + +