-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Better consumer offset handling #378
Conversation
28bd4ba
to
fc1a1ee
Compare
Is that even possible? It's never explicitly stated, but I'd always assumed based on the layout of the protocol that it wasn't really supported to include multiple blocks for a single partition. |
Apparently it works in Kafka 0.8.1 and 0.8.2. The OffsetRequest documentation is especially unclear, but this led me to believe that is was supported:
You can even do more than two, as long as you request the timestamps in descending order. |
fc1a1ee
to
f2e95e4
Compare
@@ -292,6 +297,42 @@ func (client *client) RefreshMetadata(topics ...string) error { | |||
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max) | |||
} | |||
|
|||
func (client *client) GetOffsetRange(topic string, partitionID int32) (int64, int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do all this work when you could have just made two GetOffset
calls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 request vs. 2 requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it only happens once on startup, doesn't seem like a big deal (premature optimization and all that)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I find this something that is generally useful. E.g., for monitoring purposes, you'd also want to know the available range. Of course, we could do two requests for this purpose as well.
f2e95e4
to
d779673
Compare
WTF :/ Github marked this as merged after I rebased. |
Closing in favor of #418 |
Client.GetOffsetRange
to get the available range of offsets for a partition in one request. I had to make some changes toOffsetRequest
andOffsetResponse
to make it possible to ask for more than one offset per partition, but no changes to the public API were necessary.ConsumePartition
call, even if the offset is provided. The good news is that the method will immediately return an error and never start a goroutine, instead of starting the goroutine and returning an error in theErrors()
channel which you can easily ignore.@Shopify/kafka