KAFKA-19141; Persist topic id in OffsetCommit record #19683
Pull Request Overview
This PR adds support for persisting the TopicId in OffsetCommit records to ensure that the persisted offset matches the requested one. The changes include updates to the JSON schema, modifications to the OffsetAndMetadata and record helper implementations, and corresponding adjustments in test cases across multiple modules.
- Updated JSON schema and production code to include the TopicId field.
- Modified OffsetAndMetadata construction, conversion methods (fromRecord/fromRequest), and record creation in GroupCoordinatorRecordHelpers.
- Revised and added tests in tools and group-coordinator modules to verify the correct handling of TopicId.
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java | Added TopicId to OffsetCommitValue in tests. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java | Updated OffsetAndMetadata instantiation with TopicId. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java | Modified tests to include TopicId in offset commit requests. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java | Included TopicId when constructing OffsetAndMetadata. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java | Converted several tests to parameterized ones and passed TopicId where required. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java | Passed TopicId to OffsetAndMetadata in expiration tests. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java | Added tests validating the TopicId field in OffsetAndMetadata. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java | Updated record creation tests to include TopicId. |
| group-coordinator/src/main/resources/common/message/OffsetCommitValue.json | Added a new TopicId field to the JSON schema. |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java | Passed TopicId from the topic to OffsetAndMetadata. |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java | Extended the OffsetAndMetadata class to include and handle TopicId. |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java | Updated record creation to set TopicId from OffsetAndMetadata. |

      .setCommitTimestamp(1234L)
    - .setExpireTimestamp(-1L);
    + .setExpireTimestamp(-1L)
    + .setTopicId(uuid);

Just to confirm: when we read an older OffsetCommitValue, will we get a ZERO_UUID instead of a null?
Correct. It uses ZERO_UUID by default.
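
The default-value behavior confirmed in this exchange can be illustrated with a small, self-contained sketch. `OffsetCommitValueSketch` and its `hasTopicId` helper are hypothetical stand-ins, not Kafka's generated `OffsetCommitValue` class, and `java.util.UUID` stands in for Kafka's own UUID type. The only point is that a record written before the field existed surfaces an all-zero id rather than null.

```java
import java.util.UUID;

// Illustrative stand-in only: this is NOT Kafka's generated OffsetCommitValue class.
final class OffsetCommitValueSketch {
    // Stand-in for the ZERO_UUID sentinel: all 128 bits are zero.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final long offset;
    private final UUID topicId;

    // Models a record written before the TopicId field existed:
    // the field is absent, so the reader fills in the default.
    OffsetCommitValueSketch(long offset) {
        this(offset, ZERO_UUID);
    }

    // Models a record written with an explicit topic id.
    OffsetCommitValueSketch(long offset, UUID topicId) {
        this.offset = offset;
        this.topicId = topicId;
    }

    long offset() {
        return offset;
    }

    UUID topicId() {
        return topicId;
    }

    // Callers compare against the sentinel instead of checking for null.
    boolean hasTopicId() {
        return !ZERO_UUID.equals(topicId);
    }
}
```

Under this assumption, code reading old records never needs a null check; it only needs to treat the zero id as "topic id unknown".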

    - .setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),
    + .setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
    + .setTopicId(offsetAndMetadata.topicId),
      version

Should we update offsetCommitValueVersion to return version 4 if topic id is present?
Good catch! I missed this.
squah-confluent left a comment
Looks good to me!

      // Serialize with the highest supported non-flexible version
      // until a tagged field is introduced or the version is bumped.
    - return 3;
    + return 4;

Wondering if we can use OffsetCommitValue.highestSupportedVersion here?
I prefer to use a hard-coded version here to ensure that it is not bumped unintentionally. This is what we have been doing for the other records too.
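
The trade-off discussed in this thread can be sketched as follows. The class and method names are hypothetical and exist only for illustration; `schemaHighestVersion` stands in for whatever the schema reports as its highest supported version. The sketch shows why a pinned constant stays put when the schema gains a new version, whereas a value derived from the schema moves with it.

```java
// Illustrative sketch only; these names are not part of Kafka.
final class PinnedRecordVersionSketch {
    // Version written to the log. Changing it requires editing this line,
    // so a schema bump alone cannot change what gets serialized.
    static final short PINNED_VERSION = 4;

    // Hard-coded approach: the schema's highest version is deliberately ignored.
    static short pinnedVersion(short schemaHighestVersion) {
        return PINNED_VERSION;
    }

    // Alternative raised in review: follow the schema's highest version.
    static short trackingVersion(short schemaHighestVersion) {
        return schemaHighestVersion;
    }

    // A guard a test could assert: the pinned version must still be
    // one that the schema supports.
    static boolean pinnedVersionIsSupported(short schemaHighestVersion) {
        return PINNED_VERSION <= schemaHighestVersion;
    }
}
```

If the schema later moved to a hypothetical version 5, `pinnedVersion` would keep returning 4 until someone changed the constant on purpose, while `trackingVersion` would start returning 5 immediately.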
dongnuo123 left a comment
Thanks for the PR! LGTM!
lianetm left a comment
Thanks! LGTM.
This patch adds the TopicId field to the OffsetCommitValue record as a tagged field. It will later be used on the offset fetch path to ensure that the persisted offset matches the requested one.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Sean Quah <squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io>