KAFKA-10545: Create topic IDs and propagate to brokers #9626
rajinisivaram merged 16 commits into apache:trunk
dengziming left a comment:
ApiVersionTest needs to be changed because you changed ApiVersion, plus a few suggestions.
KAFKA-10729 add
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
rite2nikhil left a comment:
Thanks for the PR, left a few comments.
dajac left a comment:
Thanks for the PR. I have left some comments. I have only looked at the request/response and the controller so far. I will look at the remaining parts tomorrow.
clients/src/main/resources/common/message/LeaderAndIsrRequest.json
clients/src/main/resources/common/message/LeaderAndIsrRequest.json
clients/src/main/resources/common/message/LeaderAndIsrResponse.json
clients/src/main/resources/common/message/LeaderAndIsrResponse.json
clients/src/main/resources/common/message/LeaderAndIsrResponse.json
core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
      offlineReplicas += tp
    else if (partition.errorCode == Errors.NONE.code)
      onlineReplicas += tp
    if (leaderAndIsrResponse.topics().isEmpty) {
It may be better to also handle the version explicitly here. Alternatively, we could push this into LeaderAndIsrResponse and provide a method Map<TopicPartition, ...> partitions() which handles the version.
Actually, this may not work as we don't have the topic name in the latest version...
Yeah. One option I thought of would be to do something like partitions(Map<id, string> topicNames) and handle it inside the response with the version. That might be a little cleaner but I'm not sure.
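The partitions(Map<id, string> topicNames) idea floated above could look roughly like the following Java sketch. Everything here is a hypothetical stand-in rather than the PR's actual code: the class and field names, the String key of the form "topic-partition", and the cutoff constant FIRST_VERSION_WITH_TOPIC_IDS (its value is assumed for illustration). It only shows the shape of the approach: older versions read the topic name from each partition entry, newer versions resolve the name from the topic ID and skip IDs that the caller does not know.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the response class; not Kafka's generated types.
final class LeaderAndIsrResponseSketch {
    // Assumed cutoff for illustration: first version that carries topic IDs.
    static final short FIRST_VERSION_WITH_TOPIC_IDS = 5;

    static final class PartitionError {
        final String topicName;   // only populated in older versions
        final int partitionIndex;
        final short errorCode;

        PartitionError(String topicName, int partitionIndex, short errorCode) {
            this.topicName = topicName;
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
        }
    }

    static final class TopicError {
        final UUID topicId;
        final List<PartitionError> partitionErrors;

        TopicError(UUID topicId, List<PartitionError> partitionErrors) {
            this.topicId = topicId;
            this.partitionErrors = partitionErrors;
        }
    }

    private final short version;
    private final List<PartitionError> flatPartitionErrors; // older layout
    private final List<TopicError> topics;                  // newer layout

    LeaderAndIsrResponseSketch(short version,
                               List<PartitionError> flatPartitionErrors,
                               List<TopicError> topics) {
        this.version = version;
        this.flatPartitionErrors = flatPartitionErrors;
        this.topics = topics;
    }

    // Returns error codes keyed by "topic-partition", hiding the version split.
    Map<String, Short> partitionErrors(Map<UUID, String> topicNames) {
        Map<String, Short> errors = new LinkedHashMap<>();
        if (version < FIRST_VERSION_WITH_TOPIC_IDS) {
            for (PartitionError p : flatPartitionErrors) {
                errors.put(p.topicName + "-" + p.partitionIndex, p.errorCode);
            }
        } else {
            for (TopicError t : topics) {
                String name = topicNames.get(t.topicId);
                if (name == null) {
                    continue; // unknown ID, e.g. the topic was deleted meanwhile
                }
                for (PartitionError p : t.partitionErrors) {
                    errors.put(name + "-" + p.partitionIndex, p.errorCode);
                }
            }
        }
        return errors;
    }
}
```

The null check plays the role of the foreach-instead-of-get suggestion in the Scala code: a topic ID missing from the supplied map is skipped instead of raising an exception.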
rajinisivaram left a comment:
@jolshan Thanks for the PR, looks good. Left some minor comments.
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
    }

    leaderAndIsrResponse.topics.forEach { topic =>
      val topicName = controllerContext.topicNames.get(topic.topicId).get
Should we use controllerContext.topicNames.get(topic.topicId).foreach instead of get, to avoid throwing an exception if the topic is not in the context?
I moved this logic to LeaderAndIsrResponse, which is Java code, so it behaves a bit differently. (We may want to move it back though, and if so, I'll do this.) For now I do a null check in the Java code.
One question I have, though: what does it mean to not have a topic in the context? Is it possible to send a LeaderAndIsrRequest with an unknown partition, or one not in the context? I can only think that maybe the topic gets deleted while the request is being handled. What would the normal error handling be in that scenario?
      log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
      log.topicId = topicIds.get(topicPartition.topic)
    } else {
      stateChangeLogger.warn("Partition metadata file already contains content.")
Why is this a warning?
I think that if we reach here, we are in an unexpected state. The partitionMetadata file should have loaded the topic ID if the file already contained content. I left it as a warning rather than an exception.
Hmm, looking at the conditional statements here, it looks like we would write the file the first time we get here because log.partitionMetadataFile.get.isEmpty() and the second time we would print a warning even if the id in the file matches the expected id. Unless I missed something.
Oops. I think I cleaned up this block and deleted something. There should be a check for log.topicId.equals(id). If it matches, then the file exists and we shouldn't go into the block that says "// There is not yet a topic ID stored in the log."
I should also fix the topicIds.get(topicPartition.topic) above and replace it with id.
I might think about making this code cleaner in general to avoid so many nested if statements.
So I thought through these cases some more and realized that the metadata file will fail to open if formatted incorrectly. So the only case where there could be data written to the file is if the ID is the zero UUID. So I decided to just fail on reading the file if the zero ID is provided. (We will never write zero ID to file.) The rest of this cleaned up pretty nicely.
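To make the cases in this thread easier to follow, here is a minimal Java sketch of the decision as described in the comments: skip when no real ID is supplied, do nothing when the stored ID already matches, write when nothing is stored yet, and flag anything else. The class, the enum, and the MISMATCH outcome are hypothetical illustrations, not the code in this PR; in particular, how a mismatch should be handled is an assumption, since the thread does not settle it.

```java
import java.util.UUID;

// Hypothetical illustration of the topic ID checks discussed in this thread.
final class TopicIdAssignmentSketch {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    enum Action { SKIP, ALREADY_STORED, WRITE_TO_FILE, MISMATCH }

    // storedId: the ID currently held by the log (null if none loaded yet).
    // requestId: the ID carried by the request for this topic (may be null).
    static Action decide(UUID storedId, UUID requestId) {
        if (requestId == null || ZERO_UUID.equals(requestId)) {
            return Action.SKIP;            // the zero ID is never written to file
        }
        if (requestId.equals(storedId)) {
            return Action.ALREADY_STORED;  // file already holds this ID; no warning
        }
        if (storedId == null || ZERO_UUID.equals(storedId)) {
            return Action.WRITE_TO_FILE;   // no topic ID stored in the log yet
        }
        return Action.MISMATCH;            // assumed outcome; handling is undecided here
    }
}
```

Flattening the checks into early returns is one way to avoid the nested if statements mentioned earlier, and it makes the "second request with the same ID" case an explicit no-op rather than a warning.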
rajinisivaram left a comment:
@jolshan Thanks for the updates, looks good, just one comment/question left. The changes to process errors in LeaderAndIsrResponse.java seem fine. As you said, the only case where topic is not in the controller context would be where a topic was deleted. Don't think we need any other special handling for that. The PR needs rebasing to resolve conflicts btw.
@rajinisivaram Yup, will look at the rebase next.
@jolshan Thanks for the PR, the last build was good, so merging to trunk.
This change takes the topic IDs created in #9473 and propagates them to brokers using LeaderAndIsr Request. It also removes the topic name from the LeaderAndIsr Response, reorganizes the response to be sorted by topic, and includes the topic ID.
In addition, the topic ID is persisted to each replica in Log as well as in a file on disk. This file is read on startup and if the topic ID exists, it will be reloaded.
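As a rough illustration of the persist-and-reload behavior described above, the Java sketch below writes a topic ID to a file and reads it back on startup. The class name and the one-line file format are assumptions made for the example and are not the format used by this PR. It follows the rules stated in the review discussion: the zero ID is never written, and reading fails if the file is malformed or contains the zero ID.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch; the real file layout is not specified here.
final class PartitionMetadataFileSketch {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final Path path;

    PartitionMetadataFileSketch(Path path) {
        this.path = path;
    }

    // Persists the topic ID; the zero ID is rejected and never reaches disk.
    void write(UUID topicId) throws IOException {
        if (ZERO_UUID.equals(topicId)) {
            throw new IllegalArgumentException("refusing to persist the zero topic ID");
        }
        Files.write(path, topicId.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Returns the stored ID, or empty if the file does not exist yet.
    Optional<UUID> read() throws IOException {
        if (!Files.exists(path)) {
            return Optional.empty();
        }
        String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim();
        UUID id = UUID.fromString(content); // IllegalArgumentException if malformed
        if (ZERO_UUID.equals(id)) {
            throw new IllegalStateException("zero topic ID found in " + path);
        }
        return Optional.of(id);
    }
}
```

On startup, a caller would invoke read() and, when a value is present, use it as the in-memory topic ID for the log.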
This PR bumps the IBP and is expected to be merged at the same time as #9622, so as not to bump the protocol twice.