Skip to content

Comments

KAFKA-16985: Ensure consumer attempts to send leave request on close even if interrupted#16686

Merged
lianetm merged 58 commits intoapache:trunkfrom
kirktrue:KAFKA-16985-clear-interrupt-on-consumer-close
Nov 13, 2024
Merged

KAFKA-16985: Ensure consumer attempts to send leave request on close even if interrupted#16686
lianetm merged 58 commits intoapache:trunkfrom
kirktrue:KAFKA-16985-clear-interrupt-on-consumer-close

Conversation

@kirktrue
Copy link
Contributor

@kirktrue kirktrue commented Jul 24, 2024

This change ensures that AsyncKafkaConsumer attempts to leave the group cleanly upon close(), regardless of the timeout or interrupts. The application thread will invoke the ConsumerRebalanceListener’s callback, if present, to release the assignment, and then enqueue a LeaveGroupOnClose for the background thread. The background thread will send out the “leave group” heartbeat, and will notify the application thread (if still waiting) upon response.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…errupted

The consumer should attempt to leave the group cleanly upon close(), regardless of a) the timeout, b) interrupts.

If the user interrupted the current thread, upon close(), note the interruption, clear the flag, proceed with the close logic, and then throw an InterruptException at the end of close().

If the user invokes close() with a low timeout, make sure to process the background events even if the UnsubscribeEvent itself timed out. Those background events are needed to signal the 'leave group' action in the background thread.
@kirktrue kirktrue added KIP-848 The Next Generation of the Consumer Rebalance Protocol ctr Consumer Threading Refactor (KIP-848) consumer labels Jul 24, 2024
@kirktrue kirktrue marked this pull request as ready for review August 1, 2024 17:18
@kirktrue kirktrue requested a review from lianetm August 1, 2024 17:19
@kirktrue
Copy link
Contributor Author

kirktrue commented Aug 1, 2024

@lianetm @philipnee—can I get a review, please 😄

@kirktrue
Copy link
Contributor Author

kirktrue commented Aug 1, 2024

Latest test failures look unrelated.

@kirktrue kirktrue requested a review from lianetm August 21, 2024 23:58
Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @kirktrue ! Here are some comments after another pass to non-testing files

SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(assignedPartitions);

boolean isThreadInterrupted = Thread.currentThread().isInterrupted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not needed anymore right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was removed already. Not sure how it was still showing for you 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the review I mentioned I started adding comments one day, and finished the day after he he, that's probably why.

applicationEventHandler.add(new CommitOnCloseEvent());
}

private void releaseAssignmentOnClose(final Timer timer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: matter of changed perception I guess, but what about renaming this to clearly indicate that it is running callbacks? ( ~runRebalanceCallbacksOnClose). I see it's important given the challenges that callbacks bring here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to runRebalanceCallbacksOnClose() as suggested 👍

Comment on lines 1357 to 1360
ConsumerGroupMetadata cgm = groupMetadata.get().orElse(null);

if (cgm == null)
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up to you, but what about:

Suggested change
ConsumerGroupMetadata cgm = groupMetadata.get().orElse(null);
if (cgm == null)
return;
if (!groupMetadata.get().isPresent())
return;
int memberEpoch = groupMetadata.get().get().generationId();

I find it's comes back clearer later on, to refer to member epoch instead of generation id to determine the callback

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored as suggested 👍

* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
protected CompletableFuture<Void> leaveGroup(Supplier<CompletableFuture<Void>> callbackFutureSupplier) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why passing a Supplier of CompletableFuture instead of simply passing the CompletableFuture itself? seems more verbose/obfuscated but I could be missing an upside.

In any case passing an Optional maybe suits better? given that we want to represent that the leave may (or may not) receive a future to wait on before sending the request

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an ordering issue 😢

To pass the CompletableFuture into the (internal) leaveGroup() method, the code would have to execute signalMemberLeavingGroup() first. For the ConsumerMembershipManager, signalMemberLeavingGroup() is overridden and calls invokeOnPartitionsRevokedOrLostToReleaseAssignment(). I assumed that invoking invokeOnPartitionsRevokedOrLostToReleaseAssignment() before calling leaveGroup() would throw off the existing logic.

That said, I don't like the use of Supplier here, either. I also wanted to leave the existing code as is, where possible.

Let me know if you have ideas to simplify it.

if (exception == null) {
// Enable newly added partitions to start fetching and updating positions for them.
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
notifyAssignmentChange(addedPartitions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should notify with the full assignment here I would say (assignedPartitions)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I should have looked more closely at the actual variable name of addedPartitions 🤦‍♂️

if (subscriptions.assignFromUser(new HashSet<>(event.partitions())))
if (subscriptions.assignFromUser(new HashSet<>(partitions))) {
metadata.requestUpdateForNewTopics();
requestManagers.consumerMembershipManager.ifPresent(cmm -> cmm.notifyAssignmentChange(new HashSet<>(partitions)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uhm this is tricky, I wonder if dangerous. Here we are saying that the membership mgr will notify about an assignment that is not really a group assignment (it's manual assign). That notification is then taken on the consumer close to trigger callbacks and send a request to the coordinator (I expect it will all be no-op or throw errors at some point?). I know there is a group check on those operations on close, but we could have groupId but not be in a group if there was never a call to subscribe.

If what we really need in the consumer is a snapshot of group assignment (needed to trigger callbacks), why don't we better keep just that (explicit name, it's not the consumer assignment, it's the consumer group assignment), and do not involve that in this call which is about manual assign? Limiting the scope to what we really need seems to avoid trouble in this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I've made the snapshot targeted only for consumer group members. I'd love for it to be more general, but this is the right call. PTAL at the changes when you have the time. Thanks!

* to perform the necessary steps to leave the consumer group cleanly, if possible. The event's timeout is based on
* either the user-provided value to {@link Consumer#close(Duration)} or
* {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} if {@link Consumer#close()} was called. The event is considered
* complete when the membership manager sends the heartbeat message to leave the group. The event does not wait on a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This event does wait for the response from the coordinator right? It's called with addAndGet, and this event future completes when the leaveGroupOnClose in the membershipMgr completes, and that only happens on maybeCompleteLeaveInProgress, when we receive a response to the HB to leave (or skip HB, or get error).

We intentionally did that to keep the same behaviour of the classic consumer (coordinator does awaitPendingRequests on close), and to avoid responses to disconnected clients that we used to have before, when we would send the leave and carry on with the consumer close/shutdown without waiting for a response. Makes sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention is that the event should wait until the response has been received for those cases where the timeout is sufficient. I've updated the comment/documentation.

@kirktrue
Copy link
Contributor Author

kirktrue commented Nov 7, 2024

Thanks for the review, @lianetm 🙏

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @kirktrue, thanks for the updates!

requestManagers.commitRequestManager.get().signalClose();
}

private void process(@SuppressWarnings("unused") final LeaveGroupOnCloseEvent event) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this suppressWarning needed? (the param is used to complete the future)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
protected CompletableFuture<Void> leaveGroup(boolean isClosing) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be clearer to have this param explicitly naming what it does vs. when it's used? (ie. runCallbacks). Mainly because this manager knows nothing about closing really, it knows about leaving a group with or without callbacks)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to runCallbacks.

leaveGroupInProgress = Optional.of(leaveResult);

CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
CompletableFuture<Void> callbackResult = signalMemberLeavingGroup(isClosing);
Copy link
Member

@lianetm lianetm Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could avoid propagating the isClosing/runCallbacks to this func that is really about "invokeCallbacks" (we're effectively calling an "invokeCallbacks" func, passing a param "runCallback" that can be false). This seems confusing and could lead to misleading logs I would say (ie. we would end up with the log line "Member completed callbacks..." when it really didn't do that here).

So one idea that comes to mind is to shortcircuit before the signalMemberLeavingGroup:

        if (isClosing/runCallbacks) {
            CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
            callbackResult.whenComplete((result, error) -> {
                // log callback completion
                ...
                clearAssignmentAndSendLeave(); // encapsulate what we do after callbacks
            });
        } else {
            clearAssignmentAndSendLeave();
        }

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added clearAssignmentAndSendLeave() method as suggested.


/**
* Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(Set)} callback for each listener when the
* set of assigned partitions changes. This includes on assignment changes, unsubscribing, and when leaving
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo unsubscribing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Comment on lines 1172 to 1174
Set<TopicPartition> allAssignedPartitions = assignedPartitions.stream()
.map(tip -> new TopicPartition(tip.topic(), tip.partition()))
.collect(Collectors.toSet());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can reuse the toTopicPartitionSet already defined in this same class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks, the code was ugly 😅

.map(tip -> new TopicPartition(tip.topic(), tip.partition()))
.collect(Collectors.toSet());

notifyAssignmentChange(allAssignedPartitions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uhm this should probably go right after the subscription is updated, not after the callback complete. Having it here, I guess it would allow for a race if there is a call in the app thread to consumer.close, while a reconciliation is running this code. If the call to consumer.close happens after callbacks complete on ln 1167 above, but before making it to this line, the app thread could potentially close revoking partitions without including the ones just added here right?

So we could just call this notify right after updateSubscriptionAwaitingCallback (which is the one that does the actual update of the subscription state). From that moment on the partitions are effectively assigned even though the onPartitionsAssigned may have not completed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved notifyAssignmentChange() to right after subscriptions.assignFromSubscribedAwaitingCallback() in updateSubscriptionAwaitingCallback().

Duration timeout = Duration.ofMillis(timeoutMs);

try {
Thread.currentThread().interrupt();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to interrupt if we're already mocking the interrupt exception above? ln 698

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Removed.

consumer.subscriptions().assignFromUser(new HashSet<>(event.partitions()));
HashSet<TopicPartition> partitions = new HashSet<>(event.partitions());
consumer.subscriptions().assignFromUser(partitions);
consumer.setGroupAssignmentSnapshot(partitions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this func if about manual assignment so I would expect we don't need to set the group assignment snapshot right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. That's a holdover from when I was updating the snapshot for manually-assigned partitions too.


@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a very similar test for close(0)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worked up the test, but I ran it 10 times and it failed twice. The 'leave group on 0 timeout' is a best effort, so the timing may not work out that we can send off the request before the ConsumerNetworkThread shuts down.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, makes sense. It's still important to ensure we run callbacks on close(0) but that is covered on testCloseLeavesGroupDespiteOnPartitionsLostError, all good.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @kirktrue !

Could you please update the PR description to remove all the old callback flow that is not applied on close anymore? Important to call out that this is ensuring callbacks executed + attempt to send the leave request, regardless of timeout/interrupt.

Collection<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
Set<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions, addedPartitions);
notifyAssignmentChange(assignedTopicPartitions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are notifying on the path of new assignment and on leave, but I think we're missing the fenced/fatal paths.

Those 2 end up calling clearAssignment, so maybe we could notify there to cover them? (right after ln 483 subscriptions.assignFromSubscribed(Collections.emptySet()))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code to look like this:

    private void clearAssignment() {
        if (subscriptions.hasAutoAssignedPartitions()) {
            subscriptions.assignFromSubscribed(Collections.emptySet());
            notifyAssignmentChange(Collections.emptySet());
        }
        currentAssignment = LocalAssignment.NONE;
        clearPendingAssignmentsAndLocalNamesCache();
    }

Comment on lines 1307 to 1308
* The {@link #close(Duration)} method should otherwise honor the timeout and interrupt flag,
* except where it would violate the previous tenets.
Copy link
Member

@lianetm lianetm Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we're making it sound more complicated than it is, by saying "except where it would violate the previous tenets" and given that there are 12 tenets above :). Close always honours timeout except for callbacks only right? (already mentioned in point 4). If we simplify as mentioned above, adding to 4 the fact that callbacks are not time-bounded, I would say we don't need to have this one maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. Removed.

Comment on lines 1289 to 1290
* Leaving the consumer group is achieved by issuing a ‘leave group‘ network request. This network I/O
* must be performed on the background thread.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true but this is not specific to the close in any way right? (It's the same for all api calls that issue requests). Would it be better/simpler to focus on the tenets of the close itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but there was some discussion earlier in this thread which suggested clarification around which thread does what part of the overall task. I removed it, per your suggestion.

Comment on lines 1263 to 1264
* Since the {@link ConsumerRebalanceListener} APIs do not include a timeout parameter, a given
* {@link ConsumerRebalanceListener} implementation cannot alter its behavior to adhere to the timeout.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this is closely related to the one above (said in a way there already). Callbacks block for the full execution, not time-bounded, that's it. Could we simplify by merging them? (just adding the "not time bounded" bit above). Trying to simplify, this doc got a bit too complicated I would say.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consolidated these points. LMK if it still needs work.

/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
* This is expected to be invoked when the user calls the unsubscribe API.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* This is expected to be invoked when the user calls the unsubscribe API.
* This is expected to be invoked when the user calls the unsubscribe API or is closing the consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the change as suggested.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kirktrue, just these minor comments, almost there.

private void clearAssignmentAndLeaveGroup() {
subscriptions.unsubscribe();
clearAssignment();
notifyAssignmentChange(Collections.emptySet());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not needed here anymore right? (because clearAssignment will do)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I removed the extra call since clearAssignment() handles it where appropriate anyway.

* </li>
* </ol>
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.


@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, makes sense. It's still important to ensure we run callbacks on close(0) but that is covered on testCloseLeavesGroupDespiteOnPartitionsLostError, all good.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot @kirktrue! LGTM

@lianetm lianetm merged commit b6b2c9e into apache:trunk Nov 13, 2024
@kirktrue
Copy link
Contributor Author

Thank you ♾️ @lianetm and @chia7712!!!! Thanks also to everyone else who helped define and frame the problem.

@kirktrue kirktrue deleted the KAFKA-16985-clear-interrupt-on-consumer-close branch November 13, 2024 19:39
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
…even if interrupted (apache#16686)

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>, Philip Nee <pnee@confluent.io>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…even if interrupted (apache#16686)

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>, Philip Nee <pnee@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved clients consumer core Kafka Broker ctr Consumer Threading Refactor (KIP-848) KIP-848 The Next Generation of the Consumer Rebalance Protocol

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants