-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-13357: store producer IDs in broker snapshots #11527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
When creating snapshots, controllers generate a ProducerIdsRecord indicating the highest producer ID that has been used so far. Brokers should generate the same record, so that the snapshots can be compared. Also, fix a bug in MetadataDelta#finishSnapshot. The current logic will produce the wrong result if all objects of a certain type are completely removed in the snapshot. The fix is to unconditionally create each delta object.
jsancio
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @cmccabe
| new ProducerIdsRecord(). | ||
| setBrokerId(-1). | ||
| setBrokerEpoch(-1). | ||
| setProducerIdsEnd(highestSeenProducerId), (short) 0))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Is it fair to assume that we will revisit this code regarding the version in ApiMessageAndVersionas part of KIP-778?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we will need to have some versioning code in after KIP-778, so that we generate a snapshot at the appropriate metadata.version. Probably this will take the form of adding a version parameter to the snapshot generation function. cc @mumrah
| out.accept(Collections.singletonList(new ApiMessageAndVersion( | ||
| new ProducerIdsRecord(). | ||
| setBrokerId(-1). | ||
| setBrokerEpoch(-1). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Broker id and epoch are only used to debugging right? Neither the broker nor the controller use these values, right? Do you think it makes sense to just remember and persist this value in the snapshot just for consistency with the log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's useful in the log as a kind of record of what happened. I think it's much less useful in the snapshot since at most we would be just giving the very latest broker to request producer ids, which isn't very interesting for debugging, 99% of the time.
metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
Outdated
Show resolved
Hide resolved
metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
Outdated
Show resolved
Hide resolved
…Test.java Co-authored-by: José Armando García Sancio <jsancio@users.noreply.github.com>
…Test.java Co-authored-by: José Armando García Sancio <jsancio@users.noreply.github.com>
jsancio
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are getting unrelated test failures:
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
|
Closing since I merged e8b53ca. I think GitHub didn't close this PR because I didn't reference the PR number in the title. |
When creating snapshots, controllers generate a ProducerIdsRecord indicating the highest producer ID
that has been used so far. Brokers should generate the same record, so that the snapshots can be
compared.
Also, fix a bug in MetadataDelta#finishSnapshot. The current logic will produce the wrong result if
all objects of a certain type are completely removed in the snapshot. The fix is to unconditionally
create each delta object.