-
Notifications
You must be signed in to change notification settings - Fork 23
List consumer group offsets #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
63f05de
4445321
28aff0a
0b3a617
3c4a907
76639a6
6ff253b
925fd21
5b769a8
195a922
380ee83
254765b
3ba54a1
ffe16f7
a98699a
160f762
2a99288
28e28e3
ffbbafd
42004b1
f949d88
7f24913
fbbf9f2
8c11d0e
98ba984
4f0f25b
ba0603b
fdf56ef
5ecf261
b0e4372
ac0bece
72305d2
f4b4aaf
31e325c
89e8227
f34e086
6c919ff
69daca9
4f2d255
14d33b6
9fe9571
1dcfe39
b69e87f
d73a14d
bc059a4
a85cda0
4b9b340
3aab3c2
2bbb2af
f724ed8
a348985
15fff05
ffae694
aceae76
eddaabc
8bd4940
3d54a18
34302ba
ad06919
9b88c91
5424a4a
3ca8437
d2b7227
546df33
cd0887a
5c637c0
cbc69be
ecdd836
1b77019
ac1367c
71c4aeb
ffbffe8
5adb821
b6379d3
49e12c6
5356f81
a8e5b39
8b41c1e
69b28a5
a8e3914
acc94a4
12cf126
12e33c9
98f12f8
9c7f096
63a949f
ad0ff8c
52944ea
228f64b
3431a92
5cc2dee
73ca334
4c7c8df
fad64ce
a86c3b4
b3712ba
1501a64
29bc526
4e42726
e3de7e4
0f3a167
1bde73a
ce5a4e9
beafa7c
a7c5aca
99b0252
f6f5b54
603ca2e
2f86c63
a758b90
7085111
b2e28fa
92f262d
2d90d5b
c435397
7f6dd40
1c1cfe8
3c494e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| // require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS. | ||
| const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS; | ||
|
||
|
|
||
| let producer, consumer, admin; | ||
| let Id = "newGroup"; | ||
| let topicName = "newTopic"; | ||
|
|
||
| const kafka = new Kafka({ | ||
| kafkaJS: { | ||
| brokers: ["localhost:9092"], | ||
| }, | ||
| }); | ||
|
|
||
| async function waitFor(check, resolveValue, { delay = 50 } = {}) { | ||
| return new Promise((resolve) => { | ||
| const interval = setInterval(() => { | ||
| if (check()) { | ||
| clearInterval(interval); | ||
| resolve(resolveValue()); | ||
| } | ||
| }, delay); | ||
| }); | ||
| } | ||
|
|
||
| async function waitForMessages(messagesConsumed, { number = 1, delay } = {}) { | ||
| return waitFor( | ||
| () => messagesConsumed.length >= number, | ||
| () => messagesConsumed, | ||
| { delay } | ||
| ); | ||
| } | ||
|
|
||
| async function adminStart() { | ||
| admin = kafka.admin(); | ||
| await admin.connect(); | ||
|
|
||
| producer = kafka.producer(); | ||
| consumer = kafka.consumer({ | ||
| kafkaJS: { | ||
| groupId: Id, | ||
| fromBeginning: true, | ||
| autoCommit: false, | ||
| }, | ||
| }); | ||
|
|
||
| await admin.createTopics({ | ||
| topics: [{ topic: topicName, numPartitions: 1 }], | ||
| }); | ||
| console.log("Topic created successfully"); | ||
|
|
||
| await producer.connect(); | ||
| await consumer.connect(); | ||
|
|
||
| console.log("Consumer Connected successfully"); | ||
|
|
||
| await consumer.subscribe({ | ||
| topics: [topicName], | ||
| }); | ||
| console.log("Consumer subscribed to topic"); | ||
|
|
||
| const messages = Array.from({ length: 5 }, (_, i) => ({ | ||
| value: `message${i}`, | ||
| })); | ||
|
|
||
| await producer.send({ topic: topicName, messages }); | ||
| console.log("Messages sent till offset 4"); | ||
|
|
||
| let messagesConsumed = []; // Define messagesConsumed | ||
|
|
||
| await consumer.run({ | ||
| eachMessage: async ({ topic, partition, message }) => { | ||
| try { | ||
| messagesConsumed.push(message); // Populate messagesConsumed | ||
| if (messagesConsumed.length === 5) { | ||
| await consumer.commitOffsets([ | ||
| { | ||
| topic, | ||
| partition, | ||
| offset: (parseInt(message.offset, 10) + 1).toString(), | ||
| }, | ||
| ]); | ||
| await consumer.stop(); | ||
| } | ||
| } catch (error) { | ||
| if (error.message.includes("Offset out of range")) { | ||
| await consumer.stop(); | ||
| } else { | ||
| throw error; // Re-throw the error if it's not an "Offset out of range" error | ||
| } | ||
| } | ||
| }, | ||
| }); | ||
|
|
||
| await waitForMessages(messagesConsumed, { number: 5 }); | ||
| console.log("Messages consumed successfully"); | ||
| await producer.disconnect(); | ||
| await consumer.disconnect(); | ||
| // Fetch offsets after all messages have been consumed | ||
| const offsets = await admin.fetchOffsets({ | ||
| groupId: Id, | ||
| topics: [ | ||
| { | ||
| topic: topicName, | ||
| partitions: [0], // replace with actual partition numbers | ||
| }, | ||
| ], | ||
| }); | ||
|
|
||
| console.log("Consumer group offsets: ", JSON.stringify(offsets, null, 2)); | ||
|
|
||
| await admin.deleteGroups([Id]); | ||
| console.log("Consumer group deleted successfully"); | ||
| await admin.deleteTopics({ | ||
| topics: [topicName], | ||
| }); | ||
|
|
||
| await admin.disconnect(); | ||
| } | ||
|
|
||
| adminStart(); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -437,3 +437,73 @@ AdminClient.prototype.listTopics = function (options, cb) { | |
| } | ||
| }); | ||
| } | ||
| /** | ||
| * Fetch Offsets | ||
| * | ||
| * @param {string} options.groupId - The group ID to fetch offsets for. | ||
| * @param {import('../types/rdkafka').TopicInput} options.topics - The topics to fetch offsets for. | ||
|
||
| * @param {number?} options.timeout - The request timeout in milliseconds. | ||
| * May be unset (default: 5000) | ||
| * @param {boolean?} options.requireStableOffsets - Whether broker should return stable offsets | ||
| * (transaction-committed). (default: false) | ||
|
||
| * | ||
| * @param {function} cb - The callback to be executed when finished. | ||
| */ | ||
| AdminClient.prototype.fetchOffsets = function (options, cb) { | ||
|
|
||
| if (!this._isConnected) { | ||
| throw new Error('Client is disconnected'); | ||
| } | ||
|
|
||
| if (typeof options === 'function' || !options) { | ||
| throw new Error('Options with groupId must be provided'); | ||
| } | ||
|
|
||
| if (!options.groupId) { | ||
| throw new Error('groupId must be provided'); | ||
| } | ||
|
|
||
|
|
||
| if (!Object.hasOwn(options, 'timeout')) { | ||
| options.timeout = 5000; | ||
| } | ||
|
|
||
| if(!Object.hasOwn(options, 'requireStableOffsets')){ | ||
| options.requireStableOffsets = false; | ||
| } | ||
|
|
||
| if(!Object.hasOwn(options, 'topics')){ | ||
| options.topics = null; | ||
| } | ||
|
|
||
| /* | ||
| If the topics array consists of strings, we will set it to NULL. | ||
| Consequently, the function will return results for all partitions | ||
| across all topics associated with the given group ID. Subsequently, | ||
| we will filter these results based on the original topics array, | ||
| thereby displaying only the relevant results. | ||
| */ | ||
| let originalTopics = null; | ||
|
||
| if (Array.isArray(options.topics) && options.topics.length > 0 && typeof options.topics[0] === 'string') { | ||
milindl marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| originalTopics = options.topics; | ||
| options.topics = null; | ||
| } | ||
|
|
||
| this._client.fetchOffsets(options, function (err, offsets) { | ||
| if (err) { | ||
| if (cb) { | ||
| cb(LibrdKafkaError.create(err)); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| if (originalTopics !== null) { | ||
| offsets = offsets.filter(offset => originalTopics.includes(offset.topic)); | ||
| } | ||
|
|
||
| if (cb) { | ||
| cb(null, offsets); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -366,6 +366,39 @@ class Admin { | |
| }); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Fetch Offsets | ||
|
||
| * | ||
| * @param {string} options.groupId - The group ID to fetch offsets for. | ||
| * @param {import('../../types/rdkafka').TopicInput} options.topics - The topics to fetch offsets for. | ||
|
||
| * @param {boolean} options.resolveOffsets - not yet implemented | ||
| * @param {number?} options.timeout - The request timeout in milliseconds. | ||
| * May be unset (default: 5000) | ||
| * @param {boolean?} options.requireStableOffsets - Whether broker should return stable offsets | ||
| * (transaction-committed). (default: false) | ||
|
||
| * | ||
| * @returns {Promise<Array<topic: string, partitions: import('../../types/kafkajs').FetchOffsetsPartition>>} | ||
| */ | ||
| async fetchOffsets(options = {}) { | ||
| if (this.#state !== AdminState.CONNECTED) { | ||
| throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE }); | ||
| } | ||
|
|
||
| if (Object.hasOwn(options, "resolveOffsets")) { | ||
| throw new error.KafkaJSError("resolveOffsets is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED }); | ||
| } | ||
|
|
||
| return new Promise((resolve, reject) => { | ||
| this.#internalClient.fetchOffsets(options, (err, offsets) => { | ||
| if (err) { | ||
| reject(createKafkaJsErrorFromLibRdKafkaError(err)); | ||
| } else { | ||
| resolve(offsets); | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| module.exports = { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,6 +116,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) { | |
| Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups); | ||
| Nan::SetPrototypeMethod(tpl, "describeGroups", NodeDescribeGroups); | ||
| Nan::SetPrototypeMethod(tpl, "deleteGroups", NodeDeleteGroups); | ||
| Nan::SetPrototypeMethod(tpl, "fetchOffsets", NodeFetchOffsets); | ||
|
|
||
| Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); | ||
| Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); | ||
|
|
@@ -666,6 +667,91 @@ Baton AdminClient::DeleteGroups(rd_kafka_DeleteGroup_t **group_list, | |
| } | ||
| } | ||
|
|
||
| Baton AdminClient::FetchOffsets(rd_kafka_ListConsumerGroupOffsets_t **req, | ||
| size_t req_cnt, bool require_stable_offsets, | ||
| int timeout_ms, | ||
| rd_kafka_event_t **event_response) { | ||
| if (!IsConnected()) { | ||
| return Baton(RdKafka::ERR__STATE); | ||
| } | ||
|
|
||
| { | ||
| scoped_shared_write_lock lock(m_connection_lock); | ||
| if (!IsConnected()) { | ||
| return Baton(RdKafka::ERR__STATE); | ||
| } | ||
|
|
||
| // Make admin options to establish that we are fetching offsets | ||
| rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new( | ||
| m_client->c_ptr(), RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS); | ||
|
|
||
| char errstr[512]; | ||
| rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout( | ||
| options, timeout_ms, errstr, sizeof(errstr)); | ||
| if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||
| return Baton(static_cast<RdKafka::ErrorCode>(err), errstr); | ||
| } | ||
|
|
||
| if (require_stable_offsets) { | ||
| rd_kafka_error_t *error = | ||
| rd_kafka_AdminOptions_set_require_stable_offsets( | ||
| options, require_stable_offsets); | ||
| if (error) { | ||
| return Baton::BatonFromErrorAndDestroy(error); | ||
| } | ||
| } | ||
|
|
||
| // Create queue just for this operation. | ||
| rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr()); | ||
|
|
||
| rd_kafka_ListConsumerGroupOffsets(m_client->c_ptr(), req, req_cnt, options, | ||
| rkqu); | ||
|
|
||
| // Poll for an event by type in that queue | ||
| // DON'T destroy the event. It is the out parameter, and ownership is | ||
| // the caller's. | ||
| *event_response = PollForEvent( | ||
| rkqu, RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT, timeout_ms); | ||
|
|
||
| // Destroy the queue since we are done with it. | ||
| rd_kafka_queue_destroy(rkqu); | ||
|
|
||
| // Destroy the options we just made because we polled already | ||
| rd_kafka_AdminOptions_destroy(options); | ||
|
|
||
| // If we got no response from that operation, this is a failure | ||
| // likely due to time out | ||
| if (*event_response == NULL) { | ||
| return Baton(RdKafka::ERR__TIMED_OUT); | ||
| } | ||
|
|
||
| // Now we can get the error code from the event | ||
| if (rd_kafka_event_error(*event_response)) { | ||
| // If we had a special error code, get out of here with it | ||
| const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response); | ||
| return Baton(static_cast<RdKafka::ErrorCode>(errcode)); | ||
| } | ||
|
|
||
| const rd_kafka_ListConsumerGroupOffsets_result_t *result = | ||
| rd_kafka_event_ListConsumerGroupOffsets_result(*event_response); | ||
|
|
||
| size_t result_cnt; | ||
| const rd_kafka_group_result_t **results = | ||
| rd_kafka_ListConsumerGroupOffsets_result_groups(result, &result_cnt); | ||
|
|
||
| // Change the type of the 'error' pointer to 'const rd_kafka_error_t *' | ||
|
||
| const rd_kafka_error_t *error = rd_kafka_group_result_error(results[0]); | ||
| if (error) { | ||
| // Use the 'rd_kafka_error_code' function to get the error code | ||
| return Baton(static_cast<RdKafka::ErrorCode>(rd_kafka_error_code(error))); | ||
| } | ||
|
|
||
| // At this point, event_response contains the result, which needs | ||
| // to be parsed/converted by the caller. | ||
| return Baton(RdKafka::ERR_NO_ERROR); | ||
| } | ||
| } | ||
|
|
||
| void AdminClient::ActivateDispatchers() { | ||
| // Listen to global config | ||
| m_gconfig->listen(); | ||
|
|
@@ -986,4 +1072,63 @@ NAN_METHOD(AdminClient::NodeDeleteGroups) { | |
| callback, client, group_list, group_names_vector.size(), timeout_ms)); | ||
| } | ||
|
|
||
| NAN_METHOD(AdminClient::NodeFetchOffsets) { | ||
| Nan::HandleScope scope; | ||
| if (info.Length() < 2 || !info[1]->IsFunction()) { | ||
| return Nan::ThrowError("Need to specify a callback"); | ||
| } | ||
| if (!info[0]->IsObject()) { | ||
| return Nan::ThrowError("Must provide an options object"); | ||
| } | ||
|
|
||
| v8::Local<v8::Object> options = info[0].As<v8::Object>(); | ||
|
|
||
| v8::Local<v8::Value> groupIdValue; | ||
| if (!Nan::Get(options, Nan::New("groupId").ToLocalChecked()) | ||
| .ToLocal(&groupIdValue)) { | ||
| return Nan::ThrowError("Must provide 'groupId'"); | ||
| } | ||
|
|
||
| Nan::MaybeLocal<v8::String> groupIdMaybe = Nan::To<v8::String>(groupIdValue); | ||
| if (groupIdMaybe.IsEmpty()) { | ||
| return Nan::ThrowError("'groupId' must be a string"); | ||
| } | ||
| Nan::Utf8String groupIdUtf8(groupIdMaybe.ToLocalChecked()); | ||
| std::string groupIdStr = *groupIdUtf8; | ||
|
|
||
| v8::Local<v8::Array> topics = GetParameter<v8::Local<v8::Array>>( | ||
| options, "topics", Nan::New<v8::Array>()); | ||
|
|
||
| rd_kafka_topic_partition_list_t *partitions = NULL; | ||
|
|
||
| if (!topics->IsNull() && !topics->IsUndefined() && topics->Length() > 0) { | ||
| partitions = Conversion::TopicPartition:: | ||
| GroupedTopicPartitionv8ArrayToTopicPartitionList(topics); | ||
| } | ||
|
|
||
| rd_kafka_ListConsumerGroupOffsets_t **request = | ||
| static_cast<rd_kafka_ListConsumerGroupOffsets_t **>( | ||
| malloc(sizeof(rd_kafka_ListConsumerGroupOffsets_t *) * 1)); | ||
| request[0] = | ||
| rd_kafka_ListConsumerGroupOffsets_new(groupIdStr.c_str(), partitions); | ||
|
||
|
|
||
| if (partitions != NULL) { | ||
| rd_kafka_topic_partition_list_destroy(partitions); | ||
| } | ||
|
|
||
| // Get the timeout - default 5000 and require_stable_offsets parameter. | ||
|
|
||
| bool require_stable_offsets = | ||
| GetParameter<bool>(options, "requireStableOffsets", false); | ||
| int timeout_ms = GetParameter<int64_t>(options, "timeout", 5000); | ||
|
|
||
| // Create the final callback object | ||
| v8::Local<v8::Function> cb = info[1].As<v8::Function>(); | ||
| Nan::Callback *callback = new Nan::Callback(cb); | ||
| AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This()); | ||
|
|
||
| Nan::AsyncQueueWorker(new Workers::AdminClientFetchOffsets( | ||
| callback, client, request, 1, require_stable_offsets, timeout_ms)); | ||
| } | ||
|
|
||
| } // namespace NodeKafka | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file still needs to be moved to
examples/kafkajs/admin/fetch-offsets.js, and useparseArgsappropriately for argument parsing. You can follow this file for an idea of how to do it:examples/kafkajs/admin/describe-groups.jsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use arguments for bootstrap server, requireStableOffsets and timeout. Use positionals for the group name and the topic/partitions as you are currently doing, it will be something like