Skip to content

Commit

Permalink
✨ feat: added kafka producer (#3268)
Browse files Browse the repository at this point in the history
* ✨ feat: added kafka producer

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: eslint warn

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: typings and auth problems

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: better variable name to trrack disconnection

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: grouping Kafka Producer special settings into one template

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* ✨ feat: add kafka producer translations into `en.json`

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: disable close-on-select on kafka broker picker

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: `en.json` invalid json (conflict resolve)

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* Nostr dm notifications (#3051)

* Add nostr DM notification provider

* require crypto for node 18 compatibility

* remove whitespace

Co-authored-by: Frank Elsinga <frank@elsinga.de>

* move closer to where it is used

* simplify success or failure logic

* don't clobber the non-alert msg

* Update server/notification-providers/nostr.js

Co-authored-by: Frank Elsinga <frank@elsinga.de>

* polyfills required for node <= 18

* resolve linter warnings

* missing comma

---------

Co-authored-by: Frank Elsinga <frank@elsinga.de>

* Drop nostr

* Minor

* Fix a bug of clone

---------

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>
Co-authored-by: Frank Elsinga <frank@elsinga.de>
Co-authored-by: Louis Lam <louislam@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 17, 2023
1 parent 084cf01 commit 278b88a
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 3 deletions.
22 changes: 22 additions & 0 deletions db/patch-added-kafka-producer.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- You should not modify if this have pushed to Github, unless it does serious wrong with the db.
BEGIN TRANSACTION;

ALTER TABLE monitor
ADD kafka_producer_topic VARCHAR(255);

ALTER TABLE monitor
ADD kafka_producer_brokers TEXT;

ALTER TABLE monitor
ADD kafka_producer_ssl INTEGER;

ALTER TABLE monitor
ADD kafka_producer_allow_auto_topic_creation VARCHAR(255);

ALTER TABLE monitor
ADD kafka_producer_sasl_options TEXT;

ALTER TABLE monitor
ADD kafka_producer_message TEXT;

COMMIT;
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"jsonata": "^2.0.3",
"jsonwebtoken": "~9.0.0",
"jwt-decode": "~3.1.2",
"kafkajs": "^2.2.4",
"limiter": "~2.1.0",
"liquidjs": "^10.7.0",
"mongodb": "~4.14.0",
Expand Down
1 change: 1 addition & 0 deletions server/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Database {
"patch-add-parent-monitor.sql": true,
"patch-add-invert-keyword.sql": true,
"patch-added-json-query.sql": true,
"patch-added-kafka-producer.sql": true,
};

/**
Expand Down
26 changes: 25 additions & 1 deletion server/model/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { log, UP, DOWN, PENDING, MAINTENANCE, flipStatus, TimeLogger, MAX_INTERVA
SQL_DATETIME_FORMAT
} = require("../../src/util");
const { tcping, ping, dnsResolve, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, mqttAsync, setSetting, httpNtlm, radius, grpcQuery,
redisPingAsync, mongodbPing,
redisPingAsync, mongodbPing, kafkaProducerAsync
} = require("../util-server");
const { R } = require("redbean-node");
const { BeanModel } = require("redbean-node/dist/bean-model");
Expand Down Expand Up @@ -129,6 +129,11 @@ class Monitor extends BeanModel {
httpBodyEncoding: this.httpBodyEncoding,
jsonPath: this.jsonPath,
expectedValue: this.expectedValue,
kafkaProducerTopic: this.kafkaProducerTopic,
kafkaProducerBrokers: JSON.parse(this.kafkaProducerBrokers),
kafkaProducerSsl: this.kafkaProducerSsl === "1" && true || false,
kafkaProducerAllowAutoTopicCreation: this.kafkaProducerAllowAutoTopicCreation === "1" && true || false,
kafkaProducerMessage: this.kafkaProducerMessage,
screenshot,
};

Expand All @@ -153,6 +158,7 @@ class Monitor extends BeanModel {
tlsCa: this.tlsCa,
tlsCert: this.tlsCert,
tlsKey: this.tlsKey,
kafkaProducerSaslOptions: JSON.parse(this.kafkaProducerSaslOptions),
};
}

Expand Down Expand Up @@ -792,6 +798,24 @@ class Monitor extends BeanModel {
bean.ping = dayjs().valueOf() - startTime;
}

} else if (this.type === "kafka-producer") {
let startTime = dayjs().valueOf();

bean.msg = await kafkaProducerAsync(
JSON.parse(this.kafkaProducerBrokers),
this.kafkaProducerTopic,
this.kafkaProducerMessage,
{
allowAutoTopicCreation: this.kafkaProducerAllowAutoTopicCreation,
ssl: this.kafkaProducerSsl,
clientId: `Uptime-Kuma/${version}`,
interval: this.interval,
},
JSON.parse(this.kafkaProducerSaslOptions),
);
bean.status = UP;
bean.ping = dayjs().valueOf() - startTime;

} else {
throw new Error("Unknown Monitor Type");
}
Expand Down
8 changes: 8 additions & 0 deletions server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,9 @@ let needSetup = false;
monitor.accepted_statuscodes_json = JSON.stringify(monitor.accepted_statuscodes);
delete monitor.accepted_statuscodes;

monitor.kafkaProducerBrokers = JSON.stringify(monitor.kafkaProducerBrokers);
monitor.kafkaProducerSaslOptions = JSON.stringify(monitor.kafkaProducerSaslOptions);

bean.import(monitor);
bean.user_id = socket.userID;

Expand Down Expand Up @@ -757,6 +760,11 @@ let needSetup = false;
bean.httpBodyEncoding = monitor.httpBodyEncoding;
bean.expectedValue = monitor.expectedValue;
bean.jsonPath = monitor.jsonPath;
bean.kafkaProducerTopic = monitor.kafkaProducerTopic;
bean.kafkaProducerBrokers = JSON.stringify(monitor.kafkaProducerBrokers);
bean.kafkaProducerAllowAutoTopicCreation = monitor.kafkaProducerAllowAutoTopicCreation;
bean.kafkaProducerSaslOptions = JSON.stringify(monitor.kafkaProducerSaslOptions);
bean.kafkaProducerMessage = monitor.kafkaProducerMessage;

bean.validate();

Expand Down
93 changes: 92 additions & 1 deletion server/util-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ const {
} = require("node-radius-utils");
const dayjs = require("dayjs");

const isWindows = process.platform === /^win/.test(process.platform);
// SASLOptions used in JSDoc
// eslint-disable-next-line no-unused-vars
const { Kafka, SASLOptions } = require("kafkajs");

const isWindows = process.platform === /^win/.test(process.platform);
/**
* Init or reset JWT secret
* @returns {Promise<Bean>}
Expand Down Expand Up @@ -196,6 +199,94 @@ exports.mqttAsync = function (hostname, topic, okMessage, options = {}) {
});
};

/**
* Monitor Kafka using Producer
* @param {string} topic Topic name to produce into
* @param {string} message Message to produce
* @param {Object} [options={interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma"}]
* Kafka client options. Contains ssl, clientId, allowAutoTopicCreation and
* interval (interval defaults to 20, allowAutoTopicCreation defaults to false, clientId defaults to "Uptime-Kuma"
* and ssl defaults to false)
* @param {string[]} brokers List of kafka brokers to connect, host and port joined by ':'
* @param {SASLOptions} [saslOptions={}] Options for kafka client Authentication (SASL) (defaults to
* {})
* @returns {Promise<string>}
*/
exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, saslOptions = {}) {
return new Promise((resolve, reject) => {
const { interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma" } = options;

let connectedToKafka = false;

const timeoutID = setTimeout(() => {
log.debug("kafkaProducer", "KafkaProducer timeout triggered");
connectedToKafka = true;
reject(new Error("Timeout"));
}, interval * 1000 * 0.8);

if (saslOptions.mechanism === "None") {
saslOptions = undefined;
}

let client = new Kafka({
brokers: brokers,
clientId: clientId,
sasl: saslOptions,
retry: {
retries: 0,
},
ssl: ssl,
});

let producer = client.producer({
allowAutoTopicCreation: allowAutoTopicCreation,
retry: {
retries: 0,
}
});

producer.connect().then(
() => {
try {
producer.send({
topic: topic,
messages: [{
value: message,
}],
});
connectedToKafka = true;
clearTimeout(timeoutID);
resolve("Message sent successfully");
} catch (e) {
connectedToKafka = true;
producer.disconnect();
clearTimeout(timeoutID);
reject(new Error("Error sending message: " + e.message));
}
}
).catch(
(e) => {
connectedToKafka = true;
producer.disconnect();
clearTimeout(timeoutID);
reject(new Error("Error in producer connection: " + e.message));
}
);

producer.on("producer.network.request_timeout", (_) => {
clearTimeout(timeoutID);
reject(new Error("producer.network.request_timeout"));
});

producer.on("producer.disconnect", (_) => {
if (!connectedToKafka) {
clearTimeout(timeoutID);
reject(new Error("producer.disconnect"));
}
});
});
};

/**
* Use NTLM Auth for a http request.
* @param {Object} options The http request options
Expand Down
14 changes: 14 additions & 0 deletions src/lang/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,20 @@
"Badge URL": "Badge URL",
"Group": "Group",
"Monitor Group": "Monitor Group",
"Kafka Brokers": "Kafka Brokers",
"Enter the list of brokers": "Enter the list of brokers",
"Press Enter to add broker": "Press Enter to add broker",
"Kafka Topic Name": "Kafka Topic Name",
"Kafka Producer Message": "Kafka Producer Message",
"Enable Kafka SSL": "Enable Kafka SSL",
"Enable Kafka Producer Auto Topic Creation": "Enable Kafka Producer Auto Topic Creation",
"Kafka SASL Options": "Kafka SASL Options",
"Mechanism": "Mechanism",
"Pick a SASL Mechanism...": "Pick a SASL Mechanism...",
"Authorization Identity": "Authorization Identity",
"AccessKey Id": "AccessKey Id",
"Secret AccessKey": "Secret AccessKey",
"Session Token": "Session Token",
"noGroupMonitorMsg": "Not Available. Create a Group Monitor First.",
"Close": "Close",
"Request Body": "Request Body"
Expand Down
Loading

0 comments on commit 278b88a

Please sign in to comment.