diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 17e2b2609c6ec..4f47c8591145b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -432,6 +432,22 @@ private void onErrorResponse(final R response, final long currentTimeMs) { "subscribe. " + errorMessage)); break; + case GROUP_ID_NOT_FOUND: + // If the group doesn't exist (e.g., member never joined due to InvalidTopicException), + // GROUP_ID_NOT_FOUND is benign and is ignored - the leave is effectively complete. + // When a leave heartbeat (epoch=-1) is sent, the state transitions synchronously + // from LEAVING to UNSUBSCRIBED in onHeartbeatRequestGenerated() before the request is sent. + if (membershipManager().state() == MemberState.UNSUBSCRIBED) { + logger.info("{} received GROUP_ID_NOT_FOUND for group {} while unsubscribed. ", + heartbeatRequestName(), membershipManager().groupId()); + membershipManager().onHeartbeatRequestSkipped(); + } else { + // Otherwise the error is unexpected for the current state: log it and treat it as fatal via handleFatalFailure(). 
+ logger.error("{} failed due to unexpected error {}: {}", heartbeatRequestName(), error, errorMessage); + handleFatalFailure(error.exception(errorMessage)); + } + break; + default: if (!handleSpecificExceptionInResponse(response, currentTimeMs)) { // If the manager receives an unknown error - there could be a bug in the code or a new error code diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java index 9063ae5ab5bf4..3ef1d712c9667 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java @@ -429,6 +429,57 @@ public void testFailureOnFatalException() { verify(backgroundEventHandler).add(any()); } + /** + * Test that a GROUP_ID_NOT_FOUND error received while unsubscribed is not a fatal error. + * This can happen when the consumer never successfully joined the group + * (e.g., due to an InvalidTopicException during poll()) and close() sends + * a leave heartbeat for a group that was never created. 
+ */ + @Test + public void testGroupIdNotFoundExceptionWhileUnsubscribed() { + // Setup: the mocked membership manager reports UNSUBSCRIBED state and member epoch -1 + when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); + when(membershipManager.memberEpoch()).thenReturn(-1); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Complete the single pending heartbeat request with a GROUP_ID_NOT_FOUND error response + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + // Verify: no fatal transition, the heartbeat is reported as skipped, and nothing is added to the background event handler + verify(membershipManager, never()).transitionToFatal(); + verify(membershipManager).onHeartbeatRequestSkipped(); + verify(backgroundEventHandler, never()).add(any()); + } + + /** + * Test that a GROUP_ID_NOT_FOUND error received while the member is STABLE is treated as fatal. + * This would indicate the group was unexpectedly deleted while the member + * was actively participating. 
+ */ + @Test + public void testGroupIdNotFoundWhileStableIsFatal() { + // Setup: the mocked membership manager reports STABLE state and DEFAULT_MEMBER_EPOCH + when(membershipManager.state()).thenReturn(MemberState.STABLE); + when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Stub the coordinator lookup to return empty, then complete the heartbeat with a GROUP_ID_NOT_FOUND error + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + // Verify: the member transitions to fatal and an event is added to the background event handler + verify(membershipManager).transitionToFatal(); + verify(backgroundEventHandler).add(any()); + } + @Test public void testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() { time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index 8952271b250d5..20528e775deef 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -354,6 +354,57 @@ public void testFailureOnFatalException() { verify(backgroundEventHandler).add(any()); } + /** + * Test that a GROUP_ID_NOT_FOUND error received while unsubscribed is not treated as fatal. + * This can happen when the consumer never successfully joined the group + * (e.g., due to an InvalidTopicException during poll()) and close() sends + * a leave heartbeat for a group that was never created. 
+ */ + @Test + public void testGroupIdNotFoundExceptionWhileUnsubscribed() { + // Setup: the mocked membership manager reports UNSUBSCRIBED state and member epoch -1 + when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); + when(membershipManager.memberEpoch()).thenReturn(-1); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Complete the single pending heartbeat request with a GROUP_ID_NOT_FOUND error response + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + // Verify: no fatal transition, the heartbeat is reported as skipped, and nothing is added to the background event handler + verify(membershipManager, never()).transitionToFatal(); + verify(membershipManager).onHeartbeatRequestSkipped(); + verify(backgroundEventHandler, never()).add(any()); + } + + /** + * Test that a GROUP_ID_NOT_FOUND error received while the member is STABLE is treated as fatal. + * This would indicate the group was unexpectedly deleted while the member + * was actively participating. 
+ */ + @Test + public void testGroupIdNotFoundWhileStableIsFatal() { + // Setup: the mocked membership manager reports STABLE state and DEFAULT_MEMBER_EPOCH + when(membershipManager.state()).thenReturn(MemberState.STABLE); + when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Stub the coordinator lookup to return empty, then complete the heartbeat with a GROUP_ID_NOT_FOUND error + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + // Verify: the member transitions to fatal and an event is added to the background event handler + verify(membershipManager).transitionToFatal(); + verify(backgroundEventHandler).add(any()); + } + @Test public void testNoCoordinator() { when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());