From 717f5a9b00a55228640acd155510d24ca1b936e6 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 11 Dec 2023 19:27:53 +0100 Subject: [PATCH 1/2] Add commitCb method to avoid blocking while committing and return a Promise without having to call consumer --- lib/kafka-consumer.js | 20 ++++++++++++++++ lib/kafkajs/_consumer.js | 36 +++++++++++++++++----------- src/kafka-consumer.cc | 40 +++++++++++++++++++++++++++++++ src/kafka-consumer.h | 1 + src/workers.cc | 52 ++++++++++++++++++++++++++++++++++++++++ src/workers.h | 15 ++++++++++++ 6 files changed, 150 insertions(+), 14 deletions(-) diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index dd981ade..ae7452ce 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -582,6 +582,26 @@ KafkaConsumer.prototype.commitMessageSync = function(msg) { return this; }; +/** + * Commits a list of offsets per topic partition, using provided callback. + * + * @param {TopicPartition[]} toppars - Topic partition list to commit + * offsets for. Defaults to the current assignment + * @param {Function} cb - Callback method to execute when finished + * @return {Client} - Returns itself + */ +KafkaConsumer.prototype.commitCb = function(toppars, cb) { + this._client.commitCb(toppars, function(err) { + if (err) { + cb(LibrdKafkaError.create(err)); + return; + } + + cb(null); + }); + return this; +}; + /** * Get last known offsets from the client. * diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index ca620b90..8823da81 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -326,24 +326,32 @@ class Consumer { * @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions * @returns {Promise} a promise that resolves when the offsets have been committed. 
    */
-  async commitOffsets(topicPartitions = null) {
+  commitOffsets(topicPartitions = null) {
     if (this.#state !== ConsumerState.CONNECTED) {
-      throw new error.KafkaJSError('Commit can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
+      return Promise.reject(new error.KafkaJSError('Commit can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }));
     }
 
-    try {
-      if (topicPartitions === null) {
-        this.#internalClient.commitSync();
-      } else {
-        const topicPartitions = topicPartitions.map(
-          topicPartitionOffsetToRdKafka);
-        this.#internalClient.commitSync(topicPartitions);
-      }
-    } catch (e) {
-      if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET) {
-        throw createKafkaJsErrorFromLibRdKafkaError(e);
+    return new Promise((resolve, reject) => {
+      try {
+        let cb = (e) => {
+          if (e)
+            reject(createKafkaJsErrorFromLibRdKafkaError(e));
+          else
+            resolve();
+        };
+
+        if (topicPartitions)
+          topicPartitions = topicPartitions.map(topicPartitionOffsetToRdKafka);
+        else
+          topicPartitions = null;
+        this.#internalClient.commitCb(topicPartitions, cb);
+      } catch (e) {
+        if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET)
+          reject(createKafkaJsErrorFromLibRdKafkaError(e));
+        else
+          resolve();
       }
-    }
+    });
   }
 
   /**
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index eccab3e9..f3331a98 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -532,6 +532,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
 
   Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
   Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
+  Nan::SetPrototypeMethod(tpl, "commitCb", NodeCommitCb);
   Nan::SetPrototypeMethod(tpl, "offsetsStore", NodeOffsetsStore);
 
   constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
@@ -875,6 +876,45 @@ NAN_METHOD(KafkaConsumer::NodeCommitSync) {
   info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
 }
 
+NAN_METHOD(KafkaConsumer::NodeCommitCb) {
+  Nan::HandleScope scope;
+  int error_code;
+  std::optional<std::vector<RdKafka::TopicPartition *>> toppars =
+    std::nullopt;
+  Nan::Callback *callback;
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  if (!consumer->IsConnected()) {
+    Nan::ThrowError("KafkaConsumer is disconnected");
+    return;
+  }
+
+  if (info.Length() != 2) {
+    Nan::ThrowError("Two arguments are required");
+    return;
+  }
+
+  if (!(
+      (info[0]->IsArray() || info[0]->IsNull()) &&
+      info[1]->IsFunction()
+    )) {
+    Nan::ThrowError("First argument should be an array or null and second one a callback");
+    return;
+  }
+
+  if (info[0]->IsArray()) {
+    toppars =
+      Conversion::TopicPartition::FromV8Array(info[0].As<v8::Array>());
+  }
+  callback = new Nan::Callback(info[1].As<v8::Function>());
+
+  Nan::AsyncQueueWorker(
+    new Workers::KafkaConsumerCommitCb(callback, consumer,
+      toppars));
+
+  info.GetReturnValue().Set(Nan::Null());
+}
+
 NAN_METHOD(KafkaConsumer::NodeSubscribe) {
   Nan::HandleScope scope;
 
diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h
index d5991944..a25efd00 100644
--- a/src/kafka-consumer.h
+++ b/src/kafka-consumer.h
@@ -110,6 +110,7 @@ class KafkaConsumer : public Connection {
   static NAN_METHOD(NodeUnsubscribe);
   static NAN_METHOD(NodeCommit);
   static NAN_METHOD(NodeCommitSync);
+  static NAN_METHOD(NodeCommitCb);
   static NAN_METHOD(NodeOffsetsStore);
   static NAN_METHOD(NodeCommitted);
   static NAN_METHOD(NodePosition);
diff --git a/src/workers.cc b/src/workers.cc
index 749732d0..c6e5dca4 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -1036,6 +1036,58 @@ void KafkaConsumerCommitted::HandleErrorCallback() {
   callback->Call(argc, argv);
 }
 
+/**
+ * @brief KafkaConsumer commit offsets with a callback function.
+ *
+ * The first callback argument is the commit error, or null on success.
+ *
+ * @see RdKafka::KafkaConsumer::commitSync
+ */
+KafkaConsumerCommitCb::KafkaConsumerCommitCb(Nan::Callback *callback,
+  KafkaConsumer* consumer,
+  std::optional<std::vector<RdKafka::TopicPartition *>> & t) :
+  ErrorAwareWorker(callback),
+  m_consumer(consumer),
+  m_topic_partitions(t) {}
+
+KafkaConsumerCommitCb::~KafkaConsumerCommitCb() {
+  // Delete the underlying topic partitions as they are ephemeral or cloned
+  if (m_topic_partitions.has_value())
+    RdKafka::TopicPartition::destroy(m_topic_partitions.value());
+}
+
+void KafkaConsumerCommitCb::Execute() {
+  Baton b = Baton(NULL);
+  if (m_topic_partitions.has_value()) {
+    b = m_consumer->Commit(m_topic_partitions.value());
+  } else {
+    b = m_consumer->Commit();
+  }
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    SetErrorBaton(b);
+  }
+}
+
+void KafkaConsumerCommitCb::HandleOKCallback() {
+  Nan::HandleScope scope;
+
+  const unsigned int argc = 1;
+  v8::Local<v8::Value> argv[argc];
+
+  argv[0] = Nan::Null();
+
+  callback->Call(argc, argv);
+}
+
+void KafkaConsumerCommitCb::HandleErrorCallback() {
+  Nan::HandleScope scope;
+
+  const unsigned int argc = 1;
+  v8::Local<v8::Value> argv[argc] = { GetErrorObject() };
+
+  callback->Call(argc, argv);
+}
+
 /**
  * @brief KafkaConsumer seek
  *
diff --git a/src/workers.h b/src/workers.h
index b290d253..36388c41 100644
--- a/src/workers.h
+++ b/src/workers.h
@@ -13,6 +13,7 @@
 #include <uv.h>
 #include <nan.h>
 #include <string>
+#include <optional>
 #include <vector>
 
 #include "src/common.h"
@@ -417,6 +418,20 @@ class KafkaConsumerCommitted : public ErrorAwareWorker {
   const int m_timeout_ms;
 };
 
+class KafkaConsumerCommitCb : public ErrorAwareWorker {
+ public:
+  KafkaConsumerCommitCb(Nan::Callback*,
+    NodeKafka::KafkaConsumer*, std::optional<std::vector<RdKafka::TopicPartition *>> &);
+  ~KafkaConsumerCommitCb();
+
+  void Execute();
+  void HandleOKCallback();
+  void HandleErrorCallback();
+ private:
+  NodeKafka::KafkaConsumer * m_consumer;
+  std::optional<std::vector<RdKafka::TopicPartition *>> m_topic_partitions;
+};
+
 class KafkaConsumerSeek : public ErrorAwareWorker {
  public:
  KafkaConsumerSeek(Nan::Callback*, NodeKafka::KafkaConsumer*,

From 
0bca809b065cd672b60d854a727435f2647e30a1 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 12 Dec 2023 09:38:23 +0100 Subject: [PATCH 2/2] Address comment --- lib/kafkajs/_consumer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 8823da81..928a7a3d 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -326,7 +326,7 @@ class Consumer { * @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions * @returns {Promise} a promise that resolves when the offsets have been committed. */ - commitOffsets(topicPartitions = null) { + async commitOffsets(topicPartitions = null) { if (this.#state !== ConsumerState.CONNECTED) { return Promise.reject(new error.KafkaJSError('Commit can only be called while connected.', { code: error.ErrorCodes.ERR__STATE })); }