-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16229: Fix slow expired producer id deletion #15324
Conversation
Hey @jeqo thanks for taking a look and improving this area! Can we add the benchmarks from the ticket to the PR description? |
@jolshan sure! I just added it 👍🏽 |
Also @jeqo -- just curious which java version were you running? |
@@ -177,7 +177,7 @@ private void addProducerId(long producerId, ProducerStateEntry entry) { | |||
} | |||
|
|||
private void removeProducerIds(List<Long> keys) { | |||
producers.keySet().removeAll(keys); | |||
keys.forEach(producers.keySet()::remove); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the issue is that we specify a list here and not a set. (If both collections are a set, I believe should we iterate through the smaller one as you do here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting, seems to be related. Attaching the flamegraph on high cpu utilization to spot the root method.
Looking at AbstractSet#remoteAll
implementation:
public boolean removeAll(Collection<?> c) {
Objects.requireNonNull(c);
boolean modified = false;
if (size() > c.size()) {
for (Object e : c)
modified |= remove(e);
} else {
for (Iterator<?> i = iterator(); i.hasNext(); ) {
if (c.contains(i.next())) {
i.remove();
modified = true;
}
}
}
return modified;
}
Seems that in my case it's hitting the second branch, as it's burning on AbstractList#contains
.
For the expiration removal to hit the second branch means the size of expired keys is the same as the size of producers (cannot be higher). This seems to be possible, as we have got this issue even when no producers were running (so no new producer ids created), but when rebalancing the cluster (ie. old producer id snapshots loaded).
In hindsight, the JDK implementation may have considered extending the first condition to include c.size <= size()
scenario, as it will not depend on the collections remove
implementation.
On the other hand, if it would used a HashSet keys
instead of current ArrayList
type, it would not pretty much the same as the proposed fix.
btw, will be simplyfing the expression even further to:
keys.forEach(producers::remove);
both lead to same HashMap#remove
implementation at the end.
We could even consider: if the size of expired producer ids it's the same as all producer ids, then we could consider to clean it all up instead of removing, as the source of expired ids is the same as producer. Something like:
if (keys.size() == producers.size()) {
clearProducerIds();
} else {
keys.forEach(producers::remove);
producerIdCount = producers.size();
}
but performance-wise, execution time is pretty much the same (linear, de-referencing each key) as to the fix version; and readability doesn't improve much.
PS, using jdk17.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the thorough investigation. Pretty cool to deep dive into these things from time to time 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This JDK implementation looks risky - it knows remove
is reasonably cheap for a Set
while Collection.contains
could be really expensive. It's worth adding a comment to our code to make sure someone doesn't revert this change in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think there is a better fix @ijuma ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this fix is fine, but adding a comment would make sense (since it's a bit unintuitive).
ProducerStateManager manager; | ||
Path tempDirectory; | ||
|
||
@Param({"100", "1000", "10000", "100000"}) //, "1000000"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we want to remove the commented out code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure! removing it.
@State(value = Scope.Benchmark) | ||
@OutputTimeUnit(TimeUnit.NANOSECONDS) | ||
public class ProducerStateManagerBench { | ||
final Time time = Time.SYSTEM; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit -- not a huge difference here either but could we mock time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adding it in the last commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Hey there @jeqo looks like check style failed. Do you mind adding the apache header to your new benchmark?
|
@jolshan thanks for catching this! adding it now. |
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); Unnecessarily going through all producer ids and then throw all expired keys to be removed. This leads to exponential time on worst case when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) instead, leading to a more linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million: Reviewers: Justine Olshan <jolshan@confluent.io>
Expiration of ProducerIds is implemented with a slow removal of map keys:
Unnecessarily going through all producer ids and then throw all expired keys to be removed.
This leads to exponential time on worst case when most/all keys need to be removed:
A simple fix is to use map#remove(key) instead, leading to a more linear growth:
Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million:
[KAFKA-16229]
Committer Checklist (excluded from commit message)