List consumer group offsets #49
@@ -0,0 +1,68 @@

```js
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

async function fetchOffsets() {
  const args = process.argv.slice(2);
  if (args.length < 1) {
    console.error("Usage: node fetchOffsets.js <group_id> [topic [partition ...] ...]");
    process.exit(1);
  }

  const [groupId, ...rest] = args;

  const kafka = new Kafka({
    kafkaJS: {
      brokers: ["localhost:9092"],
    },
  });

  const admin = kafka.admin();
  await admin.connect();

  try {
    // Parse topics and partitions from the remaining arguments.
    const topicInput = parseTopicsAndPartitions(rest);

    // Fetch offsets for the specified consumer group.
    const offsets = await admin.fetchOffsets({
      groupId: groupId,
      topics: topicInput,
    });

    console.log(`Offsets for Consumer Group "${groupId}":`, JSON.stringify(offsets, null, 2));
  } catch (err) {
    console.error("Error fetching consumer group offsets:", err);
  } finally {
    await admin.disconnect();
  }
}

// Helper function to parse topics and partitions from the arguments.
function parseTopicsAndPartitions(args) {
  if (args.length === 0) return undefined;

  const topicInput = [];
  let i = 0;

  while (i < args.length) {
    const topic = args[i];
    i++;

    // Collect any numeric arguments that follow as partition numbers.
    const partitions = [];
    while (i < args.length && !isNaN(args[i])) {
      partitions.push(Number(args[i]));
      i++;
    }

    // Add the topic with its partitions, or as a plain string if no partitions were specified.
    if (partitions.length > 0) {
      topicInput.push({ topic, partitions });
    } else {
      topicInput.push(topic);
    }
  }

  return topicInput;
}

fetchOffsets();
```
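As a usage note, the example takes the group id followed by optional topics, each optionally followed by partition numbers. The fragments below sketch how those argument shapes map onto `admin.fetchOffsets` calls; the group and topic names are hypothetical, and `admin` is assumed to be the connected client from the example above:

```js
// node fetchOffsets.js my-group
// topics is undefined: fetch committed offsets for every topic in the group.
await admin.fetchOffsets({ groupId: "my-group" });

// node fetchOffsets.js my-group my-topic
// topics is an array of strings: fetch offsets for all partitions of each named topic.
await admin.fetchOffsets({ groupId: "my-group", topics: ["my-topic"] });

// node fetchOffsets.js my-group my-topic 0 1
// topics is an array of { topic, partitions }: fetch offsets for just those partitions.
await admin.fetchOffsets({
  groupId: "my-group",
  topics: [{ topic: "my-topic", partitions: [0, 1] }],
});
```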
@@ -417,6 +417,118 @@ class Admin {

```js
      });
    });
  }

  /**
   * Fetch the offsets for topic partition(s) for consumer group(s).
   *
   * @param {string} options.groupId - The group ID to fetch offsets for.
   * @param {import("../../types/kafkajs").TopicInput} options.topics - The topics to fetch offsets for.
   * @param {boolean} options.resolveOffsets - Not yet implemented.
   * @param {number?} options.timeout - The request timeout in milliseconds.
   *                                    May be unset (default: 5000).
   * @param {boolean?} options.requireStableOffsets - Whether the broker should return stable offsets
   *                                                  (transaction-committed). (default: false)
   *
   * @returns {Promise<Array<{topic: string, partitions: import('../../types/kafkajs').FetchOffsetsPartition[]}>>}
   */
  async fetchOffsets(options = {}) {
    if (this.#state !== AdminState.CONNECTED) {
      throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
    }

    if (Object.hasOwn(options, "resolveOffsets")) {
      throw new error.KafkaJSError("resolveOffsets is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
    }

    const { groupId, topics } = options;

    if (!groupId) {
      throw new error.KafkaJSError("groupId is required.", { code: error.ErrorCodes.ERR__INVALID_ARG });
    }

    let partitions = null;
    let originalTopics = null;

    /*
     * If the input is a list of topic strings, the user expects us to
     * fetch offsets for all partitions of all the input topics. In
     * librdkafka, we can either fetch offsets for specific topic
     * partitions, or fetch all of them. Thus, we fetch offsets for all
     * topic partitions (by setting partitions to null) and filter by
     * the topic strings later.
     */
    if (topics && Array.isArray(topics)) {
      if (typeof topics[0] === 'string') {
        originalTopics = topics;
        partitions = null;
      } else if (typeof topics[0] === 'object' && Array.isArray(topics[0].partitions)) {
        partitions = topics.flatMap(topic => topic.partitions.map(partition => ({
          topic: topic.topic,
          partition
        })));
      } else {
        throw new error.KafkaJSError("Invalid topics format.", { code: error.ErrorCodes.ERR__INVALID_ARG });
      }
    }

    const listGroupOffsets = [{
      groupId,
      partitions
    }];

    return new Promise((resolve, reject) => {
      this.#internalClient.listConsumerGroupOffsets(listGroupOffsets, options, (err, offsets) => {
        if (err) {
          reject(createKafkaJsErrorFromLibRdKafkaError(err));
        } else {
          /*
           * `offsets` is an array of group results, each containing a group id,
           * an error, and an array of partitions. We need to convert it to the
           * required format: an array of topics, each containing an array of
           * partitions.
           */
          const topicPartitionMap = new Map();

          offsets.forEach(groupResult => {
            if (groupResult.error) {
              reject(createKafkaJsErrorFromLibRdKafkaError(groupResult.error));
              return;
            }

            // Traverse the partitions and group them by topic.
            groupResult.partitions.forEach(partitionObj => {
              const { topic, partition, offset, leaderEpoch, metadata, error } = partitionObj;
              const fetchOffsetsPartition = {
                partition: partition,
                offset: String(offset),
                metadata: metadata || null,
                leaderEpoch: leaderEpoch || null,
                error: error || null
              };

              // Group partitions by topic.
              if (!topicPartitionMap.has(topic)) {
                topicPartitionMap.set(topic, []);
              }
              topicPartitionMap.get(topic).push(fetchOffsetsPartition);
            });
          });

          // Convert the map back to the desired array format.
          let convertedOffsets = Array.from(topicPartitionMap, ([topic, partitions]) => ({
            topic,
            partitions
          }));

          if (originalTopics !== null) {
            convertedOffsets = convertedOffsets.filter(convertedOffsets => originalTopics.includes(convertedOffsets.topic));
          }

          resolve(convertedOffsets);
        }
      });
    });
  }
```
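For illustration, the conversion above turns librdkafka's per-group results into the KafkaJS-style per-topic shape; a resolved value might look like this (group, topic, and offset values are hypothetical):

```js
// Result of admin.fetchOffsets({ groupId: "my-group", topics: ["my-topic"] }):
[
  {
    topic: "my-topic",
    partitions: [
      { partition: 0, offset: "42", metadata: null, leaderEpoch: null, error: null },
      { partition: 1, offset: "17", metadata: null, leaderEpoch: null, error: null },
    ],
  },
];
```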
Suggested change (rename the filter callback parameter so it doesn't shadow `convertedOffsets`):

```diff
-            convertedOffsets = convertedOffsets.filter(convertedOffsets => originalTopics.includes(convertedOffsets.topic));
+            convertedOffsets = convertedOffsets.filter(convertedOffset => originalTopics.includes(convertedOffset.topic));
```
Another review comment, on the changelog: 0.3.1 isn't released yet, so remove this section and add "1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130)." within this list only. Call it "Enhancements" instead of "Features".