KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug (#10985)
Conversation
…ent with unfilledMembers at minQuota
ready for review @dajac @guozhangwang @hachikuji @showuon
```java
for (String droppedOutConsumer : membersWithOldGeneration) {
    consumerToOwnedPartitions.get(droppedOutConsumer).clear();
}
```
I just moved this part here to keep things up to date as we go; before, we were clearing them after the loop.
Because we now clear them earlier, membersWithOldGeneration is no longer necessary. We can just iterate membersOfCurrentHighestGeneration here.
```java
    ownedPartitions.add(tp);
} else {
    String otherConsumer = allPreviousPartitionsToOwner.get(tp);
    log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
```
This is fix #2 -- if we somehow still get multiple consumers claiming a partition in the same generation, we have to consider both claims invalid and remove the partition from both consumers' ownedPartitions.
nit: do you think we should log at ERROR, since this is really not expected? Right now we would sort of "hide" such bugs and still be able to proceed silently; I feel we should shout out such scenarios a bit louder in the logs.
Good point, yes, I would absolutely want/hope a user would report this as a bug. Changed to ERROR.
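For illustration, here is a minimal, hypothetical sketch of the double-claim invalidation described in this thread. It is simplified to plain strings and is not the actual Kafka code; names loosely mirror the PR's variables:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of fix #2: while collecting each consumer's claimed partitions,
// track a partition -> previous owner map; any partition claimed twice
// within the same generation is invalidated for BOTH claimants.
public class DoubleClaimCheck {
    public static Map<String, List<String>> invalidateDoubleClaims(
            Map<String, List<String>> claimedPartitionsPerConsumer) {
        Map<String, String> partitionToOwner = new HashMap<>();
        Map<String, List<String>> validatedOwned = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : claimedPartitionsPerConsumer.entrySet()) {
            List<String> ownedPartitions = new ArrayList<>();
            for (String tp : entry.getValue()) {
                String otherConsumer = partitionToOwner.put(tp, entry.getKey());
                if (otherConsumer == null) {
                    ownedPartitions.add(tp);
                } else {
                    // Double claim in the same generation: we can't tell who
                    // really owns tp, so drop it from both consumers.
                    List<String> otherOwned = validatedOwned.get(otherConsumer);
                    if (otherOwned != null) {
                        otherOwned.remove(tp);
                    }
                }
            }
            validatedOwned.put(entry.getKey(), ownedPartitions);
        }
        return validatedOwned;
    }
}
```

The real assignor additionally logs the conflict and records the partition in partitionsWithMultiplePreviousOwners; this sketch only shows the invalidation itself.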
```java
for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
    if (ownedPartitions.contains(doublyClaimedPartition)) {
        log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple"
```
Strictly speaking this should never, ever happen: even if we do get these "impossible" doubly-claimed partitions, we're also removing them from the ownedPartitions above (that's fix #2). But I put in a safeguard just in case; it shouldn't hurt (performance-wise we should generally never even enter this loop, since partitionsWithMultiplePreviousOwners should almost always be empty).
nit: add a space after "multiple". i.e. despite being claimed by multiple[ ]
```java
// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
    unfilledMembers.add(consumer);
    potentiallyUnfilledMembersAtMinQuota.add(consumer);
```
Honestly it took me quite a while to understand the fix :P After understanding it, I think maybe it's better to rename these two collections more explicitly:

- unfilledMembers -> MembersWithLessThanMinQuotaPartitions
- potentiallyUnfilledMembersAtMinQuota -> MembersWithExactMinQuotaPartitions

And also (since the maxQuota is always either == minQuota or minQuota + 1):

- expectedNumMembersAssignedOverMinQuota -> expectedNumMembersWithMaxQuota
- numMembersAssignedOverMinQuota -> numMembersWithMaxQuota
Ack on the first two renamings, though I'd still want to prefix them with unfilled to emphasize that these structures only hold members that may potentially be assigned one or more partitions. ie, if minQuota == maxQuota, then potentiallyUnfilledMembersAtMinQuota should actually be empty, in which case MembersWithExactMinQuotaPartitions doesn't quite make sense. I'll clarify this in the comments as well.
For the 3rd and 4th suggested renamings, it's a bit subtle, but this would actually be incorrect. In the case minQuota == maxQuota, the expectedNumMembersAssignedOverMinQuota variable will evaluate to 0, which would not make sense if it was called expectedNumMembersWithMaxQuota. Of course we could go through and tweak the logic for this case, but I'd prefer not to mix that into this PR. For now I'll just clarify in the comments for these variables.
(I did still rename them slightly to hopefully be more clear, and also in line with the new names of the other two data structures we renamed)
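The subtlety discussed above can be made concrete with a small sketch. This is a hypothetical helper, not the actual assignor code, but it shows why expectedNumMembersAssignedOverMinQuota evaluates to 0 when the partitions divide evenly (i.e. when minQuota == maxQuota):

```java
// Hypothetical quota bookkeeping: with numPartitions total partitions and
// numConsumers members, minQuota is the floor and maxQuota the ceiling of
// the average; the remainder is how many members may sit above minQuota.
public class QuotaMath {
    public static int minQuota(int numPartitions, int numConsumers) {
        return numPartitions / numConsumers;
    }

    public static int maxQuota(int numPartitions, int numConsumers) {
        // integer ceiling division
        return (numPartitions + numConsumers - 1) / numConsumers;
    }

    // When numPartitions divides evenly, this is 0 even though every member
    // holds exactly maxQuota partitions -- which is why the name
    // "expectedNumMembersWithMaxQuota" would be misleading in that case.
    public static int expectedNumMembersOverMinQuota(int numPartitions, int numConsumers) {
        return numPartitions % numConsumers;
    }
}
```

For example, with 7 partitions and 3 consumers the quotas are 2 and 3 with one member over minQuota; with 6 partitions and 3 consumers both quotas are 2 and the "over minQuota" count is 0.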
showuon left a comment
@ableegoldman, thanks for the fix (of my bug :))! I had a quick look and left some comments. Will look again tomorrow. Thanks.
Thanks for the reviews so far, all comments should be addressed. Migrated the encoded …
showuon left a comment
Did a 2nd review. LGTM! Thanks for the fix to make the cooperative-sticky assignor more reliable!
guozhangwang left a comment
Thanks for the patch @ableegoldman! I made a pass on it.
```java
// If the current member's generation is higher, all the previously owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
    allPreviousPartitionsToOwner.clear();
```
If the current member's generation is available but < maxGeneration, should we also clear it from the consumerToOwnedPartitions map? I think the passed-in subscriptions are not sorted by generation, right?
In that case, it's never added to consumerToOwnedPartitions in the first place. This map is not pre-filled; it gets populated inside this loop. So if it's < maxGeneration, then we just insert an empty list into the map for that member's owned partitions.
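A hypothetical, heavily simplified sketch of that population logic (illustrative names and plain strings, not the actual Kafka code). Members at the highest generation seen so far keep their claimed partitions; a newly seen higher generation invalidates everything collected before; stale generations get an empty list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

public class GenerationFilter {
    public static Map<String, List<String>> filterByGeneration(
            Map<String, Integer> memberGenerations,
            Map<String, List<String>> claimedPartitions) {
        int maxGeneration = -1;
        List<String> membersOfCurrentHighestGeneration = new ArrayList<>();
        Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
        for (String member : memberGenerations.keySet()) {
            int generation = memberGenerations.get(member);
            if (generation > maxGeneration) {
                // A higher generation invalidates everything seen so far.
                for (String old : membersOfCurrentHighestGeneration) {
                    consumerToOwnedPartitions.get(old).clear();
                }
                membersOfCurrentHighestGeneration.clear();
                maxGeneration = generation;
            }
            if (generation == maxGeneration) {
                membersOfCurrentHighestGeneration.add(member);
                consumerToOwnedPartitions.put(member, new ArrayList<>(claimedPartitions.get(member)));
            } else {
                // Stale generation: insert an empty list, never the claimed partitions.
                consumerToOwnedPartitions.put(member, new ArrayList<>());
            }
        }
        return consumerToOwnedPartitions;
    }
}
```

This also shows why the subscriptions need not be sorted: whenever a higher generation appears mid-loop, the earlier members' lists are cleared retroactively.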
```java
// with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
numMembersAssignedOverMinQuota++;
if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
    potentiallyUnfilledMembersAtMinQuota.clear();
```
I'd suggest we remove this potentiallyUnfilledMembersAtMinQuota.clear(); logic and just keep two expected numbers:

- expectedNumMembersWithMaxQuota = totalPartitionsCount % numberOfConsumers;
- expectedNumMembersWithMinQuota = numberOfConsumers - expectedNumMembersWithMaxQuota;

And then we can also remove the check in line 309, and at the end, when we have exhausted unassignedPartitions, just check:

```java
numMembersWithMaxQuota == expectedNumMembersWithMaxQuota &&
numMembersWithMinQuota == expectedNumMembersWithMinQuota
```

WDYT?
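A minimal sketch of the post-loop check suggested here (hypothetical helper and names, not the actual Kafka code): given the final per-consumer partition counts, verify that exactly the expected numbers of members ended up at minQuota and maxQuota.

```java
import java.util.Collection;

public class LazyCheck {
    // Returns true iff the counts match the expected min/max quota distribution.
    public static boolean quotasSatisfied(Collection<Integer> partitionCounts, int numPartitions) {
        int numConsumers = partitionCounts.size();
        int minQuota = numPartitions / numConsumers;
        int maxQuota = numPartitions % numConsumers == 0 ? minQuota : minQuota + 1;
        int expectedNumMembersWithMaxQuota = numPartitions % numConsumers;
        int numMembersWithMaxQuota = 0;
        int numMembersWithMinQuota = 0;
        for (int count : partitionCounts) {
            if (count == maxQuota && maxQuota != minQuota) {
                numMembersWithMaxQuota++;
            } else if (count == minQuota) {
                numMembersWithMinQuota++;
            } else {
                return false; // some member is outside [minQuota, maxQuota]
            }
        }
        return numMembersWithMaxQuota == expectedNumMembersWithMaxQuota
            && numMembersWithMinQuota == numConsumers - expectedNumMembersWithMaxQuota;
    }
}
```

For example, counts of [3, 2, 2] over 7 partitions satisfy the quotas, while [3, 3, 1] do not.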
While I'm not really a fan of the potentiallyUnfilledMembersAtMinQuota logic (it's definitely awkward but I felt it was still the lesser evil in terms of complicating the code), I don't think we can get rid of it that easily. The problem is that when minQuota != maxQuota, and so far currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions, then consumers that are filled up to exactly minQuota have to be considered potentially not yet at capacity since some will need one more partition, though not all. So this data structure is not just used to verify that everything is properly assigned after we've exhausted the unassignedPartitions, it's used to track which consumers can still receive another partition (ie, are "unfilled"). Does that make sense?
Yes, that makes sense. Still, this logic:

```java
if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
    potentiallyUnfilledMembersAtMinQuota.clear();
}
```

seems only needed because we have the check in line 309 (?). Say we do not check that, but instead just check that the expected numbers of consumers with minQuota and maxQuota are satisfied; then do we still need this?
> this logic seems only needed because we have the check in line 309 (?)

No, I don't think so. It should be for line 279:

```java
// to handle the case that when there are still unassignedPartition left, but no more members to be assigned.
if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty() && unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
    throw new IllegalStateException("No more unfilled consumers to be assigned.");
```

Line 309 is just an early error detection and log. It is not related to potentiallyUnfilledMembersAtMinQuota (now the unfilledMembersWithExactlyMinQuotaPartitions members).
I tried to summarize both methods:

What @guozhangwang means is that we can "lazily" detect the issue after assigning all unassignedPartitions. We don't need to clear the potentiallyUnfilledMembersAtMinQuota here because, as the "original" variable name says, they are "potentially unfilled members"; just keep them there. We "should not" assign partitions to them in this case because we've reached expectedNumMembersAssignedOverMinQuota.

But if somehow, after assigning unassignedPartitions to all unfilledMembers, there are still some unassignedPartitions left, we can just assign them to potentiallyUnfilledMembersAtMinQuota. And after running out of unassignedPartitions, we can check:

```java
numMembersWithMaxQuota == expectedNumMembersWithMaxQuota &&
numMembersWithMinQuota == expectedNumMembersWithMinQuota
```

to do the error handling.
vs.

In @ableegoldman's version, we find the issue immediately and handle it. We computed the potentiallyUnfilledMembersAtMinQuota correctly (that's why we need to clear it). So, if the issue happens:

> if somehow, after assigning unassignedPartitions to all unfilledMembers, there are still some unassignedPartitions left

we can try to get a member from potentiallyUnfilledMembersAtMinQuota and then assign the unassignedPartition to that member. If we can't get a member from it (i.e. potentiallyUnfilledMembersAtMinQuota is empty), we throw an exception directly.

Both ways can find errors when they happen. Personally, I like Sophie's version more since it's clearer.
Thanks @showuon and @guozhangwang, I think that all makes sense. One of my primary motivations was to keep all data structures consistent at all times with what they represent, so they can always be relied upon at any point. For that reason I also prefer to keep things as is, and clear the potentiallyUnfilledMembersAtMinQuota (now renamed to unfilledMembersWithExactlyMinQuotaPartitions) as soon as we have filled the last member who may be above minQuota, at which point all of the members at exactly minQuota are no longer considered "unfilled".
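A much-simplified sketch of this bookkeeping (counts only, hypothetical names, no real partition objects or stickiness logic). The key line is the clear(): once enough members have gone over minQuota, members at exactly minQuota stop being "unfilled" and can no longer receive partitions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ConstrainedFill {
    // Distributes numPartitions across consumers, returning final counts.
    public static Map<String, Integer> fill(List<String> consumers, int numPartitions) {
        int minQuota = numPartitions / consumers.size();
        int expectedNumMembersOverMinQuota = numPartitions % consumers.size();
        int numMembersOverMinQuota = 0;

        Map<String, Integer> counts = new HashMap<>();
        Deque<String> unfilledUnderMinQuota = new ArrayDeque<>();
        Set<String> unfilledAtExactlyMinQuota = new LinkedHashSet<>();
        for (String consumer : consumers) {
            counts.put(consumer, 0);
            if (minQuota > 0) {
                unfilledUnderMinQuota.add(consumer);
            } else {
                unfilledAtExactlyMinQuota.add(consumer); // 0 partitions == minQuota
            }
        }

        for (int p = 0; p < numPartitions; p++) {
            // Under-min members are always filled before exactly-min members.
            String consumer = !unfilledUnderMinQuota.isEmpty()
                ? unfilledUnderMinQuota.peek()
                : unfilledAtExactlyMinQuota.iterator().next();
            int newCount = counts.merge(consumer, 1, Integer::sum);
            if (newCount == minQuota) {
                unfilledUnderMinQuota.remove(consumer);
                unfilledAtExactlyMinQuota.add(consumer);
            } else if (newCount > minQuota) {
                unfilledAtExactlyMinQuota.remove(consumer);
                numMembersOverMinQuota++;
                // The fix: once enough members sit above minQuota, members at
                // exactly minQuota are no longer "unfilled" at all.
                if (numMembersOverMinQuota == expectedNumMembersOverMinQuota) {
                    unfilledAtExactlyMinQuota.clear();
                }
            }
        }
        return counts;
    }
}
```

For example, 7 partitions over three consumers end up as 3/2/2; without the clear(), members at minQuota would wrongly remain candidates for extra partitions.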
```java
// should be done. But in case of some algorithmic error, just log a warning and continue to
// assign any remaining partitions within the assignment constraints
if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) {
    log.warn("Filled the last member up to maxQuota but still had partitions remaining to assign, "
```
Related to the one above: maybe we just check that

```java
numMembersWithMaxQuota == expectedNumMembersWithMaxQuota &&
numMembersWithMinQuota == expectedNumMembersWithMinQuota
```

and if not, log the full assignment as an ERROR?
I responded to the above comment as well, but specifically here I think that just checking that condition requires us to make assumptions about the algorithm's correctness up to this point (and the correctness of its assumptions). But if those were all correct, then we would never reach this code to begin with, so it's better to directly look for any remaining unassignedPartitions -- it's a sanity check.

But ack on bumping to ERROR.
Responded to your comments @guozhangwang, let me know if that all makes sense or if you have any more concerns that need to be addressed in this PR.
Just one more reply on the clearing logic, otherwise LGTM.
LGTM!
Some unrelated flaky test failures:
…utilize generation in cooperative, and fix assignment bug (#10985)

1) Bring the generation field back to the CooperativeStickyAssignor so we don't need to rely so heavily on the ConsumerCoordinator properly updating its SubscriptionState after e.g. falling out of the group. The plain StickyAssignor always used the generation since it had to, so we just make sure the CooperativeStickyAssignor has this tool as well.
2) In case of unforeseen problems or further bugs that slip past the generation field safety net, the assignor will now explicitly look out for partitions that are being claimed by multiple consumers as owned in the same generation. Such a case should never occur, but if it does, we have to invalidate this partition from the ownedPartitions of both consumers, since we can't tell who, if anyone, has the valid claim to this partition.
3) Fix a subtle bug that I discovered while writing tests for the above two fixes: in the constrained algorithm, we compute the exact number of partitions each consumer should end up with, and keep track of the "unfilled" members who must -- or might -- require more partitions to hit their quota. The problem was that members at the minQuota were being considered "unfilled" even after we had already hit the maximum number of consumers allowed to go up to the maxQuota, meaning those minQuota members could/should not accept any more partitions beyond that. I believe this was introduced in #10509, so it shouldn't be in any released versions and does not need to be backported.

Reviewers: Guozhang Wang <guozhang@apache.org>, Luke Chen <showuon@gmail.com>
Merged to trunk and cherrypicked back to 3.0 (cc @kkonstantine). Unfortunately it's not really possible to cherrypick this back to 2.8, since there have been so many changes to the assignor since then. However, the main conflicts are due to fix #3, which isn't actually necessary, as the bug it's fixing was only present in 3.0. So we just need to extract the #1 and #2 fixes and apply those to the 2.8 assignor (along with the tests, most if not all of which are still relevant in 2.8). We should split this out into a separate ticket though, so we can close the current one and unblock the 3.0 release. @showuon If you have time, would you be interested in picking this up? I guess an alternative is to just port those improvements you made to the assignor back to 2.8 and then cherrypick this fix as usual, which should be relatively smooth since I doubt much else has touched the assignor recently. I'll leave it up to you to decide which route to go, if you want to take this one. Luckily there's no rush on this, since the 2.8.1 release isn't right around the corner like the 3.0 release is, so we can take our time.
Let me handle the porting to v2.8. :)
@showuon filed https://issues.apache.org/jira/browse/KAFKA-13081 just fyi

Awesome 🚀
…ive assignor (#11068) This is fix 1 and fix 2 from #10985 for v2.8, including the tests. Uses the generation to invalidate previous assignments that claim partitions but no longer own them, and implements an additional safety net to handle any case in which doubly-claimed partitions slip into the input anyway. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
…ive assignor (#11068) This is fix 1 and fix 2 from #10985 for v2.7, including the tests. Uses the generation to invalidate previous assignments that claim partitions but no longer own them, and implements an additional safety net to handle any case in which doubly-claimed partitions slip into the input anyway. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
…ive assignor (#11068) This is fix 1 and fix 2 from #10985 for v2.6, including the tests. Uses the generation to invalidate previous assignments that claim partitions but no longer own them, and implements an additional safety net to handle any case in which doubly-claimed partitions slip into the input anyway. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
The primary goal of this PR is to address the problem we've seen in the wild in which the ConsumerCoordinator fails to update its SubscriptionState and ultimately feeds invalid `ownedPartitions` data as input to the assignor. Previously the assignor would detect that something was wrong and just throw an exception; now we make several efforts to detect this earlier in the assignment process and then fix it if possible, and work around it if not. Specifically, this PR does a few things:

1) Bring the `generation` field back to the CooperativeStickyAssignor so we don't need to rely so heavily on the ConsumerCoordinator properly updating its SubscriptionState after e.g. falling out of the group. The plain StickyAssignor always used the generation since it had to, so we just make sure the CooperativeStickyAssignor has this tool as well.
2) In case of unforeseen problems or further bugs that slip past the `generation` field safety net, the assignor will now explicitly look out for partitions that are being claimed by multiple consumers as owned in the same generation. Such a case should never occur, but if it does, we have to invalidate this partition from the `ownedPartitions` of both consumers, since we can't tell who, if anyone, has the valid claim to this partition.
3) Fix a subtle bug in the constrained algorithm: members at `minQuota` were being considered "unfilled" even after we had already hit the maximum number of consumers allowed to go up to the `maxQuota`, meaning those `minQuota` members could/should not accept any more partitions beyond that. I believe this was introduced in #10509, so it shouldn't be in any released versions and does not need to be backported.