Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New admin method for fetching group offsets for multiple topics #992

Merged
merged 17 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,32 @@ await admin.fetchTopicOffsetsByTimestamp(topic, timestamp)

## <a name="fetch-offsets"></a> Fetch consumer group offsets

`fetchOffsets` returns the consumer group offset for a topic.
`fetchOffsets` returns the consumer group offset for a list of topics.

```javascript
await admin.fetchOffsets({ groupId, topic, })
await admin.fetchOffsets({ groupId, topics: ['topic1', 'topic2'] })
// [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// {
// topic: 'topic1',
// partitions: [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// ],
// },
// {
// topic: 'topic2',
// partitions: [
// { partition: 0, offset: '1234' },
// { partition: 1, offset: '4567' },
// ],
// },
// ]
```

Omit `topics` altogether if you want to get the consumer group offsets for all topics with committed offsets.

Include the optional `resolveOffsets` flag to resolve the offsets without having to start a consumer, useful when fetching directly after calling [resetOffets](#a-name-reset-offsets-a-reset-consumer-group-offsets):

```javascript
Expand Down
78 changes: 71 additions & 7 deletions src/admin/__tests__/fetchOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ const {
generateMessages,
testIfKafkaAtLeast_0_11,
} = require('testHelpers')
const { KafkaJSNonRetriableError } = require('../../errors')

describe('Admin', () => {
let admin, cluster, groupId, logger, topicName
let admin, cluster, groupId, logger, topicName, anotherTopicName, yetAnotherTopicName

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
anotherTopicName = `another-topic-${secureRandom()}`
yetAnotherTopicName = `yet-another-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`

await createTopic({ topic: topicName })
Expand All @@ -34,19 +37,25 @@ describe('Admin', () => {

describe('fetchOffsets', () => {
test('throws an error if the groupId is invalid', async () => {
await expect(admin.fetchOffsets({ groupId: null })).rejects.toHaveProperty(
'message',
await expect(admin.fetchOffsets({ groupId: null })).rejects.toThrow(
KafkaJSNonRetriableError,
'Invalid groupId null'
)
})

test('throws an error if the topic name is not a valid string', async () => {
await expect(admin.fetchOffsets({ groupId: 'groupId', topic: null })).rejects.toHaveProperty(
'message',
'Invalid topic null'
test('throws an error if the topics argument is not a valid list', async () => {
await expect(admin.fetchOffsets({ groupId: 'groupId', topics: topicName })).rejects.toThrow(
KafkaJSNonRetriableError,
'Expected topic or topics array to be set'
)
})

test('throws an error if both topic and topics are set', async () => {
await expect(
admin.fetchOffsets({ groupId: 'groupId', topic: topicName, topics: [topicName] })
).rejects.toThrow(KafkaJSNonRetriableError, 'Either topic or topics must be set, not both')
})

test('returns unresolved consumer group offsets', async () => {
const offsets = await admin.fetchOffsets({
groupId,
Expand All @@ -71,6 +80,61 @@ describe('Admin', () => {
expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
})

test('returns consumer group offsets for all topics', async () => {
await admin.setOffsets({
groupId,
topic: topicName,
partitions: [{ partition: 0, offset: 13 }],
})
await admin.setOffsets({
groupId,
topic: anotherTopicName,
partitions: [{ partition: 0, offset: 23 }],
})
await admin.setOffsets({
groupId,
topic: yetAnotherTopicName,
partitions: [{ partition: 0, offset: 42 }],
})

const offsets = await admin.fetchOffsets({
groupId,
})

expect(offsets).toIncludeSameMembers([
{
topic: yetAnotherTopicName,
partitions: [{ partition: 0, offset: '42', metadata: null }],
},
{ topic: anotherTopicName, partitions: [{ partition: 0, offset: '23', metadata: null }] },
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
})

test('returns consumer group offsets for list of topics', async () => {
await admin.setOffsets({
groupId,
topic: topicName,
partitions: [{ partition: 0, offset: 13 }],
})
await admin.setOffsets({
groupId,
topic: anotherTopicName,
partitions: [{ partition: 0, offset: 42 }],
})

const offsets = await admin.fetchOffsets({
groupId,
topics: [topicName, anotherTopicName],
})

// There's no guarantee for the order of topics so we compare sets to avoid flaky tests.
expect(offsets).toIncludeSameMembers([
{ topic: anotherTopicName, partitions: [{ partition: 0, offset: '42', metadata: null }] },
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
})

describe('when used with the resolvedOffsets option', () => {
let producer, consumer

Expand Down
104 changes: 68 additions & 36 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,62 +369,94 @@ module.exports = ({
}

/**
* Fetch offsets for a topic or multiple topics
*
* Note: set either topic or topics but not both.
*
* @param {string} groupId
* @param {string} topic
* @param {string} topic - deprecated, use the `topics` parameter. Topic to fetch offsets for.
* @param {string[]} topics - list of topics to fetch offsets for, defaults to `[]` which fetches all topics for `groupId`.
* @param {boolean} [resolveOffsets=false]
* @return {Promise}
*/
const fetchOffsets = async ({ groupId, topic, resolveOffsets = false }) => {
const fetchOffsets = async ({ groupId, topic, topics, resolveOffsets = false }) => {
if (!groupId) {
throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`)
}

if (!topic) {
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
if (!topic && !topics) {
topics = []
}

const partitions = await findTopicPartitions(cluster, topic)
const coordinator = await cluster.findGroupCoordinator({ groupId })
const partitionsToFetch = partitions.map(partition => ({ partition }))
if (!topic && !Array.isArray(topics)) {
throw new KafkaJSNonRetriableError(`Expected topic or topics array to be set`)
}

if (topic && topics) {
throw new KafkaJSNonRetriableError(`Either topic or topics must be set, not both`)
}

if (topic) {
topics = [topic]
}

const coordinator = await cluster.findGroupCoordinator({ groupId })
const topicsToFetch = await Promise.all(
topics.map(async topic => {
const partitions = await findTopicPartitions(cluster, topic)
const partitionsToFetch = partitions.map(partition => ({ partition }))
return { topic, partitions: partitionsToFetch }
})
)
let { responses: consumerOffsets } = await coordinator.offsetFetch({
groupId,
topics: [{ topic, partitions: partitionsToFetch }],
topics: topicsToFetch,
})

if (resolveOffsets) {
const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic))
consumerOffsets = consumerOffsets.map(({ topic, partitions }) => ({
topic,
partitions: partitions.map(({ offset, partition, ...props }) => {
let resolvedOffset = offset
if (Number(offset) === EARLIEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].low
}
if (Number(offset) === LATEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].high
}
consumerOffsets = await Promise.all(
consumerOffsets.map(async ({ topic, partitions }) => {
const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic))
const recalculatedPartitions = partitions.map(({ offset, partition, ...props }) => {
let resolvedOffset = offset
if (Number(offset) === EARLIEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].low
}
if (Number(offset) === LATEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].high
}
return {
partition,
offset: resolvedOffset,
...props,
}
})

await setOffsets({ groupId, topic, partitions: recalculatedPartitions })

return {
partition,
offset: resolvedOffset,
...props,
topic,
partitions: recalculatedPartitions,
}
}),
}))
const [{ partitions }] = consumerOffsets
await setOffsets({ groupId, topic, partitions })
})
)
}

return consumerOffsets
.filter(response => response.topic === topic)
.map(({ partitions }) =>
partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
metadata: metadata || null,
}))
)
.pop()
const result = consumerOffsets.map(({ topic, partitions }) => {
const completePartitions = partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
metadata: metadata || null,
Nevon marked this conversation as resolved.
Show resolved Hide resolved
}))

return { topic, partitions: completePartitions }
})

if (topic) {
return result.pop().partitions
} else {
return result
}
}

/**
Expand Down
8 changes: 7 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ export type RequestQueueSizeEvent = InstrumentationEvent<{

export type SeekEntry = PartitionOffset

export type FetchOffsetsPartitions = Array<PartitionOffset & { metadata: string | null }>
export interface Acl {
principal: string
host: string
Expand Down Expand Up @@ -436,7 +437,12 @@ export type Admin = {
groupId: string
topic: string
resolveOffsets?: boolean
}): Promise<Array<SeekEntry & { metadata: string | null }>>
}): Promise<FetchOffsetsPartitions>
fetchOffsets(options: {
groupId: string
topics?: string[]
resolveOffsets?: boolean
}): Promise<Array<{topic: string, partitions: FetchOffsetsPartitions}>>
fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>>
fetchTopicOffsetsByTimestamp(topic: string, timestamp?: number): Promise<Array<SeekEntry>>
describeCluster(): Promise<{
Expand Down
4 changes: 4 additions & 0 deletions types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ const runAdmin = async () => {

await admin.listTopics()

await admin.fetchOffsets({ groupId: 'test-group' })
await admin.fetchOffsets({ groupId: 'test-group', topic: 'topic1' })
nirga marked this conversation as resolved.
Show resolved Hide resolved
await admin.fetchOffsets({ groupId: 'test-group', topics: ['topic1', 'topic2'] })

await admin.createTopics({
topics: [{ topic, numPartitions: 10, replicationFactor: 1 }],
timeout: 30000,
Expand Down