KAFKA-12495: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments#14000

Closed
vamossagar12 wants to merge 3 commits into apache:trunk from vamossagar12:fixing-connect-comment-revokedInPrevious

Conversation

@vamossagar12
Contributor

@vamossagar12 vamossagar12 commented Jul 12, 2023

This PR addresses a few issues I noticed with #12561. For context, #12561 provided a mechanism to allow successive revoking rebalances, as long as those rebalances happened after an exponential backoff timer that is set after one successive revoking rebalance. However, while testing I noticed a few things that need fixing. These are:

  1. This condition checks whether there are lost assignments, and exits only if there are no lost assignments and the previous round did not have revocations. Consider a case where one or more connectors are deleted, leading to load-balancing revocations. The revokedInPrevious flag would then be set to true. On the follow-up rebalance, when there are no lost assignments, the condition lostAssignments.isEmpty() && !revokedInPrevious would no longer hold, because revokedInPrevious would be true. This typically culminates in printing the line No worker seems to have departed the group during the rebalance..., which doesn't make sense. Ideally we should exit this condition as soon as lost assignments are empty. Note that this change as such is harmless, but the presence of this line can be very confusing. This can be checked in the logs as follows:

Log lines after connector deletions:

2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Member configs: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897=WorkerState{url='http://127.0.0.1:8084/', offset=29, Assignment{error=0, leader='worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897', leaderUrl='http://127.0.0.1:8084/', offset=27, connectorIds=[kafka-connect-redis-1], taskIds=[kafka-connect-redis-1-0, kafka-connect-redis-1-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0}}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd=WorkerState{url='http://127.0.0.1:8083/', offset=29, Assignment{error=0, leader='worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897', leaderUrl='http://127.0.0.1:8084/', offset=27, connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-0, kafka-connect-redis-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0}}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:117)
[2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Max config offset root: 29, local snapshot config offsets root: 29 (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:123)
[2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Performing task assignment during generation: 30 with memberId:  (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:177)
[2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Previous assignments: { connectorIds=[kafka-connect-redis-1, kafka-connect-redis], taskIds=[kafka-connect-redis-1-0, kafka-connect-redis-1-1, kafka-connect-redis-0, kafka-connect-redis-1]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:213)
[2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Configured assignments: { connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-1, kafka-connect-redis-0]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:232)
[2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Received assignments: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897={ connectorIds=[kafka-connect-redis-1], taskIds=[kafka-connect-redis-1-0, kafka-connect-redis-1-1]}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd={ connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-0, kafka-connect-redis-1]}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:625)
[2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Active assignments: { connectorIds=[kafka-connect-redis-1, kafka-connect-redis], taskIds=[kafka-connect-redis-1-0, kafka-connect-redis-1-1, kafka-connect-redis-0, kafka-connect-redis-1]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:237)
[2023-07-12 13:35:30,531] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Deleted assignments: { connectorIds=[kafka-connect-redis-1], taskIds=[kafka-connect-redis-1-0, kafka-connect-redis-1-1]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:253)
[2023-07-12 13:35:30,532] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Lost assignments: { connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:262)
[2023-07-12 13:35:30,532] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Created: { connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:267)
[2023-07-12 13:35:30,532] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Deleted connectors and tasks to revoke from each worker: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897={ connectorIds=[kafka-connect-redis-1], taskIds=[kafka-connect-redis-1-1, kafka-connect-redis-1-0]}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd={ connectorIds=[], taskIds=[]}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:275)
[2023-07-12 13:35:30,532] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Duplicated connectors and tasks to revoke from each worker: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897={ connectorIds=[], taskIds=[]}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd={ connectorIds=[], taskIds=[]}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:280)
[2023-07-12 13:35:30,532] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:325)
[2023-07-12 13:35:30,532] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Incremental connector assignments: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897=[], worker-1-946364e9-6efd-492a-badc-fab26507c9dd=[]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:382)
[2023-07-12 13:35:30,532] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Incremental task assignments: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897=[], worker-1-946364e9-6efd-492a-badc-fab26507c9dd=[]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:383)
[2023-07-12 13:35:30,533] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Filling assignment: worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897 -> Assignment{error=0, leader='worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897', leaderUrl='http://127.0.0.1:8084/', offset=29, connectorIds=[], taskIds=[], revokedConnectorIds=[kafka-connect-redis-1], revokedTaskIds=[kafka-connect-redis-1-1, kafka-connect-redis-1-0], delay=0} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:579)

Log lines in the subsequent rebalance:

[2023-07-12 13:35:30,742] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Performing task assignment (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:109)
[2023-07-12 13:35:30,742] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Member configs: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897=WorkerState{url='http://127.0.0.1:8084/', offset=29, Assignment{error=0, leader='worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897', leaderUrl='http://127.0.0.1:8084/', offset=29, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0}}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd=WorkerState{url='http://127.0.0.1:8083/', offset=29, Assignment{error=0, leader='worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897', leaderUrl='http://127.0.0.1:8084/', offset=29, connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0}}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:117)
[2023-07-12 13:35:30,742] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Max config offset root: 29, local snapshot config offsets root: 29 (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:123)
[2023-07-12 13:35:30,742] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Performing task assignment during generation: 31 with memberId:  (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:177)
[2023-07-12 13:35:30,742] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Previous assignments: { connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-1]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:213)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Configured assignments: { connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-1, kafka-connect-redis-0]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:232)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Received assignments: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897={ connectorIds=[], taskIds=[]}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd={ connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-1]}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:625)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Active assignments: { connectorIds=[kafka-connect-redis], taskIds=[kafka-connect-redis-1]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:237)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Deleted assignments: { connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:253)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Lost assignments: { connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:262)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Created: { connectorIds=[], taskIds=[kafka-connect-redis-0]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:267)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Deleted connectors and tasks to revoke from each worker: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897={ connectorIds=[], taskIds=[]}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd={ connectorIds=[], taskIds=[]}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:275)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Duplicated connectors and tasks to revoke from each worker: {worker-2-ec04a27d-64a5-4d39-a1f2-77ff1b6f5897={ connectorIds=[], taskIds=[]}, worker-1-946364e9-6efd-492a-badc-fab26507c9dd={ connectorIds=[], taskIds=[]}} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:280)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Found the following connectors and tasks missing from previous assignments: { connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:459)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] **No worker seems to have departed the group during the rebalance. The missing assignments that the leader is detecting are probably due to some workers failing to receive the new assignments in the previous rebalance. Will reassign missing tasks as new tasks** (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:466)
[2023-07-12 13:35:30,743] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Previous round had revocations but this round didn't. Probably, the cluster has reached a balanced load. Resetting the exponential backoff clock (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor:332)

This PR fixes it by removing the check on the revokedInPrevious flag from that condition.
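The condition fix can be sketched as follows. This is a minimal, hypothetical illustration — the class and method names are invented for the example; only the expression lostAssignments.isEmpty() && !revokedInPrevious comes from the PR discussion:

```java
import java.util.Collections;
import java.util.Set;

// Minimal sketch of the early-exit fix in handleLostAssignments.
// The surrounding class and method names are hypothetical.
class AssignorSketch {
    // e.g. set to true after a load-balancing revocation in the previous round
    boolean revokedInPrevious = true;

    // Before the fix: the method exits early only when BOTH conditions hold,
    // so an empty lost-assignments set with revokedInPrevious == true falls
    // through and logs the misleading "No worker seems to have departed..." line.
    boolean exitsEarlyBeforeFix(Set<String> lostAssignments) {
        return lostAssignments.isEmpty() && !revokedInPrevious;
    }

    // After the fix: exit as soon as there are no lost assignments,
    // regardless of whether the previous round revoked anything.
    boolean exitsEarlyAfterFix(Set<String> lostAssignments) {
        return lostAssignments.isEmpty();
    }

    public static void main(String[] args) {
        AssignorSketch assignor = new AssignorSketch();
        Set<String> lost = Collections.emptySet();
        // prints "false true": the old guard falls through, the new one exits
        System.out.println(assignor.exitsEarlyBeforeFix(lost) + " "
                + assignor.exitsEarlyAfterFix(lost));
    }
}
```

With revokedInPrevious == true and no lost assignments, only the fixed guard exits early, which is exactly the scenario in the logs above.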

  2. The unsetting of the revokedInPrevious flag within handleLostAssignments has been removed; it is now done at the beginning of the rebalance loop. That makes the changes much easier to reason about, with no loss of the functionality the original PR aimed to provide.

  3. There was also an inconsistency between the flag and the counter used to track the revocations state. Here only the flag is unset, but in a couple of other places the counter is also reset. IMO both should be reset together, since we are signifying that this round had no revoking rebalance. This PR addresses that concern as well.

  4. Some general refactoring to move the revocation-state unsetting into a separate method.

  5. Modified one of the tests. Note that only one of the tests needed modification, and IMO it was failing due to the counter-reset inconsistency described above. The other tests passed without any changes.
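The consolidation described above can be sketched like this. The field names here are hypothetical, not the actual IncrementalCooperativeAssignor members; the point is only that the flag and the successive-revocation counter are cleared together in one helper:

```java
// Sketch of consolidating the revocation-state reset into a single
// method, so that no call site can clear the flag while leaving a
// stale counter behind (the inconsistency the PR points out). The
// counter matters because it drives the exponential backoff delay.
class RevocationStateSketch {
    boolean revokedInPrevious = true;   // previous round had revocations
    int numSuccessiveRevocations = 2;   // hypothetical backoff counter

    // Single helper: both pieces of revocation state are reset together,
    // signifying that this round had no revoking rebalance.
    void resetRevocationState() {
        revokedInPrevious = false;
        numSuccessiveRevocations = 0;
    }

    public static void main(String[] args) {
        RevocationStateSketch state = new RevocationStateSketch();
        state.resetRevocationState();
        // prints "false 0"
        System.out.println(state.revokedInPrevious + " " + state.numSuccessiveRevocations);
    }
}
```

Without the shared helper, a path that resets only the flag would leave the counter at its old value, so the next revoking rebalance would start from an inflated backoff instead of the initial delay.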

@vamossagar12 vamossagar12 changed the title Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments [MINOR] Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments Jul 12, 2023
@vamossagar12 vamossagar12 requested a review from C0urante July 14, 2023 10:14
@vamossagar12
Contributor Author

Hey Chris, I tagged you for this minor PR since you have context around these changes.

@gharris1727
Contributor

Hey @vamossagar12 I don't think this comment is incorrect or confusing enough to warrant a stand-alone PR. If you have other substantive changes in this area, we can re-examine this comment at that time.

@vamossagar12
Contributor Author

Thanks @gharris1727. Hmm, the meaning of the variable and its usage in the comment is slightly off in this case. revokedInPrevious being true doesn't just signify successive revoking rebalances (which is what the original comment reflects); it also signifies that just the previous round had one. I just realised that my updated comment isn't accurate either, and ideally both should be included. Something along the lines of

There are no lost assignments and there have been no revoking rebalances in the previous round(s)

would be more accurate imo. WDYT?

Also, there is a bug where empty lost assignments combined with a follow-up rebalance after connector deletions can lead to these lines getting printed:

https://github.com/apache/kafka/pull/14000/files#diff-e24067b121eb960feebfa099bd9c30382e330eaf5db39302a9d7a50e29b3acb4L459-R462

Ideally nothing should happen if lost assignments are empty. I haven't had a chance to look into fixing it yet; I will file a ticket later when I have some time.

@vamossagar12
Contributor Author

hey Greg, let me know what you think about my comment above, especially about updating the comment in question to

There are no lost assignments and there have been no revoking rebalances in the previous round(s)

@C0urante C0urante removed their request for review July 25, 2023 18:33
…d has revocations and lost assignments is empty
@vamossagar12 vamossagar12 changed the title [MINOR] Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments Jul 31, 2023
addNewEmptyWorkers("worker3");
performStandardRebalance();
assertTrue(assignor.delay > 0);
assertEquals(40, assignor.delay); // First successive revoking rebalance.
Contributor Author


Added a more elaborate check here.

addNewEmptyWorkers("worker5");
performStandardRebalance();
assertTrue(assignor.delay > 40);
assertEquals(40, assignor.delay); // First successive revoking rebalance.
Contributor Author

@vamossagar12 vamossagar12 Jul 31, 2023


IMO this happens because we weren't resetting the counter when the delay reaches 0. This is the first successive revoking rebalance, so the delay should be 40. I say this because this case is similar to worker2 -> worker3 joining above, where the delay was 40; it should be the same here as well.


// Sixth assignment with sixth worker joining after the expiry.
// Should revoke
time.sleep(assignor.delay);
Contributor Author


I removed this to show that the changes still work correctly and that the delay gets extended further.

@vamossagar12
Contributor Author

@gharris1727 I updated the PR to incorporate the changes I was talking about here. Note that the original comment correction is no longer needed.

@vamossagar12
Contributor Author

Test failures are unrelated.

@vamossagar12
Contributor Author

vamossagar12 commented Sep 6, 2023

@gharris1727, bumping this one again. It would be nice to fix this, since the log line No worker seems to have departed the group crops up unexpectedly, leading to confusion when going through the logs.

@github-actions

github-actions bot commented Dec 6, 2023

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions bot added the stale Stale PRs label Dec 6, 2023
@vamossagar12 vamossagar12 changed the title IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments KAFKA-12495:IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments Feb 1, 2024
@github-actions github-actions bot removed the stale Stale PRs label Feb 3, 2024
@vamossagar12 vamossagar12 changed the title KAFKA-12495:IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments KAFKA-12495: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments Apr 5, 2024
@github-actions

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions bot added the stale Stale PRs label Dec 27, 2024
@github-actions

This PR has been closed since it has not had any activity in 120 days. If you feel like this
was a mistake, or you would like to continue working on it, please feel free to re-open the
PR and ask for a review.

@github-actions github-actions bot added the closed-stale PRs that were closed due to inactivity label Jan 27, 2025
@github-actions github-actions bot closed this Jan 27, 2025