KAFKA-15627: KIP-951's Leader discovery optimisations on the client #14564
Conversation
@@ -150,7 +150,7 @@ MetadataCache mergeWith(String newClusterId,
        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
        // update with newest information from the MetadataResponse. We always take the latest state, removing existing
        // topic IDs if the latest state contains the topic name but not a topic ID.
-       Map<String, Uuid> newTopicIds = topicIds.entrySet().stream()
+       Map<String, Uuid> newTopicIds = this.topicIds.entrySet().stream()
This was a bug; it now has a test in MetadataCacheTest.java.
😱 What was the issue?
Net effect of the bug was that, in the merged cache, the IDs of retained topics (from pre-existing metadata) would be lost in the newly built cache (via merging). As the comment explains, the intention of the code is to build a merged set of topic IDs: we start with the previous ID stored for retained topics and then update with the newest information from the MetadataResponse. This should be done by initialising the merged set of topic IDs with the retained topics (this.topicIds) and then updating it with the newest information (the topicIds method argument). But the code used the topicIds argument even to get the retained topic IDs. The bug was introduced inadvertently because of same-named variables at object vs. method scope. It now has a test to catch regressions in behaviour.
I would rename the argument currently called topicIds too, just to try to prevent a similar bug being re-introduced by accident.
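A hedged, simplified sketch of the shadowing hazard and the suggested rename (the class, types and parameter name below are illustrative, not the actual MetadataCache code, which uses Uuid values and a stream-based merge):

import java.util.HashMap;
import java.util.Map;

class TopicIdMergeSketch {
    // Field holding the IDs of topics retained from the pre-existing metadata.
    private final Map<String, String> topicIds = new HashMap<>();

    // Renaming the parameter (here newTopicIdsFromResponse) removes the
    // field-vs-parameter shadowing that caused the bug: it is no longer
    // possible to silently read the wrong "topicIds".
    Map<String, String> mergeTopicIds(Map<String, String> newTopicIdsFromResponse) {
        // Start from the retained IDs (the field) ...
        Map<String, String> merged = new HashMap<>(this.topicIds);
        // ... then overwrite with the newest information from the response.
        merged.putAll(newTopicIdsFromResponse);
        return merged;
    }
}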
@@ -83,4 +84,69 @@ public void testMissingLeaderEndpoint() {
        assertEquals(nodesById.get(7), replicas.get(7));
    }

+   @Test
+   public void testMergeWithThatPreExistingPartitionIsRetainedPostMerge() {
test for bug fixed in MetadataCache
Thanks for the pull request, @msn-tldr!
I didn't see anything too egregious, but there are a handful of changes I'd like to see. A good number of them are subjective, so feel free to ignore 😄
            for (Entry partitionLeader: partitionLeaders.entrySet()) {
                TopicPartition partition = (TopicPartition) partitionLeader.getKey();
                Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
                Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
                if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
                    log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
                    continue;
                }
                if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
                    log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
                    continue;
                }
                if (!newNodes.containsKey(newLeader.leaderId.get())) {
                    log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
                    continue;
                }
                if (!this.cache.partitionMetadata(partition).isPresent()) {
                    log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
                    continue;
                }
All of these checks are to handle cases where the incoming leader information is not usable, right? I'm wondering—and this is a stylistic choice—if it makes sense to throw these into a separate method shouldUpdatePartitionLeader(...)? Might make it more easily accessible for a dedicated unit test, too.
It's a shame that use of Optional is so noisy, what with its isPresent()s and get()s everywhere 😦
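As an aside, a hedged sketch (not the PR's code) of how one of those checks could be folded into Optional combinators instead of explicit isPresent()/get() pairs:

import java.util.Optional;

class EpochCheckSketch {
    // True only when a new epoch is present and is strictly newer than the
    // currently cached epoch (or when no epoch is cached yet).
    static boolean isNewer(Optional<Integer> newEpoch, Optional<Integer> currentEpoch) {
        return newEpoch
                .filter(n -> currentEpoch.map(c -> n > c).orElse(true))
                .isPresent();
    }
}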
if it makes sense to throw these into a separate method shouldUpdatePartitionLeader(...)?

Right now the conditions for not updating are fairly contained, so I will skip adding the method for now.
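For reference, a hedged sketch of what the suggested shouldUpdatePartitionLeader(...) extraction could look like; the parameters stand in for the values used in the snippet above (incoming leader id/epoch, the cached epoch, the known nodes, and whether the partition still has cached metadata) and are assumptions, not the real types or signature:

import java.util.Map;
import java.util.Optional;

class LeaderUpdateCheckSketch {
    static boolean shouldUpdatePartitionLeader(Optional<Integer> newLeaderId,
                                               Optional<Integer> newLeaderEpoch,
                                               Optional<Integer> currentEpoch,
                                               Map<Integer, String> knownNodes,
                                               boolean hasCachedPartitionMetadata) {
        // Incoming leader information must be complete.
        if (!newLeaderId.isPresent() || !newLeaderEpoch.isPresent())
            return false;
        // Incoming epoch must be strictly newer than the cached one.
        if (currentEpoch.isPresent() && newLeaderEpoch.get() <= currentEpoch.get())
            return false;
        // The new leader's node information must be known.
        if (!knownNodes.containsKey(newLeaderId.get()))
            return false;
        // The partition must still have cached metadata to update.
        return hasCachedPartitionMetadata;
    }
}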
                    continue;
                }

                MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get();
Is there any way that existingMetadata could be null at this point? I assume not, due to the use of the Optional wrapper here.
Good call-out, but no, due to the check at line 392 making sure optional.isPresent():
if (!this.cache.partitionMetadata(partition).isPresent()) {
log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
continue;
}
@@ -171,6 +179,15 @@ protected void handleFetchResponse(final Node fetchTarget,
            log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                fetchConfig.isolationLevel, fetchOffset, partition, partitionData);

+           Errors partitionError = Errors.forCode(partitionData.errorCode());
+           if (requestVersion >= 16 && (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH)) {
Can we add a constant, with some comments, for the magic value of 16?
It wasn't needed, so I removed it (from the Sender code path too). Since the new leader is coming through tagged fields, for version < 16 the tagged fields would be initialised with default values, which are then ignored.
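A hedged illustration of that point (a stand-in, not the generated Kafka response classes): because the new leader/endpoint information lives in tagged fields, a pre-v16 response simply leaves them at their defaults, and the update path becomes a no-op without an explicit version check:

import java.util.ArrayList;
import java.util.List;

class TaggedFieldDefaultSketch {
    // Stand-in for a generated response data class: tagged collection fields
    // are initialised to an empty list when the broker never sets them, which
    // is the case for responses from brokers that don't support the new fields.
    static class ResponseDataStandIn {
        private final List<Integer> nodeEndpoints = new ArrayList<>();
        List<Integer> nodeEndpoints() { return nodeEndpoints; }
    }

    static void maybeUpdateLeaders(ResponseDataStandIn data) {
        if (data.nodeEndpoints().isEmpty()) {
            return; // nothing to update for older response versions
        }
        // ... update metadata with the new endpoints ...
    }
}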
        final short requestVersion = response.requestHeader().apiVersion();
        if (requestVersion >= 16 && !partitionsWithUpdatedLeaderInfo.isEmpty()) {
            List<Node> leaderNodes = produceResponse.data().nodeEndpoints().stream()
                .map(e -> new Node(e.nodeId(), e.host(), e.port(), e.rack()))
                .filter(e -> !e.equals(Node.noNode()))
                .collect(Collectors.toList());
            Set<TopicPartition> updatedPartitions = metadata.updatePartially(partitionsWithUpdatedLeaderInfo, leaderNodes);
            if (log.isTraceEnabled()) {
                updatedPartitions.forEach(
                    part -> log.trace("For {} leader was updated.", part)
                );
            }
        }
Is this worth the effort to generalize and share among Sender and AbstractFetch?
No, as there are subtle differences: the response types differ, and the Fetcher has extra actions after the leader is updated. So the common part is really just 1-2 statements, which is not worth it IMO.
@AndrewJSchofield & @kirktrue addressed the feedback so far!
lgtm
just a couple of questions, but I don't think they need to block the PR
                Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
                Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
                if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
                    log.debug("For {}, incoming leader information is incomplete {}", partition, newLeader);
Should we only log these if debug is enabled? Or should we move them to INFO?
@@ -198,6 +215,20 @@ protected void handleFetchSuccess(final Node fetchTarget,
                fetchBuffer.add(completedFetch);
            }

+           if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
I'm a bit confused why we collect the partitionsWithUpdatedLeaderInfo above when it looks like all we do with them is validate them to the subscriptions later. Is there any other use for having it out of the loop?
partitionsWithUpdatedLeaderInfo is collected above so that the metadata is updated only once, with all the updates, via metadata.updatePartitionLeadership.
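A hedged sketch of that batching pattern (names and types simplified, not the actual Fetcher/Metadata code): leader hints are accumulated per partition while walking the response, then applied to the metadata in one call.

import java.util.HashMap;
import java.util.Map;

class BatchedLeaderUpdateSketch {
    interface MetadataLike {
        void updatePartitionLeadership(Map<String, Integer> newLeaders);
    }

    static void handleResponse(Map<String, Integer> perPartitionLeaderHints, MetadataLike metadata) {
        Map<String, Integer> partitionsWithUpdatedLeaderInfo = new HashMap<>();
        for (Map.Entry<String, Integer> e : perPartitionLeaderHints.entrySet()) {
            // ... per-partition error handling happens here in the real code ...
            partitionsWithUpdatedLeaderInfo.put(e.getKey(), e.getValue());
        }
        // One metadata update for the whole response, instead of one per partition.
        if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
            metadata.updatePartitionLeadership(partitionsWithUpdatedLeaderInfo);
        }
    }
}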
Rebasing my fork seems to have removed the PR commits from the remote branch.
NOTE - This PR has since been moved here, as it got auto-closed by GitHub workflows, as explained in this comment.

This implements the leader discovery optimisations for the client from KIP-951.

This PR focuses on optimisation 2 from above. Additionally, it fixes a bug that was introduced in MetadataCache.java; details are inline in a comment.

IGNORE files - *.json, FetchResponse.java & ProduceResponse.java; they will be removed from this PR once #14627 is merged.

Committer Checklist (excluded from commit message)