-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
High water mark offset support to the consumer #339
Conversation
need rebase |
3a200f5
to
3f853c6
Compare
@@ -238,6 +239,11 @@ type PartitionConsumer interface { | |||
// errors are logged and not returned over this channel. If you want to implement any custom errpr | |||
// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel. | |||
Errors() <-chan *ConsumerError | |||
|
|||
// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will | |||
// be used for the next message that will be produced. You can use this to determine high far behind |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/high/how
3f853c6
to
8e13cd0
Compare
I think we either need to expose the high water mark on the |
With the buffer on the return I don't think there's a reliable non-confusing way to expose that information though. I think in the common case you will have both in scope anyways: for msg := range pc.Messages() {
// do whatever, including msg.Offset and pc.HighWaterMark()
} |
As a counter example, in my consumergroup library, you will get messages from all partitions on the channel, and you don't have access to the partition consumers. I guess I could somehow build some code around that to make it work, but it feels painful. |
What about exposing a consumer.GetChild(topic, partition).HighWaterMark() I think this is potentially a useful method to have anyways, and is very easy to write. |
I still like this PR as it stands (needs a rebase, but). Maybe we need to expose this information in other places as well, but the current place on the |
7fd499b
to
5f78d90
Compare
Rebased. |
// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will | ||
// be used for the next message that will be produced. You can use this to determine how far behind | ||
// the processing is. | ||
HighWaterMarkOffset() int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, the kafka docs have "watermark" as one word, so I guess HighWatermarkOffset
would be more consistent, but YMMV
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I guess that's my fault for misnaming the variable in FetchResponseBlock
:(
Add it to the list of breaking changes I guess...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, grammatically I feel my capitalization closer to high-water mark
instead of the incorrect high watermark
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I have a problem with it being high-water mark offset; I'd have preferred high-water mark or high-water offset instead of this Frankensteinian composition. But that would be deviating from the protocol spec too much. Oh well :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And for a different opinion, wikipedia doesn't even hyphenate, it just uses three distinct words ("high water mark") everywhere. So I guess this issue is contentious :)
This is fine as-is (and is consistent with FetchResponseBlock
). We can revisit if anybody complains.
This LGTM. if you're comfortable with the API. |
I am ok with this API as a low level building block, but for the high-level consumer we should definitely give this some more thought. |
High water mark offset support to the consumer
Expose high water marker offset in consumer for lag monitoring purposes.
@Shopify/kafka