-
Notifications
You must be signed in to change notification settings - Fork 4.1k
[STORM-2607] Offset consumer + 1 #2181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@tiodollar Nice find. The fix is not workable though. We need to update OffsetManager to handle that the commit offset should be the last processed offset +1, as per the Kafka docs for commitSync: "The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1". Would you like to take a look at fixing this? I'd also be happy to take a stab at it. |
|
@srdo I'll take look, and try fixing this! |
| preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp); | ||
|
|
||
| return numCommittedOffsets; | ||
| return numCommittedOffsets - 1;// The committed offset should be the next message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix this in L144 instead, so the log messages above print the right number as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srdo Ok!
| OffsetAndMetadata nextCommitOffsetAndMetadata = null; | ||
| if (nextCommitMsg != null) { | ||
| //The committed offset should be the next message | ||
| nextCommitOffset = nextCommitOffset + 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer that we fix the offsets in the code block above (L76-117) instead of adjusting the offset here. That way the log messages won't be printing the wrong offset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srdo Ok!
… instead of last committed offset for compatibility with commitSync consumer API
STORM-2607: Switch OffsetManager to track earliest uncommitted offset… by @srdo
srdo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tiodollar Thanks. Tested this with an example topology, and the spout seems to commit all the messages now.
@hmcl Could I get you to review this?
| nextCommitMsg.getMetadata(Thread.currentThread())); | ||
| LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", | ||
| tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); | ||
| tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, missed a -1 here. It should be nextCommitOffsetAndMetadata.offset() - 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srdo I corrected that!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks
|
@hmcl Could I get you to review this? |
|
@tiodollar ok |
|
@tiodollar Seems like there's a conflict with master. Could you fix it? |
|
@srdo i'll fix this! Tks! |
|
@tiodollar Any updates? |
|
@HeartSaVioR i'll fix this merge today! |
|
@tiodollar once you do the merge, can you please squash all the commits. Thanks. |
|
@tiodollar Please let me know if you're having trouble resolving the conflicts. I'd be happy to help. |
|
Is there any reason this merge is pending? Can I help? |
|
Anyone could take this over since this PR looks like inactive. |
|
Yes, we should get this in soon if @tiodollar is busy. @Chandan83 if you'd like to work on this please create an account at https://issues.apache.org/jira and let us know, so we can assign the issue to you. With regard to squashing, I think it should be possible to squash the existing commits into one for @tiodollar's commits and one for mine and preserve authorship information that way. |
|
@srdo I have created an apache jira account with id ChandanKrSingh. Please assign the issue to me. |
|
@srdo i'll resolve this issue today! I have been busy, but i reseved some hour to do this. |
|
@tiodollar Great. Thanks. |
|
@tiodollar Since there is still a conflict here, and we'd like this to go in 1.1.2 which we hope to release before too long, I've resolved the conflicts and addressed the review comments. There's a PR with the full set of changes here #2367. Note that your commits are still present, so you should still be credited with the fix. Let me know if this is not okay with you. |
|
@srdo tks for this! I'm very busy, and is better that! For me it's ok! |
|
@tiodollar Happy to hear it :) |
When i put a message in a partition, the storm-kafka-client consume this message.
But storm-kafka-client commit the offset -1.
storm-kafka-client: 1.1.0
storm-core : 1.1.0
kafka: 0.10.2.0
Steps to bug
#1 - Insert message in kafka
#2 - Read with storm Spout this topic
#3 - Get the offset for the consumer group and the offset is always offset -1
https://issues.apache.org/jira/browse/STORM-2607