-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-8806 Reduce calls to validateOffsetsIfNeeded #7222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-8806 Reduce calls to validateOffsetsIfNeeded #7222
Conversation
hachikuji
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, left a couple comments.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
Outdated
Show resolved
Hide resolved
| * Create fetch requests for all nodes for which we have assigned partitions | ||
| * that have no existing requests in flight. | ||
| */ | ||
| private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fetchablePartitions method used below is probably another nice opportunity to use something like forEachAssignedPartition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it saves anything in this case. The main benefit of forEachAssignedPartition is avoiding making a copy of the assignment set. Since fetchablePartitions iterates across the internal set directly I don't think it would help
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can avoid a copy in the case @hachikuji mentions as well, right? See below:
synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
return assignment.stream()
.filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable())
.map(PartitionStates.PartitionState::topicPartition)
.collect(Collectors.toList());
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, I see what he means. I'll look into this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a pass at this and it wasn't so simple. Deferring for now
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
Outdated
Show resolved
Hide resolved
|
There are a number of test failures that could be related. |
|
@ijuma, they are related, looking at them now. |
Previouslly, this would only update the offset and rely on future calls to Fetcher#maybeValidatePositionForCurrentLeader to get the leader information. Now that we are only calling maybeValidatePositionForCurrentLeader when the metadata has updated, we would get stuck after a reset.
…tadata-update-calls
hachikuji
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, LGTM. I'll merge after the build completes.
|
retest this please |
|
Two jobs timed out, one had flaky test failures:
|
|
retest this please |
1 similar comment
|
retest this please |
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
Show resolved
Hide resolved
|
retest this please |
2 similar comments
|
retest this please |
|
retest this please |
|
It's a bit concerning that the tests are so flaky in this PR, have we been checking the failures to see if they're related (before Jenkins deletes them)? |
…tadata-update-calls
…tadata-update-calls
|
retest this please |
…duce-metadata-update-calls
|
Reviving this PR cc @hachikuji @ijuma @andrewchoi5 |
|
retest this please |
|
retest this please |
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
Outdated
Show resolved
Hide resolved
| .filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable()) | ||
| .map(PartitionStates.PartitionState::topicPartition) | ||
| .collect(Collectors.toList()); | ||
| List<TopicPartition> result = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a small comment that this is in the hotpath and is written the "ugly" way for a reason. It's also probably worth mentioning that we do the cheap isFetchable check first.
|
Here are some results from the new JMH added (note that the units are milliseconds) So for very high partition counts, there seems to be a decent improvement |
hachikuji
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I did not review the benchmark at depth, but it looks reasonable. For the future, I think we can consider smarter bookkeeping to avoid the need for these loops in the first place. We can create an index in SubscriptionState which is keyed by the state of the partition so so that we do not need a pass to discover the resetting/validating partitions (for example).
For the particular case of validation, I am also looking forward to the improvements that are possible with the Fetch changes in KIP-595. Basically validation can be piggybacked on the Fetch API and we can avoid the need for a separate validating state.
| return false; | ||
| } | ||
| } | ||
| return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mumrah I now understand why the previous version was slower, it was allocating a PartitionState instance per element. But we only use the value here. So, we could still use allMatch without the performance penalty.
Remove unused PartitionState. It was unused after #7222. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>


Remember the
updateVersionof the last update to Metadata so we can avoid unnecessarily checking each partition for leader epoch changes on every call toKafkaConsumer#poll