Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
7619441
KAFKA-16985: Ensure consumer sends leave request on close even if int…
kirktrue Jul 24, 2024
6d19c30
More WiP
kirktrue Jul 26, 2024
f52a8b8
Fixed checkstyle errors :(
kirktrue Jul 26, 2024
e4b0a27
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Jul 29, 2024
6bd52ca
Attempting to use polling wait for member to join and then leave cons…
kirktrue Jul 29, 2024
980e782
Updates to test core issue
kirktrue Jul 30, 2024
d113d98
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Jul 31, 2024
abac405
Reverting method name change
kirktrue Jul 31, 2024
30f85a6
Testing only the new consumer group protocol as the classic consumer …
kirktrue Jul 31, 2024
fd563ac
Reverting name changes
kirktrue Jul 31, 2024
ad3c7d9
Minor cleanup
kirktrue Jul 31, 2024
acc3cb4
Update AsyncKafkaConsumer.java
kirktrue Jul 31, 2024
09ccd52
Update AsyncKafkaConsumer.java
kirktrue Jul 31, 2024
c3de041
Update AsyncKafkaConsumer.java
kirktrue Jul 31, 2024
a71b251
Attempting to generalize testCloseLeavesGroupOnInterrupt for use by b…
kirktrue Aug 3, 2024
3effda7
Reverted to only run the integration test only for CONSUMER
kirktrue Aug 9, 2024
0bc3908
Forcing the timer to 0 if the thread was interrupted
kirktrue Sep 5, 2024
fe5464b
Implement terrible, terrible workaround for 0 timeout for leaving the…
kirktrue Sep 5, 2024
17c30c4
Remove horrible atrocity of the 'leave group timer'
kirktrue Sep 11, 2024
9899647
Removed unnecessary import
kirktrue Sep 11, 2024
23aed73
Added a parameterized test to exercise close behavior when interrupted
kirktrue Sep 12, 2024
086038d
Separated tests to make their intent more obvious
kirktrue Sep 14, 2024
17658c1
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Sep 25, 2024
484b3cf
Aligned the exception throwing to match the ClassicKafkaConsumer
kirktrue Sep 25, 2024
b2a5373
Restoring interrupt flag status for ConsumerRebalanceListener invocat…
kirktrue Sep 26, 2024
144a0a9
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Oct 1, 2024
36a49fd
Fixed JavaDoc problem
kirktrue Oct 1, 2024
e091cd7
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Oct 9, 2024
0f13eff
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Oct 17, 2024
7e3a946
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Oct 31, 2024
82c024d
Revised comments
kirktrue Oct 31, 2024
e4d38dd
Removed unnecessary catch block
kirktrue Oct 31, 2024
cb63350
More updates for comments to change Javadoc style
kirktrue Oct 31, 2024
3b8751e
Updating testCloseLeavesGroupOnInterrupt() integration test to run fo…
kirktrue Oct 31, 2024
39db09c
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Nov 1, 2024
6614bd5
WIP on new approach to perform callback without hops
kirktrue Nov 1, 2024
df36f1d
WIP updates to unit tests for leaving group on close
kirktrue Nov 1, 2024
d5e71a7
Fixed silly mistake that would make this never, ever work
kirktrue Nov 2, 2024
7305973
Updates to revert unnecessary changes
kirktrue Nov 3, 2024
ca04847
More tweaks to revert unnecessary diffs
kirktrue Nov 3, 2024
dda8650
Minor test method name tweaks
kirktrue Nov 3, 2024
ad4de03
Formatting, comments, refactoring, and restoring the interrupt flag i…
kirktrue Nov 3, 2024
a417e92
Removed efforts to restore interrupt state, post handler
kirktrue Nov 4, 2024
59f128b
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Nov 5, 2024
e8b6d99
Updates to snapshot the assignment in the background thread to use in…
kirktrue Nov 5, 2024
fa878fe
Updates to address PR feedback
kirktrue Nov 7, 2024
af8ef05
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Nov 7, 2024
ac8c7d9
Reverting whitespace changes
kirktrue Nov 7, 2024
d635cee
Changed the visibility of notifyAssignmentChange since it is no longe…
kirktrue Nov 7, 2024
ca54979
Addressing the signal-on-close issue from a different approach
kirktrue Nov 7, 2024
313ae6e
Updates to address feedback from PR review
kirktrue Nov 9, 2024
271638b
Removed unnecessary import
kirktrue Nov 9, 2024
4b0d310
Notifying when subscription is changed when the assignment is cleared
kirktrue Nov 12, 2024
99d7bc1
Updated JavaDoc comments
kirktrue Nov 12, 2024
91a0ec2
Notifying on clearAssignment only if using auto-assigned partitions (…
kirktrue Nov 12, 2024
f8d0e1d
Merge branch 'trunk' into KAFKA-16985-clear-interrupt-on-consumer-close
kirktrue Nov 13, 2024
6dcd3a2
Removed superfluous call to notifyAssignmentChange() since clearAssig…
kirktrue Nov 13, 2024
e5d4044
Removed extraneous whitespace above close() method
kirktrue Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -130,7 +129,8 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
* requests in cases where a currently assigned topic is in the target assignment (new
* partition assigned, or revoked), but it is not present the Metadata cache at that moment.
* The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the
* member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}).
* member fails ({@link #transitionToFatal()} or leaves the group
* ({@link #leaveGroup()}/{@link #leaveGroupOnClose()}).
*/
private final Map<Uuid, String> assignedTopicNamesCache;

Expand All @@ -157,9 +157,9 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
private boolean rejoinedWhileReconciliationInProgress;

/**
* If the member is currently leaving the group after a call to {@link #leaveGroup()}}, this
* will have a future that will complete when the ongoing leave operation completes
* (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
* If the member is currently leaving the group after a call to {@link #leaveGroup()} or
* {@link #leaveGroupOnClose()}, this will have a future that will complete when the ongoing leave operation
* completes (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
* member is not leaving.
*/
private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
Expand Down Expand Up @@ -481,6 +481,7 @@ public void onConsumerPoll() {
private void clearAssignment() {
if (subscriptions.hasAutoAssignedPartitions()) {
subscriptions.assignFromSubscribed(Collections.emptySet());
notifyAssignmentChange(Collections.emptySet());
}
currentAssignment = LocalAssignment.NONE;
clearPendingAssignmentsAndLocalNamesCache();
Expand All @@ -496,8 +497,9 @@ private void clearAssignment() {
*/
private void updateSubscriptionAwaitingCallback(SortedSet<TopicIdPartition> assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {
Collection<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
Set<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions, addedPartitions);
notifyAssignmentChange(assignedTopicPartitions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are notifying on the path of new assignment and on leave, but I think we're missing the fenced/fatal paths.

Those 2 end up calling clearAssignment, so maybe we could notify there to cover them? (right after ln 483 subscriptions.assignFromSubscribed(Collections.emptySet()))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code to look like this:

    private void clearAssignment() {
        if (subscriptions.hasAutoAssignedPartitions()) {
            subscriptions.assignFromSubscribed(Collections.emptySet());
            notifyAssignmentChange(Collections.emptySet());
        }
        currentAssignment = LocalAssignment.NONE;
        clearPendingAssignmentsAndLocalNamesCache();
    }

}

/**
Expand All @@ -523,18 +525,45 @@ public void transitionToJoining() {
/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
* This is expected to be invoked when the user calls the unsubscribe API.
* This is expected to be invoked when the user calls the {@link Consumer#close()} API.
*
* @return Future that will complete when the heartbeat to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroupOnClose() {
return leaveGroup(false);
}

/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
* This is expected to be invoked when the user calls the {@link Consumer#unsubscribe()} API.
*
* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
return leaveGroup(true);
}

/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
* This is expected to be invoked when the user calls the unsubscribe API or is closing the consumer.
*
* @param runCallbacks {@code true} to insert the step to execute the {@link ConsumerRebalanceListener} callback,
* {@code false} to skip
*
* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
protected CompletableFuture<Void> leaveGroup(boolean runCallbacks) {
if (isNotInGroup()) {
if (state == MemberState.FENCED) {
clearAssignment();
transitionTo(MemberState.UNSUBSCRIBED);
}
subscriptions.unsubscribe();
notifyAssignmentChange(Collections.emptySet());
return CompletableFuture.completedFuture(null);
}

Expand All @@ -549,31 +578,39 @@ public CompletableFuture<Void> leaveGroup() {
CompletableFuture<Void> leaveResult = new CompletableFuture<>();
leaveGroupInProgress = Optional.of(leaveResult);

CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
callbackResult.whenComplete((result, error) -> {
if (error != null) {
log.error("Member {} callback to release assignment failed. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId, error);
} else {
log.info("Member {} completed callback to release assignment. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId);
}

// Clear the subscription, no matter if the callback execution failed or succeeded.
subscriptions.unsubscribe();
clearAssignment();
if (runCallbacks) {
CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
callbackResult.whenComplete((result, error) -> {
if (error != null) {
log.error("Member {} callback to release assignment failed. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId, error);
} else {
log.info("Member {} completed callback to release assignment. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId);
}

// Transition to ensure that a heartbeat request is sent out to effectively leave the
// group (even in the case where the member had no assignment to release or when the
// callback execution failed.)
transitionToSendingLeaveGroup(false);
});
// Clear the assignment, no matter if the callback execution failed or succeeded.
clearAssignmentAndLeaveGroup();
});
} else {
clearAssignmentAndLeaveGroup();
}

// Return future to indicate that the leave group is done when the callbacks
// complete, and the transition to send the heartbeat has been made.
return leaveResult;
}

private void clearAssignmentAndLeaveGroup() {
subscriptions.unsubscribe();
clearAssignment();

// Transition to ensure that a heartbeat request is sent out to effectively leave the
// group (even in the case where the member had no assignment to release or when the
// callback execution failed.)
transitionToSendingLeaveGroup(false);
}

/**
* Reset member epoch to the value required for the leave the group heartbeat request, and
* transition to the {@link MemberState#LEAVING} state so that a heartbeat request is sent
Expand Down Expand Up @@ -616,6 +653,15 @@ void notifyEpochChange(Optional<Integer> epoch) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
}

/**
* Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(Set)} callback for each listener when the
* set of assigned partitions changes. This includes on assignment changes, unsubscribe, and when leaving
* the group.
*/
void notifyAssignmentChange(Set<TopicPartition> partitions) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onGroupAssignmentUpdated(partitions));
}

/**
* @return True if the member should send heartbeat to the coordinator without waiting for
* the interval.
Expand Down
Loading