KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance (#12561)
Conversation
@C0urante, @showuon, @yashmayya, I have created this PR as per the discussions on the ticket. Please review whenever you get the chance. Thanks!
Thanks @vamossagar12. I haven't looked too closely at the code yet, but I have a couple of high-level thoughts:
In this PR, similar to Luke's PR, I have added the condition to do a revoking rebalance consecutively. The only difference is that whether a revocation happens or not is decided by the current time vis-a-vis the next scheduled revoking rebalance, as per the exponential backoff. I noticed the flip side of that: I needed to add a delay in some of the tests to get a revoking rebalance when the workers joined before the next scheduled rebalance. I think what you are saying makes sense. Let me give it a try. Thanks!
@C0urante, I made the changes based on my understanding of your suggestions. Please review whenever you get the chance. Thanks!
@C0urante, I needed your suggestion on these configs. Should we expose the initial interval parameter so that users can set it? Also, I believe I have set the exponential base (the second parameter) to a slightly high value, which goes up to 4 seconds on the second attempt, about a minute on the third attempt, and so on.
Should we expose the initial interval parameter so that users can set it?
I don't think so; that would require a KIP to introduce a new user-facing configuration property.
Also, I believe I have set the exponential base (the second parameter) to a slightly high value, which goes up to 4 seconds on the second attempt, about a minute on the third attempt, and so on.
I think this is alright 👍
I don't see why we need jitter, though, and I'm hesitant to add it since it can lead to rebalance delays that exceed the scheduled.rebalance.max.delay.ms parameter. Is there a reason you opted for jitter here?
Also, it's worth noting that the behavior here is to always use a backoff of 10 ms if the scheduled.rebalance.max.delay.ms property is set to 10 or less. I'm not sure this is optimal; people may set the property to zero if they want to disable delayed rebalances altogether, and with this change, we'd be reintroducing delays with no way to disable them. Maybe we should:
- Set the initial interval to zero
- Check if the result of invoking `backoff` is zero, and if it is, skip the delay altogether and perform the revocation immediately
Oops, it turns out we can't set the initial interval to 0 with the ExponentialBackoff class, as that causes the backoff to be zero for every attempt. Hmm... this may involve a little bit of special handling on our end.
Yeah, I recall now that I had also tested it with a 0 initial interval and it was always returning 0. Let me try to incorporate this.
I added some logic to set the delay to 0 when maxDelay is 0, in which case we would always revoke. I also bumped up the multiplier from 20 to 30, since the initial interval is reduced now.
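To make the interaction concrete, here is a small stand-alone sketch of the behaviour discussed in this thread (the class name, method names, and constants are illustrative, not the PR's actual identifiers). It approximates what Kafka's `ExponentialBackoff` does — grow the delay geometrically and cap it at a maximum — which is why a 0 initial interval yields 0 on every attempt, and why a max delay of 0 needs to be special-cased so that delayed revocations can still be disabled entirely:

```java
// Illustrative sketch only; names and constants are hypothetical, not the PR's code.
public class RevocationBackoff {
    // Approximates the shape of Kafka's ExponentialBackoff (ignoring jitter):
    // delay = initialInterval * multiplier^attempts, capped at maxInterval.
    public static long backoff(long initialInterval, int multiplier, long maxInterval, int attempts) {
        double delay = initialInterval * Math.pow(multiplier, attempts);
        return Math.min(maxInterval, (long) delay);
    }

    // If scheduled.rebalance.max.delay.ms is 0, skip the backoff entirely so
    // delayed revocations stay disabled. Otherwise compute the exponential delay
    // from a small non-zero seed; a 0 initial interval would make every backoff
    // 0, as noted in the thread above.
    public static long revocationDelay(long maxDelay, int consecutiveRevokingRebalances) {
        if (maxDelay == 0)
            return 0;
        long initialInterval = 10; // hypothetical small seed value, in ms
        int multiplier = 30;       // the bumped-up multiplier mentioned above
        return backoff(initialInterval, multiplier, maxDelay, consecutiveRevokingRebalances);
    }

    public static void main(String[] args) {
        // maxDelay == 0 disables the delay altogether
        if (revocationDelay(0, 3) != 0) throw new AssertionError();
        // with a 0 initial interval, every attempt would back off by 0
        if (backoff(0, 30, 300_000, 5) != 0) throw new AssertionError();
        // otherwise the delay grows geometrically but stays capped at maxDelay
        if (revocationDelay(300_000, 1) != 300) throw new AssertionError();
        if (revocationDelay(300_000, 3) != 270_000) throw new AssertionError();
    }
}
```

With these illustrative constants, consecutive revoking rounds would back off by 300 ms, 9 s, 270 s, and so on, until hitting the configured maximum.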
showuon left a comment:
I had a look at the non-test code and left some comments. I agree with Chris that we should not introduce exponential backoff for the first revocation, and that we should reset it as soon as we no longer need it. Thank you for working on this.
Thanks @showuon. I removed the backoff for the first revocation; the delay is now introduced only when there are successive revoking rebalances.
C0urante left a comment:
Thanks @vamossagar12, this is looking much better! Still have a few comments but this is definitely going in the right direction.
This isn't actually true, is it? We'll need to wait `delay` ms to perform the next allocation-balancing revocation (if another one is necessary), but we're performing the revocation for the current round immediately.
This also highlights a larger issue with the approach here: we should only issue an assignment with a delay when that delay is the time until the re-allocation of lost assignments and/or the next round where we will permit allocation-balancing revocations. With this approach, we'll perform allocation-balancing revocations immediately, but also issue an assignment with a delay, which doesn't really serve much purpose since workers automatically rejoin the group (triggering a rebalance) immediately after every round that included a revocation (see here).
I think a better approach would be to recompute the potential backoff delay between consecutive allocation-balancing revocations if delay == 0, and if it is non-zero, then skip those revocations during the current round.
@C0urante, the log line is definitely incorrect and I will change it. Thanks for pointing it out. I had a question on what you proposed as the better approach, though.
With the approach I tried to use here, I am allowing this revoking rebalance to happen and setting a delay, so that if another worker joins within that delay, it would need to wait that long before we can perform any revocations. This way the approach is slightly optimistic, in the sense that at least the first time around we would have a balanced allocation, while further allocation-balancing revocations won't allow a barrage of rebalances in succession.
With what you proposed, it seems to me that even the first such worker joining would be made to wait, which in my mind seems slightly pessimistic. IMO, since we are already setting the delay for future consecutive allocation-balancing revocations, it might be better to let the first one go through. WDYT? Have I understood the context correctly, or is there anything I am missing?
By gating the delay behind revokedInPrevious, we're always allowing the first round that would require allocation-balancing revocations to actually include those revocations. The proposal I made should only cause us to introduce the delay (and gate revocations behind that delay) after the second consecutive rebalance that would require allocation-balancing revocations.
I'd also like to reiterate this point:
we should only issue an assignment with a delay when that delay is the time until the re-allocation of lost assignments and/or the next round where we will permit allocation-balancing revocations.
I agree that if we feel comfortable permitting revocations in a given round, then we should just go ahead and include those revocations in the assignment--but in that case, we don't need to include a delay in the assignment, especially since workers will all automatically rejoin the cluster after they've revoked the appropriate connectors/tasks.
Yeah, +1 to @C0urante's point that scheduling a rebalance here doesn't seem to serve any purpose?
I think a better approach would be to recompute the potential backoff delay between consecutive allocation-balancing revocations if delay == 0, and if it is non-zero, then skip those revocations during the current round.
In this case, we wouldn't know how much time has passed between the previous round of revoking rebalance and the current rebalance right? Or do we not account for that with the assumption that members will rejoin with a new round of rebalance immediately after a revoking round of rebalance anyway?
Or do we not account for that with the assumption that members will rejoin with a new round of rebalance immediately after a revoking round of rebalance anyway
I guess we go by this assumption.
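The consensus being reached in this thread could be sketched as a small decision function (hypothetical stand-alone code, not the actual IncrementalCooperativeAssignor logic): only a consecutive revoking round is delayed, a computed backoff of zero means revoke immediately, and a round that performs revocations carries no delay in the assignment, because workers rejoin on their own right after a revoking round.

```java
// Hypothetical sketch of the decision discussed above; not the PR's real code.
public class RevocationDecision {
    final boolean performRevocations; // include allocation-balancing revocations this round
    final long assignmentDelayMs;     // delay to embed in the assignment, if any

    RevocationDecision(boolean performRevocations, long assignmentDelayMs) {
        this.performRevocations = performRevocations;
        this.assignmentDelayMs = assignmentDelayMs;
    }

    public static RevocationDecision decide(boolean revokedInPrevious, long backoffDelayMs) {
        if (!revokedInPrevious) {
            // First round needing revocations: revoke immediately. No delay goes
            // into the assignment, since workers rejoin right after a revoking
            // round anyway, triggering the follow-up rebalance on their own.
            return new RevocationDecision(true, 0);
        }
        if (backoffDelayMs == 0) {
            // Backoff computed to zero (e.g. delays disabled): revoke immediately.
            return new RevocationDecision(true, 0);
        }
        // Consecutive revoking round: skip revocations now and schedule the delay.
        return new RevocationDecision(false, backoffDelayMs);
    }

    public static void main(String[] args) {
        if (!decide(false, 5_000).performRevocations) throw new AssertionError();
        if (decide(false, 5_000).assignmentDelayMs != 0) throw new AssertionError();
        if (!decide(true, 0).performRevocations) throw new AssertionError();
        if (decide(true, 5_000).performRevocations) throw new AssertionError();
        if (decide(true, 5_000).assignmentDelayMs != 5_000) throw new AssertionError();
    }
}
```

Note that this rests on the assumption stated above: members rejoin with a new rebalance round immediately after a revoking round, so no explicit tracking of elapsed time between rounds is needed.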
Thanks @C0urante, @yashmayya. I made the change as suggested. I had missed the fact that since the workers rejoin immediately after a load-balancing rebalance, introducing a delay for that round is needless. Please review.
Thanks @C0urante. I had one question on the proposed approach regarding delays. Since the other changes are smallish in nature, I will wait for your response on that one before making the rest of the changes.
showuon left a comment:
The changes look good. I left some comments. Also, could you re-enable the test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining? This fix should resolve that flaky test.
Thanks @showuon. I enabled it and it passed locally. I have pushed the change along with the other review comments (and a couple of clarifying questions).
showuon left a comment:
LGTM! Thanks for the improvement!
Thank you!
@C0urante I made the switch back as suggested. Please review whenever you get the chance.
@vamossagar12 there are failing unit tests; can you check those out and ping us after you've taken a look and fixed any issues?
Oops, sorry about that. I didn't check the test results after the latest changes. I will check and fix those and call for review after that.
Force-pushed 2c48580 to 0d39c4c.
I checked in the changes, fixing the 2 unit tests that were failing. On this run, there didn't seem to be any Connect-specific failures. My bad on the oversight last time around.
```java
assertAssignment(leaderId, offset,
        Collections.emptyList(), 0,
        Collections.emptyList(), 0,
        Collections.emptyList(), 1,
```
Isn't this a regression? We shouldn't be revoking tasks in this round since, without those revocations, we'd have a balanced assignment.
Yeah, that's true. Revoking tasks at this point leads to imbalance. This was happening because, per the new changes, the moment the delay expires we allow revocation. So at this point the flag is true, the delay goes to 0, and since revocation is technically possible, the code was doing it. I have added the canRevoke flag back to handle this case, which fixes this test case and another test, IncrementalCooperativeAssignorTest#testTaskAssignmentWhenWorkerBounces, that seemed to have a similar issue.
👍 Thanks. I wish we could do this without adding an extra boolean flag but preventing regressions seems worth the additional complexity, especially since there are other issues with the allocation algorithm that need to be worked out and, when fixed, will hopefully render the flag unnecessary.
Are there any cases where the introduction of the canRevoke flag will defeat the purpose of this PR, by causing an imbalanced assignment to be sent out with no guaranteed follow-up rebalances (such as those caused by a scheduled rebalance delay, or workers rejoining the cluster after receiving an assignment with revocations)?
TBH, even I didn't want to re-introduce the flag, but it seemed the easiest way to get around the regression. I guess, as you said, it might be easier to work through the other issues in the allocation algorithm to finally make the flag redundant.
Regarding the side effects of re-introducing this flag, I had imagined that adding the flag back would break some of the tests, but that didn't happen, which may or may not be a good thing. I did look at the logic again and, compared with the original algorithm, it seemed to me that this line:
is the line that prevented successive revoking rebalances. The other check here: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L312
gives us a window to set it to true and force a revocation in the next round, which kind of made me believe it should be a safe check. That said, if there are scenarios where we think we need testing, I would be happy to do that.
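As a rough illustration of the canRevoke gating described in this thread (hypothetical stand-alone code; the real logic in IncrementalCooperativeAssignor is intertwined with the delay handling): revocations are permitted once, and then blocked until a round that needs no revocations re-arms the flag, so an expiring delay alone cannot trigger back-to-back revocations.

```java
// Hypothetical illustration of the canRevoke gating discussed above;
// not the actual IncrementalCooperativeAssignor code.
public class CanRevokeGate {
    private boolean canRevoke = true;

    // Called each rebalance round once any scheduled delay has expired.
    // Returns whether allocation-balancing revocations may be performed now.
    public boolean mayRevoke(boolean revocationsNeeded) {
        if (!revocationsNeeded) {
            // A round with no needed revocations re-arms the gate, giving a
            // window to force a revocation in a later round.
            canRevoke = true;
            return false;
        }
        if (canRevoke) {
            // Perform revocations now, then close the gate so that the delay
            // expiring on its own cannot trigger another immediate revocation.
            canRevoke = false;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CanRevokeGate gate = new CanRevokeGate();
        if (!gate.mayRevoke(true)) throw new AssertionError();  // first round revokes
        if (gate.mayRevoke(true)) throw new AssertionError();   // consecutive round is gated
        if (gate.mayRevoke(false)) throw new AssertionError();  // quiet round re-arms the gate
        if (!gate.mayRevoke(true)) throw new AssertionError();  // revocations allowed again
    }
}
```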
Isn't this a regression? We shouldn't be revoking tasks in this round since, without those revocations, we'd have a balanced assignment.
Hello. I have been playing around with revokedInPrevious and the test cases. I looked at this case, which I agree is a regression. However, based on the changes made in this PR, what I think is that after the revoking rebalance there would be a follow-up rebalance which would eventually lead to a balanced load. If you look at IncrementalCooperativeAssignorTest#testTaskAssignmentWhenWorkerBounces, this behaviour is exhibited. Along similar lines, I made some edits to the WorkerCoordinatorIncrementalTest#testTaskAssignmentWhenWorkerBounces method, causing a follow-up rebalance after the revoking rebalance once worker 3 comes back. The assignments with which I am calling the final onLeaderElected may not be accurate (as I couldn't get the full assignments using deserializeAssignment), but I chose something representative.
I know this is a deviation from what the KIP proposes, i.e. a bounced member gets its assignments back without any revocations, but if what I am stating above sounds OK, do you think it's really bad? It seems to get to a balanced load after a follow-up rebalance. WDYT?
Hey @vamossagar12--I'd rather err on the side of not increasing the number of rebalances required for simple cases like the one covered in WorkerCoordinatorIncrementalTest::testTaskAssignmentWhenWorkerBounces.
I plan to file the downstream PR sometime next week. For ease of review I'd prefer to keep it separate from this one, which has grown fairly large and complex on its own.
Thanks @C0urante. I have reverted the test with the follow-up rebalance. I was basing my thoughts on the fact that KIP-415 allows having more rebalances, so adding another one should be OK. Having said that, I am fine with what you stated as well.
Regarding a separate PR, yes that makes sense. Thanks for your help on this!
Hi @vamossagar12--sorry for the delay. I've filed the follow-up PR here: vamossagar12#1
I've marked it as a draft since it should not be merged into your KAFKA-12495 branch until it's approved to be merged into trunk here, but it's ready for review now. Can you take a look and see what you think? If it looks alright, we can ask Luke to take a look.
(cc @showuon)
Thanks @C0urante! I have added some comments, for my understanding and around naming.
Force-pushed d879493 to ab8db49.
Force-pushed c7bf4d8 to 71297c3.
KAFKA-12495: Improve rebalance allocation algorithm
@C0urante, do you want to take another pass at this one whenever you get the chance?
@vamossagar12 the build is failing with a Checkstyle error; can you take a look?
Force-pushed 422c81f to 2eb1f3f.
@C0urante, I did that, but it looks like my force push cleaned up the PR merge since I didn't pull the latest :( I see you have deleted the branch. Would it be possible to create another PR for this? Sorry about this :(
@vamossagar12 It looks like the commit is still present on GitHub: https://github.com/vamossagar12/kafka/tree/422c81fc0a2dd19fee0ed1eb033ef35fc9e27ba1 Can you fetch that commit locally, then add a commit on top of it that removes the
This reverts commit 2eb1f3fbedb793353d58f61a6a0074b9bbfb6063.
Force-pushed 0d0b56a to 52d2381.
Thanks @C0urante. I have added back the merge commit and pushed it here, along with fixing the Checkstyle issue. I ran the tests locally and a few MirrorMaker-related ones failed.
Thanks @vamossagar12. I've run the tests locally and they've succeeded; I'll wait to see what happens with CI, but I think this should be okay as-is.
C0urante left a comment:
LGTM, thanks Sagar for the PR and Luke for the additional review. Great to finally get a fix in for this long-standing issue!
The single test failure is unrelated. Merging...
Yeah!!! Thank you @vamossagar12 @C0urante! Nice teamwork!
One question for @C0urante: do you think we should backport this to the 3.3 branch?
@showuon I was thinking about it--IMO the additional complexity introduced by this change is a little too risky to backport, but I'm almost on the fence here. If you'd like to see it on 3.3, I wouldn't be opposed.
I think the 3.4 release is also around the corner (a few weeks, maybe?). Would it be better to have it in 3.4?
Yes, I have the same thought as Chris. Besides, this bug has already existed for a long, long time; it's not that urgent to put it into 3.3. So let's keep it in 3.4 (the trunk branch), which is what we currently did. :)
Hey @C0urante, I was thinking: should the exponential backoff we introduced as part of this PR go somewhere in the docs? I am saying this since it is a deviation from how things worked before. WDYT?
@vamossagar12 I don't think it's necessary to call this out anywhere unless it causes unexpected issues for our users. The intention behind the exponential backoff is to avoid rebalance storms, but I'm still not sure when those would ever realistically happen, so until we see otherwise, I'd prefer to leave it as an internal implementation detail.
@C0urante, thanks for your response. Makes sense.
…en worker joins after revoking rebalance (apache#12561)
Reviewers: Yash Mayya <yash.mayya@gmail.com>, Luke Chen <showuon@gmail.com>, Chris Egerton <chrise@aiven.io>
Currently, the incremental rebalance protocol does not allow a subsequent revoking rebalance when a worker joins right after one. This can lead to imbalance in assignments. See KAFKA-12495 for more details.
This PR aims to fix the above. Note that there already exists another PR, #10367, to fix this. The main difference between the two approaches is that this one introduces an exponential backoff delay between two successive revoking rebalances. This disallows rebalance storms while still not waiting out the entire scheduled rebalance delay.
Notable changes in this PR: