-
-
Notifications
You must be signed in to change notification settings - Fork 530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New admin method for fetching group offsets for multiple topics #992
Conversation
src/admin/index.js
Outdated
} | ||
|
||
return consumerOffsets | ||
.filter(response => !topics.length || response.topic in topics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't topics
an array? in
checks for the existence of a key in an object, not whether an item is in an array. Maybe you meant topics.includes(response.topic)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it was just a draft PR and I'm not super familiar with javascript :)
I've removed this line altogether since I realized why would response not always include exactly the topics we requested?
src/admin/index.js
Outdated
}) | ||
consumerOffsets = responses | ||
} else { | ||
const { responses } = await coordinator.offsetFetch({ groupId }) | ||
const { responses } = await coordinator.offsetFetch({ groupId }, []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's with the empty array here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs to be passed to offsetFetch
so that it will fetch all topics (it actually sets it in the protocol it sends to Kafka from what I've seen).
I see two options:
I think I'm in favor of option 2 more than option 1, because it limits the number of methods on the client and migrating from the old signature to the new is very simple. Do you see any big problem with going with that approach, @nirga? |
src/admin/index.js
Outdated
throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`) | ||
} | ||
|
||
if (!Array.isArray(topics)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the previous implementation if topic
wasn't passed an exception was thrown
if (!topic) {
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
}
but here you're setting topics
's default value to pass your exception throwing ([]
), so you're actually not validating that this param was sent here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mm yeah but I think it's kinda weird to force users to set topics=[] if they want to just fetch all of topics of the consumer instead of just calling it with fetchConsumerGroupOffsets({groupId})
.
@Nevon I agree with you. I moved the changes back to |
src/admin/index.js
Outdated
@@ -371,60 +371,88 @@ module.exports = ({ | |||
/** | |||
* @param {string} groupId | |||
* @param {string} topic | |||
* @param {string[]} topics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how to write the JSDoc for this, now that the interplay between topic
and topics
is more complicated.
Both topic
and topics
are optional. If neither is set, the default is for topics
to get the value []
. Do you know if there's any way to express that in JSDoc, @ankon? Not a big deal, but I just don't know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, from the top of my head: I think this might be too much to express with JSDoc. There could be a way to do it in the TypeScript definition, and then you could do magic with @type
here.
But it also might not be needed: If these are optional, then from the callers point that's all they need to know. The specific defaulting could just be considered an implementation detail.
I guess I would just go with
/**
* Fetch offsets for a topic or multiple topics
*
* Note that `topics` will be defaulted to `[]` if neither `topic` nor `topics` is provided.
*
* @param {object} options
* @param {string} options.groupId
* @param {string} [options.topic]
* @param {string[]} [options.topics]
* @param {boolean} [optins.resolveOffsets=false]
*/
I guess the reason you need both topic
and topics
is that the function is part of the API, i.e. you're providing backwards-compatibility. Could this not be achieved by adding a new function (say fetchTopicOffsets
), and then eventually deprecate the old one?
Another approach for consideration, closer to what the .d.ts would do given your later comment :)
/**
* @param {import("../../types").FetchOffsetsTopicOptions|import("../../types").FetchOffsetsTopicsOptions} options
* @returns {Promise<...>}
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added comments as @ankon suggested, inline in the parameter list as well.
types/index.d.ts
Outdated
@@ -434,7 +434,8 @@ export type Admin = { | |||
fetchTopicMetadata(options?: { topics: string[] }): Promise<{ topics: Array<ITopicMetadata> }> | |||
fetchOffsets(options: { | |||
groupId: string | |||
topic: string | |||
topic?: string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Github won't let me create a suggestion that goes outside of the lines of the diff, but there's an issue here where the return type of this function is different depending on the input, so we need to be a little more clever here. There's probably some fancy-pants thing you can do with conditional types, but we can also just use plain old method overloading:
interface FetchOffsetsBaseOptions {
groupId: string,
resolveOffsets?: boolean
}
export interface FetchOffsetsTopicOptions extends FetchOffsetsBaseOptions {
topic: string
}
export interface FetchOffsetsTopicsOptions extends FetchOffsetsBaseOptions {
topics?: string[]
}
export type Admin = {
/**
* @deprecated "topic: string" property now replaced by "topics: string[]"
*/
fetchOffsets(options: FetchOffsetsTopicOptions): Promise<Array<PartitionOffset & { metadata: string | null }>>,
fetchOffsets(options: FetchOffsetsTopicsOptions): Promise<Array<{ topic: string, partitions: Array<PartitionOffset & { metadata: string | null }>}>>,
}
This has a few benefits. The first is that the type system will prevent admin.fetchOffsets({ topic: 'foo', topics: ['bar'] })
, because the topic
and topics
properties are mutually exclusive. The second is that it allows us to define the relationship between the input type and the return type. The return type is a bit of an abomination. If we didn't return metadata
it could be simplified to just Promise<TopicOffsets[]>
and Promise<PartitionOffset[]>
, but alas, here we are.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've opted to keep the options parameter inline though, but moved the Array<PartitionOffset & { metadata: string | null }>
to a new type. LMK if you prefer the options to be extracted to a type as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall. Just some fairly minor things to fix.
Co-authored-by: Tommy Brunn <tommy.brunn@gmail.com>
Co-authored-by: Tommy Brunn <tommy.brunn@gmail.com>
Co-authored-by: Tommy Brunn <tommy.brunn@gmail.com>
Co-authored-by: Tommy Brunn <tommy.brunn@gmail.com>
@Nevon Fixed everything. Thanks so much for your review and responsiveness! |
Great, thanks for your contribution! Sorry you had to wait so long for a review. |
I guess you were too fast @Nevon 😅 |
Fixes #989
This PR introduces a new method called
fetchConsumerGroupOffsets
that mimics the behaviorfetchOffsets
(which I now deprecated), but adds support for querying multiple topics at once, or omitting thetopics
parameter altogether and getting the group offsets in all consumed topics.I chose to re-implement this method instead of changing
fetchOffsets
because this introduces an API breaking change: the result is now a list of topics and offsets (instead of just the offsets as it was in the original function).