Add commitCb method #59

Merged 2 commits on Aug 22, 2024
20 changes: 20 additions & 0 deletions lib/kafka-consumer.js
@@ -646,6 +646,26 @@ KafkaConsumer.prototype.commitMessageSync = function(msg) {
return this;
};

/**
 * Commits a list of offsets per topic partition, using the provided callback.
 *
 * @param {TopicPartition[]|null} toppars - Topic partition list to commit
 * offsets for. Pass null to commit offsets for the current assignment.
 * @param {Function} cb - Callback method to execute when finished.
 * @return {Client} - Returns itself.
*/
KafkaConsumer.prototype.commitCb = function(toppars, cb) {
this._client.commitCb(toppars, function(err) {
if (err) {
cb(LibrdKafkaError.create(err));
return;
}

cb(null);
});
return this;
};
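
For context, a minimal usage sketch of the new method. The consumer setup, topic name, and offset arithmetic are illustrative assumptions, not part of this diff:

```js
// Sketch only: assumes a connected KafkaConsumer from this library, consuming
// in flowing mode with 'enable.auto.commit' disabled. Names are illustrative.
consumer.on('data', (message) => {
  consumer.commitCb([{
    topic: message.topic,
    partition: message.partition,
    offset: message.offset + 1, // commit the next offset to resume from
  }], (err) => {
    if (err) {
      console.error('commit failed:', err);
      return;
    }
    // Offsets are committed; safe to checkpoint application state here.
  });
});
```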

/**
* Get last known offsets from the client.
*
32 changes: 20 additions & 12 deletions lib/kafkajs/_consumer.js
@@ -1339,19 +1339,27 @@ class Consumer {
throw new error.KafkaJSError('Commit can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
}

-    try {
-      if (topicPartitions === null) {
-        this.#internalClient.commitSync();
-      } else {
-        const topicPartitionsRdKafka = topicPartitions.map(
-          topicPartitionOffsetMetadataToRdKafka);
-        this.#internalClient.commitSync(topicPartitionsRdKafka);
-      }
-    } catch (e) {
-      if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET) {
-        throw createKafkaJsErrorFromLibRdKafkaError(e);
-      }
-    }
+    return new Promise((resolve, reject) => {
+      try {
+        let cb = (e) => {
+          if (e)
+            reject(createKafkaJsErrorFromLibRdKafkaError(e));
+          else
+            resolve();
+        };
+
+        if (topicPartitions)
+          topicPartitions = topicPartitions.map(topicPartitionOffsetMetadataToRdKafka);
+        else
+          topicPartitions = null;
+        this.#internalClient.commitCb(topicPartitions, cb);
+      } catch (e) {
+        if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET)
+          reject(createKafkaJsErrorFromLibRdKafkaError(e));
+        else
+          resolve();
+      }
+    });
   }
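
The change above is a straight callback-to-promise bridge: commitSync, which blocked the JS thread for the duration of the broker round-trip, is replaced by commitCb plus resolve/reject wiring, and a synchronously thrown ERR__NO_OFFSET still resolves (nothing to commit is not fatal). A sketch of what a caller sees; the wrapper's public method name is not visible in this hunk, so commitOffsets is an assumption:

```js
// Illustrative only: `consumer` is the kafkajs-style wrapper around the
// Promise bridge above; the method name commitOffsets is assumed.
async function processAndCommit(consumer, topicPartitions) {
  try {
    await consumer.commitOffsets(topicPartitions); // resolves once the commit completes
  } catch (err) {
    // Errors other than a synchronously raised ERR__NO_OFFSET reject.
    console.error('offset commit failed:', err);
  }
}
```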

40 changes: 40 additions & 0 deletions src/kafka-consumer.cc
@@ -556,6 +556,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {

Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
Nan::SetPrototypeMethod(tpl, "commitCb", NodeCommitCb);
Nan::SetPrototypeMethod(tpl, "offsetsStore", NodeOffsetsStore);
Nan::SetPrototypeMethod(tpl, "offsetsStoreSingle", NodeOffsetsStoreSingle);

@@ -1025,6 +1026,45 @@ NAN_METHOD(KafkaConsumer::NodeCommitSync) {
info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
}

NAN_METHOD(KafkaConsumer::NodeCommitCb) {
Nan::HandleScope scope;
int error_code;
std::optional<std::vector<RdKafka::TopicPartition *>> toppars = std::nullopt;
Nan::Callback *callback;

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

if (!consumer->IsConnected()) {
Nan::ThrowError("KafkaConsumer is disconnected");
return;
}

if (info.Length() != 2) {
Nan::ThrowError("Two arguments are required");
return;
}

if (!(
(info[0]->IsArray() || info[0]->IsNull()) &&
info[1]->IsFunction())) {
Nan::ThrowError(
"First argument should be an array or null and second one a callback");
return;
}

if (info[0]->IsArray()) {
toppars =
Conversion::TopicPartition::FromV8Array(info[0].As<v8::Array>());
}
callback = new Nan::Callback(info[1].As<v8::Function>());

Nan::AsyncQueueWorker(
new Workers::KafkaConsumerCommitCb(callback, consumer,
toppars));

info.GetReturnValue().Set(Nan::Null());
}
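
The argument contract enforced above, exercised from JS; the calls are illustrative and `client` stands for the native consumer handle (this._client in the wrapper):

```js
// Valid: explicit topic partition list plus a callback.
client.commitCb([{ topic: 'example-topic', partition: 0, offset: 42 }], (err) => {});

// Valid: null commits offsets for the current assignment.
client.commitCb(null, (err) => {});

// Invalid: throws "Two arguments are required".
client.commitCb(null);
```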

NAN_METHOD(KafkaConsumer::NodeSubscribe) {
Nan::HandleScope scope;

1 change: 1 addition & 0 deletions src/kafka-consumer.h
@@ -121,6 +121,7 @@ class KafkaConsumer : public Connection {
static NAN_METHOD(NodeUnsubscribe);
static NAN_METHOD(NodeCommit);
static NAN_METHOD(NodeCommitSync);
static NAN_METHOD(NodeCommitCb);
static NAN_METHOD(NodeOffsetsStore);
static NAN_METHOD(NodeOffsetsStoreSingle);
static NAN_METHOD(NodeCommitted);
52 changes: 52 additions & 0 deletions src/workers.cc
@@ -1062,6 +1062,58 @@ void KafkaConsumerCommitted::HandleErrorCallback() {
callback->Call(argc, argv);
}

/**
* @brief KafkaConsumer commit offsets with a callback function.
*
* The first callback argument is the commit error, or null on success.
*
* @see RdKafka::KafkaConsumer::commitSync
*/
KafkaConsumerCommitCb::KafkaConsumerCommitCb(Nan::Callback *callback,
KafkaConsumer* consumer,
std::optional<std::vector<RdKafka::TopicPartition*>> & t) :
ErrorAwareWorker(callback),
m_consumer(consumer),
m_topic_partitions(t) {}

KafkaConsumerCommitCb::~KafkaConsumerCommitCb() {
  // Delete the underlying topic partitions; they are either ephemeral or
  // cloned copies owned by this worker.
if (m_topic_partitions.has_value())
RdKafka::TopicPartition::destroy(m_topic_partitions.value());
}

void KafkaConsumerCommitCb::Execute() {
Baton b = Baton(NULL);
if (m_topic_partitions.has_value()) {
b = m_consumer->Commit(m_topic_partitions.value());
} else {
b = m_consumer->Commit();
}
if (b.err() != RdKafka::ERR_NO_ERROR) {
SetErrorBaton(b);
}
}

void KafkaConsumerCommitCb::HandleOKCallback() {
Nan::HandleScope scope;

const unsigned int argc = 1;
v8::Local<v8::Value> argv[argc];

argv[0] = Nan::Null();

callback->Call(argc, argv);
}

void KafkaConsumerCommitCb::HandleErrorCallback() {
Nan::HandleScope scope;

const unsigned int argc = 1;
v8::Local<v8::Value> argv[argc] = { GetErrorObject() };

callback->Call(argc, argv);
}
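
Because Nan::AsyncQueueWorker runs Execute() on libuv's thread pool and invokes the Handle*Callback methods back on the JS thread, the event loop stays free while the commit is in flight. An illustrative way to observe that from JS:

```js
// Illustrative: the interval keeps firing while the commit round-trip runs on
// a worker thread; a blocking commitSync call would stall it instead.
const alive = setInterval(() => console.log('event loop alive'), 100);
consumer.commitCb(null, (err) => {
  clearInterval(alive);
  console.log('commit finished:', err ? err.message : 'ok');
});
```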

/**
* @brief KafkaConsumer seek
*
16 changes: 16 additions & 0 deletions src/workers.h
@@ -14,6 +14,7 @@
#include <uv.h>
#include <nan.h>
#include <string>
#include <optional>
#include <vector>

#include "src/common.h"
@@ -423,6 +424,21 @@ class KafkaConsumerCommitted : public ErrorAwareWorker {
const int m_timeout_ms;
};

class KafkaConsumerCommitCb : public ErrorAwareWorker {
public:
KafkaConsumerCommitCb(Nan::Callback*,
NodeKafka::KafkaConsumer*,
std::optional<std::vector<RdKafka::TopicPartition*>> &);
~KafkaConsumerCommitCb();

void Execute();
void HandleOKCallback();
void HandleErrorCallback();
private:
NodeKafka::KafkaConsumer * m_consumer;
std::optional<std::vector<RdKafka::TopicPartition*>> m_topic_partitions;
};

class KafkaConsumerSeek : public ErrorAwareWorker {
public:
KafkaConsumerSeek(Nan::Callback*, NodeKafka::KafkaConsumer*,