From 0f19e68786fdc78d15edcc4091b4e78891cca492 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 2 Jun 2025 21:55:28 +0530 Subject: [PATCH 1/6] Implemented KIP 848 changes --- examples/kafkajs/admin/list-groups.js | 13 ++++++++- lib/admin.js | 2 ++ lib/kafkajs/_admin.js | 4 ++- src/admin.cc | 31 ++++++++++++++++++-- src/admin.h | 2 ++ src/common.cc | 33 ++++++++++++++++++++++ src/common.h | 4 +++ src/workers.cc | 5 ++++ src/workers.h | 4 +++ test/promisified/admin/list_groups.spec.js | 5 +++- types/kafkajs.d.ts | 7 +++-- types/rdkafka.d.ts | 3 +- 12 files changed, 105 insertions(+), 8 deletions(-) diff --git a/examples/kafkajs/admin/list-groups.js b/examples/kafkajs/admin/list-groups.js index 3e287a4c..becde8b7 100644 --- a/examples/kafkajs/admin/list-groups.js +++ b/examples/kafkajs/admin/list-groups.js @@ -20,13 +20,20 @@ async function adminStart() { short: 's', multiple: true, default: [], - } + }, + 'types': { + type: 'string', + short: 't', + multiple: true, + default: [], + }, }, }); let { 'bootstrap-servers': bootstrapServers, states: matchConsumerGroupStates, + types: matchConsumerGroupTypes, timeout, } = args.values; @@ -36,6 +43,9 @@ async function adminStart() { matchConsumerGroupStates = matchConsumerGroupStates.map( state => ConsumerGroupStates[state]); + matchConsumerGroupTypes = matchConsumerGroupTypes.map( + type => ConsumerGroupTypes[type]); + const kafka = new Kafka({ kafkaJS: { brokers: [bootstrapServers], @@ -55,6 +65,7 @@ async function adminStart() { console.log(`\tType: ${group.protocolType}`); console.log(`\tIs simple: ${group.isSimpleConsumerGroup}`); console.log(`\tState: ${group.state}`); + console.log(`\tType: ${group.type}`); } } catch(err) { console.log('List topics failed', err); diff --git a/lib/admin.js b/lib/admin.js index 5bd8d25d..9dc91887 100644 --- a/lib/admin.js +++ b/lib/admin.js @@ -373,6 +373,8 @@ AdminClient.prototype.createPartitions = function (topic, totalPartitions, timeo * May be unset (default: 5000). * @param {Array?} options.matchConsumerGroupStates - * A list of consumer group states to match. May be unset, fetches all states (default: unset). + * @param {Array?} options.matchConsumerGroupTypes - + * A list of consumer group types to match. May be unset, fetches all types (default: unset). * @param {function} cb - The callback to be executed when finished. * @example * // Valid ways to call this function: diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js index 89d22e7d..ed713546 100644 --- a/lib/kafkajs/_admin.js +++ b/lib/kafkajs/_admin.js @@ -387,7 +387,9 @@ class Admin { * May be unset (default: 5000). * @param {Array?} options.matchConsumerGroupStates - * A list of consumer group states to match. May be unset, fetches all states (default: unset). - * @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates}>, errors: Array }>} + * @param {Array?} options.matchConsumerGroupTypes - + * A list of consumer group types to match. May be unset, fetches all types (default: unset). + * @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates, type: KafkaJS.ConsumerGroupTypes}>, errors: Array }>} * Resolves with the list of consumer groups, rejects on error. */ async listGroups(options = {}) { diff --git a/src/admin.cc b/src/admin.cc index 568cf710..64fd8303 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -483,7 +483,9 @@ Baton AdminClient::CreatePartitions( Baton AdminClient::ListGroups( bool is_match_states_set, - std::vector &match_states, int timeout_ms, + std::vector &match_states, + bool is_match_types_set, + std::vector &match_types, int timeout_ms, /* out */ rd_kafka_event_t **event_response) { if (!IsConnected()) { return Baton(RdKafka::ERR__STATE); @@ -515,6 +517,15 @@ Baton AdminClient::ListGroups( } } + if (is_match_types_set) { + rd_kafka_error_t *error = + rd_kafka_AdminOptions_set_match_consumer_group_types( + options, &match_types[0], match_types.size()); + if (error) { + return Baton::BatonFromErrorAndDestroy(error); + } + } + // Create queue just for this operation. rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr()); @@ -1195,9 +1206,25 @@ NAN_METHOD(AdminClient::NodeListGroups) { } } + std::vector match_types; + v8::Local match_consumer_group_types_key = + Nan::New("matchConsumerGroupTypes").ToLocalChecked(); + bool is_match_types_set = + Nan::Has(config, match_consumer_group_types_key).FromMaybe(false); + v8::Local match_types_array = Nan::New(); + + if (is_match_types_set) { + match_types_array = GetParameter>( + config, "matchConsumerGroupTypes", match_types_array); + if (match_types_array->Length()) { + match_types = Conversion::Admin::FromV8GroupTypeArray( + match_types_array); + } + } + // Queue the work. Nan::AsyncQueueWorker(new Workers::AdminClientListGroups( - callback, client, is_match_states_set, match_states, timeout_ms)); + callback, client, is_match_states_set, match_states, is_match_types_set, match_types, timeout_ms)); } /** diff --git a/src/admin.h b/src/admin.h index 9a269134..1557c263 100644 --- a/src/admin.h +++ b/src/admin.h @@ -54,6 +54,8 @@ class AdminClient : public Connection { // Baton DescribeConfig(rd_kafka_NewTopic_t* topic, int timeout_ms); Baton ListGroups(bool is_match_states_set, std::vector& match_states, + bool is_match_types_set, + std::vector& match_types, int timeout_ms, rd_kafka_event_t** event_response); Baton DescribeGroups(std::vector& groups, diff --git a/src/common.cc b/src/common.cc index bbd5be9e..e44abcf0 100644 --- a/src/common.cc +++ b/src/common.cc @@ -908,6 +908,35 @@ std::vector FromV8GroupStateArray( return returnVec; } +/** + * @brief Converts a v8 array of group types into a vector of + * rd_kafka_consumer_group_type_t. + */ +std::vector FromV8GroupTypeArray( + v8::Local array) { + v8::Local parameter = array.As(); + std::vector returnVec; + if (parameter->Length() >= 1) { + for (unsigned int i = 0; i < parameter->Length(); i++) { + v8::Local v; + if (!Nan::Get(parameter, i).ToLocal(&v)) { + continue; + } + Nan::Maybe maybeT = Nan::To(v); + if (maybeT.IsNothing()) { + continue; + } + int64_t type_number = maybeT.FromJust(); + if (type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) { + continue; + } + returnVec.push_back( + static_cast(type_number)); + } + } + return returnVec; +} + /** * @brief Converts a rd_kafka_ListConsumerGroups_result_t* into a v8 object. */ @@ -920,6 +949,7 @@ v8::Local FromListConsumerGroupsResult( protocolType: string, isSimpleConsumerGroup: boolean, state: ConsumerGroupState (internally a number) + type: ConsumerGroupType (internally a number) }[], errors: LibrdKafkaError[] } @@ -957,6 +987,9 @@ v8::Local FromListConsumerGroupsResult( Nan::Set(groupObject, Nan::New("state").ToLocalChecked(), Nan::New(rd_kafka_ConsumerGroupListing_state(group))); + Nan::Set(groupObject, Nan::New("type").ToLocalChecked(), + Nan::New(rd_kafka_ConsumerGroupListing_type(group))); + Nan::Set(groups, i, groupObject); } diff --git a/src/common.h b/src/common.h index 121a5cda..ea557b3c 100644 --- a/src/common.h +++ b/src/common.h @@ -116,6 +116,10 @@ rd_kafka_NewTopic_t **FromV8TopicObjectArray(v8::Local); std::vector FromV8GroupStateArray( v8::Local); +// ListGroups: request +std::vector FromV8GroupTypeArray( + v8::Local array); + // ListGroups: response v8::Local FromListConsumerGroupsResult( const rd_kafka_ListConsumerGroups_result_t *); diff --git a/src/workers.cc b/src/workers.cc index 4655458d..76fdca6b 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -1317,11 +1317,15 @@ void AdminClientCreatePartitions::HandleErrorCallback() { AdminClientListGroups::AdminClientListGroups( Nan::Callback* callback, AdminClient* client, bool is_match_states_set, std::vector& match_states, + bool is_match_types_set, + std::vector& match_types, const int& timeout_ms) : ErrorAwareWorker(callback), m_client(client), m_is_match_states_set(is_match_states_set), m_match_states(match_states), + m_is_match_types_set(is_match_types_set), + m_match_types(match_types), m_timeout_ms(timeout_ms) {} AdminClientListGroups::~AdminClientListGroups() { @@ -1332,6 +1336,7 @@ AdminClientListGroups::~AdminClientListGroups() { void AdminClientListGroups::Execute() { Baton b = m_client->ListGroups(m_is_match_states_set, m_match_states, + m_is_match_types_set, m_match_types, m_timeout_ms, &m_event_response); if (b.err() != RdKafka::ERR_NO_ERROR) { SetErrorBaton(b); diff --git a/src/workers.h b/src/workers.h index b9583823..4f8a00ad 100644 --- a/src/workers.h +++ b/src/workers.h @@ -532,6 +532,8 @@ class AdminClientListGroups : public ErrorAwareWorker { public: AdminClientListGroups(Nan::Callback *, NodeKafka::AdminClient *, bool, std::vector &, + bool, + std::vector &, const int &); ~AdminClientListGroups(); @@ -543,6 +545,8 @@ class AdminClientListGroups : public ErrorAwareWorker { NodeKafka::AdminClient *m_client; const bool m_is_match_states_set; std::vector m_match_states; + const bool m_is_match_types_set; + std::vector m_match_types; const int m_timeout_ms; rd_kafka_event_t *m_event_response; }; diff --git a/test/promisified/admin/list_groups.spec.js b/test/promisified/admin/list_groups.spec.js index 11e7a271..bafdbfdb 100644 --- a/test/promisified/admin/list_groups.spec.js +++ b/test/promisified/admin/list_groups.spec.js @@ -7,7 +7,7 @@ const { waitFor, createAdmin, } = require('../testhelpers'); -const { ConsumerGroupStates, ErrorCodes } = require('../../../lib').KafkaJS; +const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes } = require('../../../lib').KafkaJS; describe('Admin > listGroups', () => { let topicName, groupId, consumer, admin; @@ -50,6 +50,7 @@ describe('Admin > listGroups', () => { await admin.connect(); let listGroupsResult = await admin.listGroups({ matchConsumerGroupStates: undefined, + matchConsumerGroupTypes: undefined, }); expect(listGroupsResult.errors).toEqual([]); expect(listGroupsResult.groups).toEqual( @@ -59,6 +60,7 @@ describe('Admin > listGroups', () => { isSimpleConsumerGroup: false, protocolType: 'consumer', state: ConsumerGroupStates.STABLE, + type: ConsumerGroupTypes.CLASSIC, }), ]) ); @@ -76,6 +78,7 @@ describe('Admin > listGroups', () => { isSimpleConsumerGroup: false, protocolType: 'consumer', state: ConsumerGroupStates.EMPTY, + type: ConsumerGroupTypes.CLASSIC, }), ]) ); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index d78ba6ef..56832092 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -14,7 +14,8 @@ import { Node, AclOperationTypes, Uuid, - IsolationLevel + IsolationLevel, + ConsumerGroupTypes } from './rdkafka' import { @@ -34,6 +35,7 @@ export { AclOperationTypes, Uuid, IsolationLevel, + ConsumerGroupTypes } from './rdkafka' export interface OauthbearerProviderResponse { @@ -413,7 +415,8 @@ export type Admin = { listTopics(options?: { timeout?: number }): Promise listGroups(options?: { timeout?: number, - matchConsumerGroupStates?: ConsumerGroupStates[] + matchConsumerGroupStates?: ConsumerGroupStates[], + matchConsumerGroupTypes?: ConsumerGroupTypes[] }): Promise<{ groups: GroupOverview[], errors: LibrdKafkaError[] }> describeGroups( groups: string[], diff --git a/types/rdkafka.d.ts b/types/rdkafka.d.ts index 99a16183..5619d403 100644 --- a/types/rdkafka.d.ts +++ b/types/rdkafka.d.ts @@ -363,6 +363,7 @@ export interface GroupOverview { protocolType: string; isSimpleConsumerGroup: boolean; state: ConsumerGroupStates; + type: ConsumerGroupTypes; } export enum AclOperationTypes { @@ -504,7 +505,7 @@ export interface IAdminClient { listTopics(options?: { timeout?: number }, cb?: (err: LibrdKafkaError, topics: string[]) => any): void; listGroups(cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void; - listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[] }, + listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[], matchConsumerGroupTypes?: ConsumerGroupTypes[] }, cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void; describeGroups(groupIds: string[], cb?: (err: LibrdKafkaError, result: GroupDescriptions) => any): void; From 8f084656851d15aa382446bef0ee3b360bff4385 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 10 Jun 2025 22:00:45 +0530 Subject: [PATCH 2/6] style format --- src/admin.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/admin.cc b/src/admin.cc index 64fd8303..5bccccac 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -484,7 +484,7 @@ Baton AdminClient::CreatePartitions( Baton AdminClient::ListGroups( bool is_match_states_set, std::vector &match_states, - bool is_match_types_set, + bool is_match_types_set, std::vector &match_types, int timeout_ms, /* out */ rd_kafka_event_t **event_response) { if (!IsConnected()) { @@ -1224,7 +1224,8 @@ NAN_METHOD(AdminClient::NodeListGroups) { // Queue the work. Nan::AsyncQueueWorker(new Workers::AdminClientListGroups( - callback, client, is_match_states_set, match_states, is_match_types_set, match_types, timeout_ms)); + callback, client, is_match_states_set, match_states, is_match_types_set, + match_types, timeout_ms)); } /** From f3098040665667707787bbac0a00ae4cc06eb70f Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 11 Jun 2025 00:16:10 +0530 Subject: [PATCH 3/6] Expect apt groups --- test/promisified/admin/list_groups.spec.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/promisified/admin/list_groups.spec.js b/test/promisified/admin/list_groups.spec.js index bafdbfdb..3df93328 100644 --- a/test/promisified/admin/list_groups.spec.js +++ b/test/promisified/admin/list_groups.spec.js @@ -1,6 +1,7 @@ jest.setTimeout(30000); const { + testConsumerGroupProtocolClassic, createConsumer, secureRandom, createTopic, @@ -78,7 +79,7 @@ describe('Admin > listGroups', () => { isSimpleConsumerGroup: false, protocolType: 'consumer', state: ConsumerGroupStates.EMPTY, - type: ConsumerGroupTypes.CLASSIC, + type: testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER, }), ]) ); From 3c49ea1f50ffa36f7be5e21aa9af1ccf8064377c Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 11 Jun 2025 10:39:18 +0530 Subject: [PATCH 4/6] updated list group tests --- test/promisified/admin/list_groups.spec.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/promisified/admin/list_groups.spec.js b/test/promisified/admin/list_groups.spec.js index 3df93328..35b0b5fd 100644 --- a/test/promisified/admin/list_groups.spec.js +++ b/test/promisified/admin/list_groups.spec.js @@ -20,6 +20,7 @@ describe('Admin > listGroups', () => { consumer = createConsumer({ groupId, fromBeginning: true, + autoCommit: true, }); await createTopic({ topic: topicName, partitions: 2 }); @@ -47,11 +48,12 @@ describe('Admin > listGroups', () => { await consumer.run({ eachMessage: async () => {} }); await waitFor(() => consumer.assignment().length > 0, () => null, 1000); + const groupType = testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER; await admin.connect(); let listGroupsResult = await admin.listGroups({ - matchConsumerGroupStates: undefined, - matchConsumerGroupTypes: undefined, + matchConsumerGroupStates: [ConsumerGroupStates.STABLE,], + matchConsumerGroupTypes: [groupType,], }); expect(listGroupsResult.errors).toEqual([]); expect(listGroupsResult.groups).toEqual( @@ -61,7 +63,7 @@ describe('Admin > listGroups', () => { isSimpleConsumerGroup: false, protocolType: 'consumer', state: ConsumerGroupStates.STABLE, - type: ConsumerGroupTypes.CLASSIC, + type: groupType, }), ]) ); @@ -79,7 +81,7 @@ describe('Admin > listGroups', () => { isSimpleConsumerGroup: false, protocolType: 'consumer', state: ConsumerGroupStates.EMPTY, - type: testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER, + type: groupType, }), ]) ); From 1da1b52ffe5e7186c1243ca43e105245be10b54c Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 1 Jul 2025 20:02:19 +0530 Subject: [PATCH 5/6] Requested changes --- examples/kafkajs/admin/list-groups.js | 5 +++-- src/common.cc | 2 +- test/promisified/admin/list_groups.spec.js | 13 +++++++++++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/examples/kafkajs/admin/list-groups.js b/examples/kafkajs/admin/list-groups.js index becde8b7..8cc1f6d8 100644 --- a/examples/kafkajs/admin/list-groups.js +++ b/examples/kafkajs/admin/list-groups.js @@ -1,5 +1,5 @@ // require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS. -const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS; +const { Kafka, ConsumerGroupStates, ConsumerGroupTypes } = require('@confluentinc/kafka-javascript').KafkaJS; const { parseArgs } = require('node:util'); async function adminStart() { @@ -58,7 +58,8 @@ async function adminStart() { try { const groupOverview = await admin.listGroups({ timeout, - matchConsumerGroupStates + matchConsumerGroupStates, + matchConsumerGroupTypes }); for (const group of groupOverview.groups) { console.log(`Group id: ${group.groupId}`); diff --git a/src/common.cc b/src/common.cc index e44abcf0..05731bab 100644 --- a/src/common.cc +++ b/src/common.cc @@ -927,7 +927,7 @@ std::vector FromV8GroupTypeArray( continue; } int64_t type_number = maybeT.FromJust(); - if (type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) { + if (type_number < 0 && type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) { continue; } returnVec.push_back( diff --git a/test/promisified/admin/list_groups.spec.js b/test/promisified/admin/list_groups.spec.js index 35b0b5fd..1718567a 100644 --- a/test/promisified/admin/list_groups.spec.js +++ b/test/promisified/admin/list_groups.spec.js @@ -52,8 +52,8 @@ describe('Admin > listGroups', () => { await admin.connect(); let listGroupsResult = await admin.listGroups({ - matchConsumerGroupStates: [ConsumerGroupStates.STABLE,], - matchConsumerGroupTypes: [groupType,], + matchConsumerGroupStates: undefined, + matchConsumerGroupTypes: undefined, }); expect(listGroupsResult.errors).toEqual([]); expect(listGroupsResult.groups).toEqual( @@ -68,6 +68,15 @@ describe('Admin > listGroups', () => { ]) ); + // Consumer group should not show up when filtering for opposite group type. + let oppositeGroupType = testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CONSUMER : ConsumerGroupTypes.CLASSIC; + listGroupsResult = await admin.listGroups({ + matchConsumerGroupTypes: [ oppositeGroupType ], + }); + expect(listGroupsResult.errors).toEqual([]); + expect(listGroupsResult.groups.map(group => group.groupId)).not.toContain(groupId); + + // Disconnect the consumer to make the group EMPTY. await consumer.disconnect(); consumer = null; From 2577872eb8cb2d1710f2feb05853ebf5167cf105 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 1 Jul 2025 20:19:30 +0530 Subject: [PATCH 6/6] Changelog changes and minor fix --- .semaphore/semaphore.yml | 2 +- CHANGELOG.md | 3 ++- src/common.cc | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index cb7fb06f..d7b1125c 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -171,7 +171,7 @@ blocks: - export BUILD_LIBRDKAFKA=0 - npm run install-from-source jobs: - - name: "Performance Test (Classic Protocol)" + - name: "Performance Test" commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' - docker compose -f test/docker/docker-compose.yml up -d && sleep 30 diff --git a/CHANGELOG.md b/CHANGELOG.md index 65e74b21..beadc7a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ v1.4.0 is a feature release. It is supported for all usage. ## Enhancements 1. References librdkafka v2.11.0. Refer to the [librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for more information. -2. [KIP-848] `describeGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to being undefined. +2. [KIP-848] `describeGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to being undefined (#329) +3. [KIP-848] Admin API for listing consumer groups now has an optional filter to return only groups of given types (#328) # confluent-kafka-javascript v1.3.2 diff --git a/src/common.cc b/src/common.cc index 05731bab..d4bd87c5 100644 --- a/src/common.cc +++ b/src/common.cc @@ -927,7 +927,7 @@ std::vector FromV8GroupTypeArray( continue; } int64_t type_number = maybeT.FromJust(); - if (type_number < 0 && type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) { + if (type_number < 0 || type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) { continue; } returnVec.push_back(