diff --git a/CHANGELOG.md b/CHANGELOG.md index b651bdc8..65be3d8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ -# confluent-kafka-javascript v0.3.1 +# confluent-kafka-javascript v0.4.0 -v0.3.1 is a limited availability maintenance release. It is supported for all usage. +v0.4.0 is a limited availability feature release. It is supported for all usage. ## Enhancements 1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130). +2. Add support for an Admin API to list a consumer group's offsets (#49). # confluent-kafka-javascript v0.3.0 diff --git a/MIGRATION.md b/MIGRATION.md index 3c28761b..3f57f2d1 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -331,6 +331,8 @@ The admin-client only has support for a limited subset of methods, with more to * The `describeGroups` method is supported with additional `timeout` and `includeAuthorizedOperations` options. A number of additional properties have been added to the returned groups. * The `deleteGroups` method is supported with an additional `timeout` option. + * The `fetchOffsets` method is supported with additional `timeout` and + `requireStableOffsets` option but `resolveOffsets` option is not yet supported. ### Using the Schema Registry diff --git a/examples/kafkajs/admin/fetch-offsets.js b/examples/kafkajs/admin/fetch-offsets.js new file mode 100644 index 00000000..470a5965 --- /dev/null +++ b/examples/kafkajs/admin/fetch-offsets.js @@ -0,0 +1,96 @@ +const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS; +const { parseArgs } = require('node:util'); + +async function fetchOffsets() { + const args = parseArgs({ + allowPositionals: true, + options: { + 'bootstrap-servers': { + type: 'string', + short: 'b', + default: 'localhost:9092', + }, + 'timeout': { + type: 'string', + short: 'm', + default: '5000', + }, + 'require-stable-offsets': { + type: 'boolean', + short: 'r', + default: false, + }, + }, + }); + + const { + 'bootstrap-servers': bootstrapServers, + timeout, + 'require-stable-offsets': requireStableOffsets, + } = args.values; + + const [groupId, ...rest] = args.positionals; + + if (!groupId) { + console.error('Group ID is required'); + process.exit(1); + } + + const kafka = new Kafka({ + kafkaJS: { + brokers: [bootstrapServers], + }, + }); + + const admin = kafka.admin(); + await admin.connect(); + + try { + // Parse topics and partitions from remaining arguments + const topicInput = parseTopicsAndPartitions(rest); + + // Fetch offsets for the specified consumer group + const offsets = await admin.fetchOffsets({ + groupId: groupId, + topics: topicInput, + requireStableOffsets, + timeout: Number(timeout), + }); + + console.log(`Offsets for Consumer Group "${groupId}":`, JSON.stringify(offsets, null, 2)); + } catch (err) { + console.error('Error fetching consumer group offsets:', err); + } finally { + await admin.disconnect(); + } +} + +// Helper function to parse topics and partitions from arguments +function parseTopicsAndPartitions(args) { + if (args.length === 0) return undefined; + + const topicInput = []; + let i = 0; + + while (i < args.length) { + const topic = args[i]; + i++; + + const partitions = []; + while (i < args.length && !isNaN(args[i])) { + partitions.push(Number(args[i])); + i++; + } + + // Add topic with partitions (or an empty array if no partitions specified) + if (partitions.length > 0) { + topicInput.push({ topic, partitions }); + } else { + topicInput.push(topic); // Add as a string if no partitions specified + } + } + + return topicInput; +} + +fetchOffsets(); diff --git a/lib/admin.js b/lib/admin.js index fe8750cc..40e57312 100644 --- a/lib/admin.js +++ b/lib/admin.js @@ -437,3 +437,50 @@ AdminClient.prototype.listTopics = function (options, cb) { } }); }; + +/** + * List offsets for topic partition(s) for consumer group(s). + * + * @param {import("../../types/rdkafka").ListGroupOffsets} listGroupOffsets - The list of groupId, partitions to fetch offsets for. + * If partitions is null, list offsets for all partitions + * in the group. + * @param {number?} options.timeout - The request timeout in milliseconds. + * May be unset (default: 5000) + * @param {boolean?} options.requireStableOffsets - Whether broker should return stable offsets + * (transaction-committed). (default: false) + * + * @param {function} cb - The callback to be executed when finished. + */ +AdminClient.prototype.listConsumerGroupOffsets = function (listGroupOffsets, options, cb) { + + if (!this._isConnected) { + throw new Error('Client is disconnected'); + } + + if (!listGroupOffsets[0].groupId) { + throw new Error('groupId must be provided'); + } + + + if (!Object.hasOwn(options, 'timeout')) { + options.timeout = 5000; + } + + if (!Object.hasOwn(options, 'requireStableOffsets')) { + options.requireStableOffsets = false; + } + + this._client.listConsumerGroupOffsets(listGroupOffsets, options, function (err, offsets) { + if (err) { + if (cb) { + cb(LibrdKafkaError.create(err)); + } + return; + } + + if (cb) { + cb(null, offsets); + } + }); +}; + diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js index d265174c..4c456f72 100644 --- a/lib/kafkajs/_admin.js +++ b/lib/kafkajs/_admin.js @@ -417,6 +417,123 @@ class Admin { }); }); } + + /** + * Fetch the offsets for topic partition(s) for consumer group(s). + * + * @param {string} options.groupId - The group ID to fetch offsets for. + * @param {import("../../types/kafkajs").TopicInput} options.topics - The topics to fetch offsets for. + * @param {boolean} options.resolveOffsets - not yet implemented + * @param {number?} options.timeout - The request timeout in milliseconds. + * May be unset (default: 5000) + * @param {boolean?} options.requireStableOffsets - Whether broker should return stable offsets + * (transaction-committed). (default: false) + * + * @returns {Promise>} + */ + async fetchOffsets(options = {}) { + if (this.#state !== AdminState.CONNECTED) { + throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE }); + } + + if (Object.hasOwn(options, "resolveOffsets")) { + throw new error.KafkaJSError("resolveOffsets is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED }); + } + + const { groupId, topics } = options; + + if (!groupId) { + throw new error.KafkaJSError("groupId is required.", { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + + let partitions = null; + let originalTopics = null; + + /* + If the input is a list of topic string, the user expects us to + fetch offsets for all all partitions of all the input topics. In + librdkafka, we can only fetch offsets by topic partitions, or else, + we can fetch all of them. Thus, we must fetch offsets for all topic + partitions (by settings partitions to null) and filter by the topic strings later. + */ + if (topics && Array.isArray(topics)) { + if (typeof topics[0] === 'string') { + originalTopics = topics; + partitions = null; + } else if (typeof topics[0] === 'object' && Array.isArray(topics[0].partitions)) { + partitions = topics.flatMap(topic => topic.partitions.map(partition => ({ + topic: topic.topic, + partition + }))); + } else { + throw new error.KafkaJSError("Invalid topics format.", { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + } + + const listGroupOffsets = [{ + groupId, + partitions + }]; + + + return new Promise((resolve, reject) => { + this.#internalClient.listConsumerGroupOffsets(listGroupOffsets, options, (err, offsets) => { + if (err) { + reject(createKafkaJsErrorFromLibRdKafkaError(err)); + } else { + + /** + * Offsets is an array of group results, each containing a group id, + * an error and an array of partitions. + * We need to convert it to the required format of an array of topics, each + * containing an array of partitions. + */ + const topicPartitionMap = new Map(); + + if (offsets.length !== 1) { + reject(new error.KafkaJSError("Unexpected number of group results.")); + return; + } + + const groupResult = offsets[0]; + + if (groupResult.error) { + reject(createKafkaJsErrorFromLibRdKafkaError(groupResult.error)); + return; + } + + // Traverse the partitions and group them by topic + groupResult.partitions.forEach(partitionObj => { + const { topic, partition, offset, leaderEpoch, metadata, error } = partitionObj; + const fetchOffsetsPartition = { + partition: partition, + offset: String(offset), + metadata: metadata || null, + leaderEpoch: leaderEpoch || null, + error: error || null + }; + + // Group partitions by topic + if (!topicPartitionMap.has(topic)) { + topicPartitionMap.set(topic, []); + } + topicPartitionMap.get(topic).push(fetchOffsetsPartition); + }); + + // Convert the map back to the desired array format + let convertedOffsets = Array.from(topicPartitionMap, ([topic, partitions]) => ({ + topic, + partitions + })); + + if (originalTopics !== null) { + convertedOffsets = convertedOffsets.filter(convertedOffset => originalTopics.includes(convertedOffset.topic)); + } + resolve(convertedOffsets); + } + }); + }); + } } module.exports = { diff --git a/src/admin.cc b/src/admin.cc index fccae5f4..ab3ef567 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -120,6 +120,8 @@ void AdminClient::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups); Nan::SetPrototypeMethod(tpl, "describeGroups", NodeDescribeGroups); Nan::SetPrototypeMethod(tpl, "deleteGroups", NodeDeleteGroups); + Nan::SetPrototypeMethod(tpl, "listConsumerGroupOffsets", + NodeListConsumerGroupOffsets); Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); @@ -670,6 +672,77 @@ Baton AdminClient::DeleteGroups(rd_kafka_DeleteGroup_t **group_list, } } +Baton AdminClient::ListConsumerGroupOffsets( + rd_kafka_ListConsumerGroupOffsets_t **req, size_t req_cnt, + bool require_stable_offsets, int timeout_ms, + rd_kafka_event_t **event_response) { + if (!IsConnected()) { + return Baton(RdKafka::ERR__STATE); + } + + { + scoped_shared_write_lock lock(m_connection_lock); + if (!IsConnected()) { + return Baton(RdKafka::ERR__STATE); + } + + // Make admin options to establish that we are fetching offsets + rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new( + m_client->c_ptr(), RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS); + + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout( + options, timeout_ms, errstr, sizeof(errstr)); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + return Baton(static_cast(err), errstr); + } + + if (require_stable_offsets) { + rd_kafka_error_t *error = + rd_kafka_AdminOptions_set_require_stable_offsets( + options, require_stable_offsets); + if (error) { + return Baton::BatonFromErrorAndDestroy(error); + } + } + + // Create queue just for this operation. + rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr()); + + rd_kafka_ListConsumerGroupOffsets(m_client->c_ptr(), req, req_cnt, options, + rkqu); + + // Poll for an event by type in that queue + // DON'T destroy the event. It is the out parameter, and ownership is + // the caller's. + *event_response = PollForEvent( + rkqu, RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT, timeout_ms); + + // Destroy the queue since we are done with it. + rd_kafka_queue_destroy(rkqu); + + // Destroy the options we just made because we polled already + rd_kafka_AdminOptions_destroy(options); + + // If we got no response from that operation, this is a failure + // likely due to time out + if (*event_response == NULL) { + return Baton(RdKafka::ERR__TIMED_OUT); + } + + // Now we can get the error code from the event + if (rd_kafka_event_error(*event_response)) { + // If we had a special error code, get out of here with it + const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response); + return Baton(static_cast(errcode)); + } + + // At this point, event_response contains the result, which needs + // to be parsed/converted by the caller. + return Baton(RdKafka::ERR_NO_ERROR); + } +} + void AdminClient::ActivateDispatchers() { // Listen to global config m_gconfig->listen(); @@ -993,4 +1066,105 @@ NAN_METHOD(AdminClient::NodeDeleteGroups) { callback, client, group_list, group_names_vector.size(), timeout_ms)); } +/** + * List Consumer Group Offsets. + */ +NAN_METHOD(AdminClient::NodeListConsumerGroupOffsets) { + Nan::HandleScope scope; + + if (info.Length() < 3 || !info[2]->IsFunction()) { + return Nan::ThrowError("Need to specify a callback"); + } + + if (!info[0]->IsArray()) { + return Nan::ThrowError("Must provide an array of 'listGroupOffsets'"); + } + + v8::Local listGroupOffsets = info[0].As(); + + if (listGroupOffsets->Length() == 0) { + return Nan::ThrowError("'listGroupOffsets' cannot be empty"); + } + + /** + * The ownership of this is taken by + * Workers::AdminClientListConsumerGroupOffsets and freeing it is also handled + * by that class. + */ + rd_kafka_ListConsumerGroupOffsets_t **requests = + static_cast( + malloc(sizeof(rd_kafka_ListConsumerGroupOffsets_t *) * + listGroupOffsets->Length())); + + for (uint32_t i = 0; i < listGroupOffsets->Length(); ++i) { + v8::Local listGroupOffsetValue = + Nan::Get(listGroupOffsets, i).ToLocalChecked(); + if (!listGroupOffsetValue->IsObject()) { + return Nan::ThrowError("Each entry must be an object"); + } + v8::Local listGroupOffsetObj = + listGroupOffsetValue.As(); + + v8::Local groupIdValue; + if (!Nan::Get(listGroupOffsetObj, Nan::New("groupId").ToLocalChecked()) + .ToLocal(&groupIdValue)) { + return Nan::ThrowError("Each entry must have 'groupId'"); + } + + Nan::MaybeLocal groupIdMaybe = + Nan::To(groupIdValue); + if (groupIdMaybe.IsEmpty()) { + return Nan::ThrowError("'groupId' must be a string"); + } + Nan::Utf8String groupIdUtf8(groupIdMaybe.ToLocalChecked()); + std::string groupIdStr = *groupIdUtf8; + + v8::Local partitionsValue; + rd_kafka_topic_partition_list_t *partitions = NULL; + + if (Nan::Get(listGroupOffsetObj, Nan::New("partitions").ToLocalChecked()) + .ToLocal(&partitionsValue) && + partitionsValue->IsArray()) { + v8::Local partitionsArray = partitionsValue.As(); + + if (partitionsArray->Length() > 0) { + partitions = Conversion::TopicPartition:: + TopicPartitionv8ArrayToTopicPartitionList(partitionsArray, false); + if (partitions == NULL) { + return Nan::ThrowError( + "Failed to convert partitions to list, provide proper object in " + "partitions"); + } + } + } + + requests[i] = + rd_kafka_ListConsumerGroupOffsets_new(groupIdStr.c_str(), partitions); + + if (partitions != NULL) { + rd_kafka_topic_partition_list_destroy(partitions); + } + } + + // Now process the second argument: options (timeout and requireStableOffsets) + v8::Local options = Nan::New(); + if (info.Length() > 2 && info[1]->IsObject()) { + options = info[1].As(); + } + + bool require_stable_offsets = + GetParameter(options, "requireStableOffsets", false); + int timeout_ms = GetParameter(options, "timeout", 5000); + + // Create the final callback object + v8::Local cb = info[2].As(); + Nan::Callback *callback = new Nan::Callback(cb); + AdminClient *client = ObjectWrap::Unwrap(info.This()); + + // Queue the worker to process the offset fetch request asynchronously + Nan::AsyncQueueWorker(new Workers::AdminClientListConsumerGroupOffsets( + callback, client, requests, listGroupOffsets->Length(), + require_stable_offsets, timeout_ms)); +} + } // namespace NodeKafka diff --git a/src/admin.h b/src/admin.h index 3af30f21..7e8ed19e 100644 --- a/src/admin.h +++ b/src/admin.h @@ -61,6 +61,10 @@ class AdminClient : public Connection { rd_kafka_event_t** event_response); Baton DeleteGroups(rd_kafka_DeleteGroup_t** group_list, size_t group_cnt, int timeout_ms, rd_kafka_event_t** event_response); + Baton ListConsumerGroupOffsets(rd_kafka_ListConsumerGroupOffsets_t** req, + size_t req_cnt, + bool require_stable_offsets, int timeout_ms, + rd_kafka_event_t** event_response); protected: static Nan::Persistent constructor; @@ -82,6 +86,7 @@ class AdminClient : public Connection { static NAN_METHOD(NodeListGroups); static NAN_METHOD(NodeDescribeGroups); static NAN_METHOD(NodeDeleteGroups); + static NAN_METHOD(NodeListConsumerGroupOffsets); static NAN_METHOD(NodeConnect); static NAN_METHOD(NodeDisconnect); diff --git a/src/common.cc b/src/common.cc index d83525cf..7ed79775 100644 --- a/src/common.cc +++ b/src/common.cc @@ -440,6 +440,43 @@ std::vector FromV8Array( return array; } +/** + * @brief v8 Array of Topic Partitions to rd_kafka_topic_partition_list_t + * + * @note Converts a v8 array of type [{topic: string, partition: number, + * offset?: number}] to a rd_kafka_topic_partition_list_t + */ +rd_kafka_topic_partition_list_t* TopicPartitionv8ArrayToTopicPartitionList( + v8::Local parameter, bool include_offset) { + rd_kafka_topic_partition_list_t* newList = + rd_kafka_topic_partition_list_new(parameter->Length()); + + for (unsigned int i = 0; i < parameter->Length(); i++) { + v8::Local v; + if (!Nan::Get(parameter, i).ToLocal(&v)) { + continue; + } + + if (!v->IsObject()) { + return NULL; // Return NULL to indicate an error + } + + v8::Local item = v.As(); + + std::string topic = GetParameter(item, "topic", ""); + int partition = GetParameter(item, "partition", -1); + + rd_kafka_topic_partition_t* toppar = + rd_kafka_topic_partition_list_add(newList, topic.c_str(), partition); + + if (include_offset) { + int offset = GetParameter(item, "offset", 0); + toppar->offset = offset; + } + } + return newList; +} + /** * @brief v8::Object to RdKafka::TopicPartition * @@ -1093,6 +1130,118 @@ v8::Local FromDeleteGroupsResult( return returnArray; } +/** + * @brief Converts a rd_kafka_ListConsumerGroupOffsets_result_t* + * into a v8 Array. + */ +v8::Local FromListConsumerGroupOffsetsResult( + const rd_kafka_ListConsumerGroupOffsets_result_t* result) { + /* Return Object type: + GroupResults[] = [{ + groupId : string, + error? : LibrdKafkaError, + partitions : TopicPartitionOffset[] + }] + + TopicPartitionOffset: + { + topic : string, + partition : number, + offset : number, + metadata : string | null, + leaderEpoch? : number, + error? : LibrdKafkaError + } + */ + + v8::Local returnArray = Nan::New(); + size_t result_cnt; + const rd_kafka_group_result_t** res = + rd_kafka_ListConsumerGroupOffsets_result_groups(result, &result_cnt); + + for (size_t i = 0; i < result_cnt; i++) { + const rd_kafka_group_result_t* group_result = res[i]; + + // Create group result object + v8::Local group_object = Nan::New(); + + // Set groupId + std::string groupId = rd_kafka_group_result_name(group_result); + Nan::Set(group_object, Nan::New("groupId").ToLocalChecked(), + Nan::New(groupId.c_str()).ToLocalChecked()); + + // Set group-level error (if any) + const rd_kafka_error_t* group_error = + rd_kafka_group_result_error(group_result); + if (group_error) { + RdKafka::ErrorCode code = + static_cast(rd_kafka_error_code(group_error)); + const char* msg = rd_kafka_error_string(group_error); + Nan::Set(group_object, Nan::New("error").ToLocalChecked(), + RdKafkaError(code, msg)); + } + + // Get the list of partitions for this group + const rd_kafka_topic_partition_list_t* partitionList = + rd_kafka_group_result_partitions(group_result); + + // Prepare array for TopicPartitionOffset[] + v8::Local partitionsArray = Nan::New(); + int partitionIndex = 0; + + for (int j = 0; j < partitionList->cnt; j++) { + const rd_kafka_topic_partition_t* partition = &partitionList->elems[j]; + + // Create the TopicPartitionOffset object + v8::Local partition_object = Nan::New(); + + // Set topic, partition, and offset + Nan::Set(partition_object, Nan::New("topic").ToLocalChecked(), + Nan::New(partition->topic).ToLocalChecked()); + Nan::Set(partition_object, Nan::New("partition").ToLocalChecked(), + Nan::New(partition->partition)); + Nan::Set(partition_object, Nan::New("offset").ToLocalChecked(), + Nan::New(partition->offset)); + + // Set metadata (if available) + if (partition->metadata != nullptr) { + Nan::Set( + partition_object, Nan::New("metadata").ToLocalChecked(), + Nan::New(static_cast(partition->metadata)) + .ToLocalChecked()); + } else { + Nan::Set(partition_object, Nan::New("metadata").ToLocalChecked(), + Nan::Null()); + } + + // Set leaderEpoch (if available) + int32_t leader_epoch = + rd_kafka_topic_partition_get_leader_epoch(partition); + if (leader_epoch >= 0) { + Nan::Set(partition_object, Nan::New("leaderEpoch").ToLocalChecked(), + Nan::New(leader_epoch)); + } + + // Set partition-level error (if any) + if (partition->err != RD_KAFKA_RESP_ERR_NO_ERROR) { + RdKafka::ErrorCode code = + static_cast(partition->err); + Nan::Set(group_object, Nan::New("error").ToLocalChecked(), + RdKafkaError(code, rd_kafka_err2str(partition->err))); + } + + Nan::Set(partitionsArray, partitionIndex++, partition_object); + } + + Nan::Set(group_object, Nan::New("partitions").ToLocalChecked(), + partitionsArray); + + Nan::Set(returnArray, i, group_object); + } + + return returnArray; +} + } // namespace Admin } // namespace Conversion diff --git a/src/common.h b/src/common.h index d98508e3..e9027eb5 100644 --- a/src/common.h +++ b/src/common.h @@ -130,6 +130,10 @@ v8::Local FromDescribeConsumerGroupsResult( // DeleteGroups: Response v8::Local FromDeleteGroupsResult( const rd_kafka_DeleteGroups_result_t *); + +// ListConsumerGroupOffsets: Request +v8::Local FromListConsumerGroupOffsetsResult( + const rd_kafka_ListConsumerGroupOffsets_result_t *result); } // namespace Admin namespace TopicPartition { @@ -139,6 +143,8 @@ v8::Local ToTopicPartitionV8Array( const rd_kafka_topic_partition_list_t *, bool include_offset); RdKafka::TopicPartition *FromV8Object(v8::Local); std::vector FromV8Array(const v8::Local &); // NOLINT +rd_kafka_topic_partition_list_t *TopicPartitionv8ArrayToTopicPartitionList( + v8::Local parameter, bool include_offset); } // namespace TopicPartition diff --git a/src/workers.cc b/src/workers.cc index 571cc1e7..3df8ece6 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -1483,6 +1483,68 @@ void AdminClientDeleteGroups::HandleErrorCallback() { callback->Call(argc, argv); } +/** + * @brief List Consumer Group Offsets in an asynchronous worker + * + * This callback will list all the consumer group offsets for the specified + * group's topic partitions. + * + */ +AdminClientListConsumerGroupOffsets::AdminClientListConsumerGroupOffsets( + Nan::Callback* callback, NodeKafka::AdminClient* client, + rd_kafka_ListConsumerGroupOffsets_t **req, + size_t req_cnt, + const bool require_stable_offsets, + const int& timeout_ms) + : ErrorAwareWorker(callback), + m_client(client), + m_req(req), + m_req_cnt(req_cnt), + m_require_stable_offsets(require_stable_offsets), + m_timeout_ms(timeout_ms) {} + +AdminClientListConsumerGroupOffsets::~AdminClientListConsumerGroupOffsets() { + if (m_req) { + rd_kafka_ListConsumerGroupOffsets_destroy_array(m_req, m_req_cnt); + free(m_req); + } + + if (this->m_event_response) { + rd_kafka_event_destroy(this->m_event_response); + } +} + +void AdminClientListConsumerGroupOffsets::Execute() { + Baton b = m_client->ListConsumerGroupOffsets(m_req, m_req_cnt, + m_require_stable_offsets, + m_timeout_ms, &m_event_response); + if (b.err() != RdKafka::ERR_NO_ERROR) { + SetErrorBaton(b); + } +} + +void AdminClientListConsumerGroupOffsets::HandleOKCallback() { + Nan::HandleScope scope; + const unsigned int argc = 2; + v8::Local argv[argc]; + + argv[0] = Nan::Null(); + argv[1] = Conversion::Admin::FromListConsumerGroupOffsetsResult( + rd_kafka_event_ListConsumerGroupOffsets_result(m_event_response)); + + callback->Call(argc, argv); +} + +void AdminClientListConsumerGroupOffsets::HandleErrorCallback() { + Nan::HandleScope scope; + + const unsigned int argc = 1; + v8::Local argv[argc] = {GetErrorObject()}; + + callback->Call(argc, argv); +} + + } // namespace Workers } // namespace NodeKafka diff --git a/src/workers.h b/src/workers.h index e103163f..94cac0ce 100644 --- a/src/workers.h +++ b/src/workers.h @@ -589,6 +589,26 @@ class AdminClientDeleteGroups : public ErrorAwareWorker { rd_kafka_event_t *m_event_response; }; +class AdminClientListConsumerGroupOffsets : public ErrorAwareWorker { + public: + AdminClientListConsumerGroupOffsets(Nan::Callback *, NodeKafka::AdminClient *, + rd_kafka_ListConsumerGroupOffsets_t **, size_t, bool, + const int &); + ~AdminClientListConsumerGroupOffsets(); + + void Execute(); + void HandleOKCallback(); + void HandleErrorCallback(); + + private: + NodeKafka::AdminClient *m_client; + rd_kafka_ListConsumerGroupOffsets_t **m_req; + size_t m_req_cnt; + const bool m_require_stable_offsets; + const int m_timeout_ms; + rd_kafka_event_t *m_event_response; +}; + } // namespace Workers } // namespace NodeKafka diff --git a/test/promisified/admin/fetch_offsets.spec.js b/test/promisified/admin/fetch_offsets.spec.js new file mode 100644 index 00000000..08e5a81a --- /dev/null +++ b/test/promisified/admin/fetch_offsets.spec.js @@ -0,0 +1,340 @@ +jest.setTimeout(30000); + +const { ErrorCodes } = require("../../../lib").KafkaJS; +const { + secureRandom, + createTopic, + createProducer, + createConsumer, + waitForMessages, + createAdmin, +} = require("../testhelpers"); + +describe("fetchOffset function", () => { + let topicName, topicName2, groupId, producer, consumer, admin; + let topicsToDelete = []; + + beforeEach(async () => { + groupId = `consumer-group-id-${secureRandom()}`; + + producer = createProducer({ + clientId: "test-producer-id", + }); + + consumer = createConsumer({ + groupId, + fromBeginning: true, + clientId: "test-consumer-id", + autoCommit: false, + }); + + admin = createAdmin({}); + + await producer.connect(); + await consumer.connect(); + + await admin.connect(); + + topicName = `test-topic-${secureRandom()}`; + topicName2 = `test-topic-${secureRandom()}`; + + topicsToDelete = []; + }); + + afterEach(async () => { + await admin.deleteTopics({ + topics: topicsToDelete, + }); + await admin.disconnect(); + producer && (await producer.disconnect()); + consumer && (await consumer.disconnect()); + }); + + it("should timeout when fetching offsets", async () => { + + await createTopic({ topic: topicName, partitions: 1 }); + topicsToDelete.push(topicName); + + await expect( + admin.fetchOffsets({ groupId, topic: topicName, timeout: 0 }) + ).rejects.toHaveProperty("code", ErrorCodes.ERR__TIMED_OUT); + }); + + it("should return correct offset after consuming messages", async () => { + + await createTopic({ topic: topicName, partitions: 1 }); + topicsToDelete.push(topicName); + + const messages = Array.from({ length: 5 }, (_, i) => ({ + value: `message${i}`, + })); + await producer.send({ topic: topicName, messages: messages }); + + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = []; // Define messagesConsumed + + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + + messagesConsumed.push(message); // Populate messagesConsumed + if (messagesConsumed.length === 5) { + await consumer.commitOffsets([ + { + topic, + partition, + offset: (parseInt(message.offset, 10) + 1).toString(), + }, + ]); + } + }, + }); + + await waitForMessages(messagesConsumed, { number: 5 }); + + // Fetch offsets after all messages have been consumed + const offsets = await admin.fetchOffsets({ + groupId: groupId, + topics: [topicName], + }); + expect(messagesConsumed.length).toEqual(5); + + const resultWithPartitionAndOffset = offsets.map(({ partitions, ...rest }) => { + const newPartitions = partitions.map(({ partition, offset }) => ({ + partition, + offset, + })); + return { ...rest, partitions: newPartitions }; + }); + + expect(resultWithPartitionAndOffset).toEqual([ + { + topic: topicName, + partitions: [{ partition: 0, offset: "5" }], + }, + ]); + }); + + it("should return correct offset after consuming messages with specific partitions", async () => { + + await createTopic({ topic: topicName, partitions: 1 }); + topicsToDelete.push(topicName); + + const messages = Array.from({ length: 5 }, (_, i) => ({ + value: `message${i}`, + })); + await producer.send({ topic: topicName, messages: messages }); + + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = []; // Define messagesConsumed + + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + + messagesConsumed.push(message); // Populate messagesConsumed + if (messagesConsumed.length === 5) { + await consumer.commitOffsets([ + { + topic, + partition, + offset: (parseInt(message.offset, 10) + 1).toString(), + }, + ]); + } + }, + }); + + await waitForMessages(messagesConsumed, { number: 5 }); + + // Fetch offsets after all messages have been consumed + const offsets = await admin.fetchOffsets({ + groupId, + topics: [{ topic: topicName, partitions: [0] }], + }); + + const resultWithPartitionAndOffset = offsets.map(({ partitions, ...rest }) => { + const newPartitions = partitions.map(({ partition, offset }) => ({ + partition, + offset, + })); + return { ...rest, partitions: newPartitions }; + }); + + expect(messagesConsumed.length).toEqual(5); + expect(resultWithPartitionAndOffset).toEqual([ + { + topic: topicName, + partitions: [{ partition: 0, offset: "5" }], + }, + ]); + }); + + it("should handle unset or null topics", async () => { + + await createTopic({ topic: topicName, partitions: 1 }); + topicsToDelete.push(topicName); + + const messages = Array.from({ length: 5 }, (_, i) => ({ + value: `message${i}`, + })); + await producer.send({ topic: topicName, messages: messages }); + + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = []; // Define messagesConsumed + + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + messagesConsumed.push(message); // Populate messagesConsumed + if (messagesConsumed.length === 5) { + await consumer.commitOffsets([ + { + topic, + partition, + offset: (parseInt(message.offset, 10) + 1).toString(), + }, + ]); + } + }, + }); + + await waitForMessages(messagesConsumed, { number: 5 }); + + // Fetch offsets after all messages have been consumed + const offsets = await admin.fetchOffsets({ + groupId, + }); + + const resultWithPartitionAndOffset = offsets.map(({ partitions, ...rest }) => { + const newPartitions = partitions.map(({ partition, offset }) => ({ + partition, + offset, + })); + return { ...rest, partitions: newPartitions }; + }); + expect(messagesConsumed.length).toEqual(5); + expect(resultWithPartitionAndOffset).toEqual([ + { + topic: topicName, + partitions: [{ partition: 0, offset: "5" }], + }, + ]); + + const offsets2 = await admin.fetchOffsets({ + groupId, + topics: null, + }); + + const resultWithPartitionAndOffset2 = offsets2.map(({ partitions, ...rest }) => { + const newPartitions = partitions.map(({ partition, offset }) => ({ + partition, + offset, + })); + return { ...rest, partitions: newPartitions }; + }); + expect(resultWithPartitionAndOffset2).toEqual([ + { + topic: topicName, + partitions: [{ partition: 0, offset: "5" }], + }, + ]); + }); + + it("should handle multiple topics each with more than 1 partition", async () => { + + await createTopic({ topic: topicName, partitions: 2 }); + await createTopic({ topic: topicName2, partitions: 2 }); + topicsToDelete.push(topicName, topicName2); + + await consumer.subscribe({ + topics: [topicName, topicName2], + }); + + const messages = Array.from({ length: 10 }, (_, i) => ({ + value: `message${i}`, + partition: i % 2, // alternates between 0 and 1 for even and odd i + })); + + await producer.send({ topic: topicName, messages }); + await producer.send({ topic: topicName2, messages }); + + let messagesConsumed = []; // Define messagesConsumed + + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + + messagesConsumed.push(message); // Populate messagesConsumed + + // Check the offset of the message and commit only if the offset is 4 + if (parseInt(message.offset, 10) === 4) { + await consumer.commitOffsets([ + { + topic, + partition, + offset: (parseInt(message.offset, 10) + 1).toString(), + }, + ]); + } + }, + }); + + await waitForMessages(messagesConsumed, { number: 20 }); + + // Fetch offsets with multiple topics each with more than 1 partition + const offsets = await admin.fetchOffsets({ + groupId, + }); + + // Sort the actual offsets array + const sortedOffsets = offsets.sort((a, b) => + a.topic.localeCompare(b.topic) + ); + + // remove leaderEpoch from the partitions + const resultWithPartitionAndOffset = sortedOffsets.map(({ partitions, ...rest }) => { + const newPartitions = partitions.map(({ partition, offset }) => ({ + partition, + offset, + })); + return { ...rest, partitions: newPartitions }; + }); + + expect(resultWithPartitionAndOffset.length).toEqual(2); + + resultWithPartitionAndOffset.forEach((item) => { + expect(item.partitions.length).toEqual(2); + }); + + expect(resultWithPartitionAndOffset).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + topic: topicName, + partitions: expect.arrayContaining([ + expect.objectContaining({ + partition: 0, + offset: "5", + }), + expect.objectContaining({ + partition: 1, + offset: "5", + }), + ]), + }), + expect.objectContaining({ + topic: topicName2, + partitions: expect.arrayContaining([ + expect.objectContaining({ + partition: 0, + offset: "5", + }), + expect.objectContaining({ + partition: 1, + offset: "5", + }), + ]), + }), + ]) + ); + }); +}); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 259fe07f..8ad26c9a 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -311,6 +311,10 @@ export interface OffsetsByTopicPartition { topics: TopicOffsets[] } +export type FetchOffsetsPartition = PartitionOffset & { metadata: string | null, leaderEpoch: number | null, error?: LibrdKafkaError }; + +export type TopicInput = string[] | { topic: string; partitions: number[] }[] + export type Consumer = Client & { subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise stop(): Promise @@ -369,4 +373,10 @@ export type Admin = { groups: string[], options?: { timeout?: number, includeAuthorizedOperations?: boolean }): Promise deleteGroups(groupIds: string[], options?: { timeout?: number }): Promise + fetchOffsets(options: { + groupId: string, + topics?: TopicInput, + timeout?: number, + requireStableOffsets?: boolean }): + Promise> } diff --git a/types/rdkafka.d.ts b/types/rdkafka.d.ts index 6a46da26..626fc8e9 100644 --- a/types/rdkafka.d.ts +++ b/types/rdkafka.d.ts @@ -11,7 +11,6 @@ import { export * from './config'; export * from './errors'; -import { Kafka } from './kafkajs'; import * as errors from './errors'; export interface LibrdKafkaError { @@ -416,6 +415,17 @@ export type DeleteGroupsResult = { error?: LibrdKafkaError } +export type ListGroupOffsets = { + groupId: string + partitions?: TopicPartition[] +} + +export type GroupResults = { + groupId: string + error?: LibrdKafkaError + partitions: TopicPartitionOffsetAndMetadata[] +} + export interface IAdminClient { createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void; createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void; @@ -443,6 +453,10 @@ export interface IAdminClient { options?: { timeout?: number }, cb?: (err: LibrdKafkaError, result: DeleteGroupsResult[]) => any): void; + listConsumerGroupOffsets(listGroupOffsets : ListGroupOffsets[], + options?: { timeout?: number, requireStableOffsets?: boolean }, + cb?: (err: LibrdKafkaError, result: GroupResults[]) => any): void; + disconnect(): void; }