Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New admin method for fetching group offsets for multiple topics #992

Merged
merged 17 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 111 additions & 45 deletions src/admin/__tests__/fetchConsumerGroupOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ const {
} = require('testHelpers')

describe('Admin', () => {
let admin, cluster, groupId, logger, topicName, anotherTopicName
let admin, cluster, groupId, logger, topicName, anotherTopicName, yetAnotherTopicName

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
anotherTopicName = `another-topic-${secureRandom()}`
yetAnotherTopicName = `yet-another-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`

await createTopic({ topic: topicName })
await createTopic({ topic: anotherTopicName })
await createTopic({ topic: yetAnotherTopicName })

logger = newLogger()
cluster = createCluster()
Expand All @@ -34,18 +36,24 @@ describe('Admin', () => {
admin && (await admin.disconnect())
})

describe('fetchOffsets', () => {
describe('fetchConsumerGroupOffsets', () => {
test('throws an error if the groupId is invalid', async () => {
await expect(admin.fetchOffsets({ groupId: null })).rejects.toHaveProperty(
await expect(admin.fetchConsumerGroupOffsets({ groupId: null })).rejects.toHaveProperty(
'message',
'Invalid groupId null'
)
})

test('throws an error if the topics argument is not a valid list', async () => {
await expect(
admin.fetchConsumerGroupOffsets({ groupId: 'groupId', topics: topicName })
).rejects.toHaveProperty('message', `Expected topics array, got ${topicName}`)
})

test('returns unresolved consumer group offsets', async () => {
const offsets = await admin.fetchOffsets({
const offsets = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([
Expand All @@ -60,12 +68,14 @@ describe('Admin', () => {
partitions: [{ partition: 0, offset: 13 }],
})

const offsets = await admin.fetchOffsets({
const offsets = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
})

test('returns consumer group offsets for all topics', async () => {
Expand All @@ -77,16 +87,48 @@ describe('Admin', () => {
await admin.setOffsets({
groupId,
topic: anotherTopicName,
partitions: [{ partition: 0, offset: 23 }],
})
await admin.setOffsets({
groupId,
topic: yetAnotherTopicName,
partitions: [{ partition: 0, offset: 42 }],
})

const offsets = await admin.fetchOffsets({
const offsets = await admin.fetchConsumerGroupOffsets({
groupId,
})

expect(offsets).toEqual([
{
topic: yetAnotherTopicName,
partitions: [{ partition: 0, offset: '42', metadata: null }],
},
{ topic: anotherTopicName, partitions: [{ partition: 0, offset: '23', metadata: null }] },
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
})

test('returns consumer group offsets for list of topics', async () => {
await admin.setOffsets({
groupId,
topic: topicName,
partitions: [{ partition: 0, offset: 13 }],
})
await admin.setOffsets({
groupId,
topic: anotherTopicName,
partitions: [{ partition: 0, offset: 42 }],
})

const offsets = await admin.fetchConsumerGroupOffsets({
groupId,
topics: [topicName, anotherTopicName],
})

expect(offsets).toEqual([
{ topic: anotherTopicName, partitions: [{ partition: 0, offset: '42', metadata: null }] },
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
})

Expand Down Expand Up @@ -139,67 +181,85 @@ describe('Admin', () => {
})

test('no reset: returns latest *committed* consumer offsets', async () => {
const offsetsBeforeResolving = await admin.fetchOffsets({
const offsetsBeforeResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
const offsetsUponResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
const offsetsAfterResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
})

test('reset to latest: returns latest *topic* offsets after resolving', async () => {
await admin.resetOffsets({ groupId, topic: topicName })

const offsetsBeforeResolving = await admin.fetchOffsets({
const offsetsBeforeResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
const offsetsUponResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
const offsetsAfterResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-1', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '10', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '10', metadata: null }] },
])
})

test('reset to earliest: returns earliest *topic* offsets after resolving', async () => {
await admin.resetOffsets({ groupId, topic: topicName, earliest: true })

const offsetsBeforeResolving = await admin.fetchOffsets({
const offsetsBeforeResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
const offsetsUponResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
const offsetsAfterResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '0', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '0', metadata: null }] },
])
})

testIfKafkaAtLeast_0_11(
Expand All @@ -216,23 +276,29 @@ describe('Admin', () => {
await admin.deleteTopicRecords({ topic: topicName, partitions: messagesToDelete })
await admin.resetOffsets({ groupId, topic: topicName, earliest: true })

const offsetsBeforeResolving = await admin.fetchOffsets({
const offsetsBeforeResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
const offsetsUponResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
const offsetsAfterResolving = await admin.fetchConsumerGroupOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '7', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '7', metadata: null }] },
])
}
)
})
Expand Down
38 changes: 22 additions & 16 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ module.exports = ({
}

/**
* @deprecated - This method was replaced by `fetchConsumerGroupOffsets`. This implementation
* is limited to fetching offsets to a single topic only, and lacks a default option of retrieving
* offsets for all topics for a consumer group.
* @param {string} groupId
* @param {string} topic
* @param {boolean} [resolveOffsets=false]
Expand Down Expand Up @@ -428,38 +431,43 @@ module.exports = ({

/**
* @param {string} groupId
* @param {string[]]} topics
* @param {string[]} topics
* @param {boolean} [resolveOffsets=false]
* @return {Promise}
*/
const fetchConsumerGroupOffsets = async ({ groupId, topics, resolveOffsets = false }) => {
const fetchConsumerGroupOffsets = async ({ groupId, topics = [], resolveOffsets = false }) => {
if (!groupId) {
throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`)
}

if (!Array.isArray(topics)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous implementation if topic wasn't passed an exception was thrown

    if (!topic) {
      throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
    }

but here you're setting topics's default value to pass your exception throwing ([]), so you're actually not validating that this param was sent here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mm yeah but I think it's kinda weird to force users to set topics=[] if they want to just fetch all of topics of the consumer instead of just calling it with fetchConsumerGroupOffsets({groupId}).

throw new KafkaJSNonRetriableError(`Expected topics array, got ${topics}`)
}

const coordinator = await cluster.findGroupCoordinator({ groupId })

let consumerOffsets
if (topics.length) {
const topicsToFetch = Promise.all(
const topicsToFetch = await Promise.all(
Nevon marked this conversation as resolved.
Show resolved Hide resolved
topics.map(async topic => {
const partitions = await findTopicPartitions(cluster, topic)
const partitionsToFetch = partitions.map(partition => ({ partition }))
return { topic, partitions: partitionsToFetch }
})
)

const { responses } = await coordinator.offsetFetch({
Nevon marked this conversation as resolved.
Show resolved Hide resolved
groupId,
topicsToFetch,
topics: topicsToFetch,
})
consumerOffsets = responses
} else {
nirga marked this conversation as resolved.
Show resolved Hide resolved
const { responses } = await coordinator.offsetFetch({ groupId })
const { responses } = await coordinator.offsetFetch({ groupId }, [])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's with the empty array here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be passed to offsetFetch so that it will fetch all topics (it actually sets it in the protocol it sends to Kafka from what I've seen).

consumerOffsets = responses
}

if (resolveOffsets) {
consumerOffsets = Promise.all(
consumerOffsets = await Promise.all(
consumerOffsets.map(async ({ topic, partitions }) => {
const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic))
const recalculatedPartitions = partitions.map(({ offset, partition, ...props }) => {
Expand Down Expand Up @@ -489,17 +497,15 @@ module.exports = ({
)
}

return consumerOffsets
.filter(response => !topics.length || response.topic in topics)
.map(({ topic, partitions }) => {
const completePartitions = partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
metadata: metadata || null,
}))
return consumerOffsets.map(({ topic, partitions }) => {
const completePartitions = partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
metadata: metadata || null,
Nevon marked this conversation as resolved.
Show resolved Hide resolved
}))

return { topic, partitions: completePartitions }
})
return { topic, partitions: completePartitions }
})
}

/**
Expand Down
Loading