diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index c6af2d4a3db7b..e463911c5d928 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,8 +69,18 @@ public static AdminApiFuture.SimpleAdminApiFuture groupIds + ) { + if (!groupIds.equals(Collections.singleton(groupId))) { + throw new IllegalArgumentException("Received unexpected group ids " + groupIds + + " (expected only " + Collections.singleton(groupId) + ")"); + } + } + @Override - public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set keys) { + public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set groupIds) { + validateKeys(groupIds); return new LeaveGroupRequest.Builder(groupId.idValue, members); } @@ -79,14 +90,17 @@ public ApiResult> handleResponse( Set groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - Map> completed = new HashMap<>(); - Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + final Map failed = new HashMap<>(); + final Set groupsToUnmap = new HashSet<>(); + + handleGroupError(groupId, error, failed, groupsToUnmap); + + return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap)); } else { final Map memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { @@ -94,39 +108,45 @@ public ApiResult> handleResponse( .setMemberId(memberResponse.memberId()) .setGroupInstanceId(memberResponse.groupInstanceId()), Errors.forCode(memberResponse.errorCode())); - } - completed.put(groupId, memberErrors); + + return new ApiResult<>( + Collections.singletonMap(groupId, memberErrors), + Collections.emptyMap(), + Collections.emptyList() + ); } - return new ApiResult<>(completed, failed, unmapped); } - private void handleError( + private void handleGroupError( CoordinatorKey groupId, - Errors error, Map failed, - List unmapped + Errors error, + Map failed, + Set groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId, - error.exception()); + log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`LeaveGroup` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("LeaveGroup request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`LeaveGroup` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); + groupsToUnmap.add(groupId); break; + default: - log.error("Received unexpected error for group {} in `LeaveGroup` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `LeaveGroup` response")); - break; + log.error("`LeaveGroup` request for group id {} failed due to unexpected error {}", groupId.idValue, error); + failed.put(groupId, error.exception()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..86d019f11cdfc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3574,7 +3574,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception { Collection membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2")); final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( - "groupId", new RemoveMembersFromConsumerGroupOptions(membersToRemove)); + GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); TestUtils.assertFutureError(result.all(), TimeoutException.class); } @@ -3641,10 +3641,6 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); - env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); @@ -3653,6 +3649,8 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() @@ -3661,6 +3659,13 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + MemberResponse memberResponse = new MemberResponse() .setGroupInstanceId("instance-1") .setErrorCode(Errors.NONE.code()); @@ -3719,13 +3724,10 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java index 0ffa43b4c0a80..6f5dfda5bc307 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java @@ -63,39 +63,57 @@ public void testBuildRequest() { @Test public void testSuccessfulHandleResponse() { Map responseData = Collections.singletonMap(m1, Errors.NONE); - assertCompleted(handleWithError(Errors.NONE), responseData); + assertCompleted(handleWithGroupError(Errors.NONE), responseData); } @Test public void testUnmappedHandleResponse() { - assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); } @Test public void testRetriableHandleResponse() { - assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test public void testFailedHandleResponse() { - assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); - assertFailed(UnknownServerException.class, handleWithError(Errors.UNKNOWN_SERVER_ERROR)); + assertFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED)); + assertFailed(UnknownServerException.class, handleWithGroupError(Errors.UNKNOWN_SERVER_ERROR)); + } + + @Test + public void testFailedHandleResponseInMemberLevel() { + assertMemberFailed(Errors.FENCED_INSTANCE_ID, handleWithMemberError(Errors.FENCED_INSTANCE_ID)); + assertMemberFailed(Errors.UNKNOWN_MEMBER_ID, handleWithMemberError(Errors.UNKNOWN_MEMBER_ID)); } private LeaveGroupResponse buildResponse(Errors error) { LeaveGroupResponse response = new LeaveGroupResponse( - new LeaveGroupResponseData() - .setErrorCode(error.code()) - .setMembers(singletonList( - new MemberResponse() - .setErrorCode(error.code()) - .setMemberId("m1") - .setGroupInstanceId("m1-gii")))); + new LeaveGroupResponseData() + .setErrorCode(error.code()) + .setMembers(singletonList( + new MemberResponse() + .setErrorCode(Errors.NONE.code()) + .setMemberId("m1") + .setGroupInstanceId("m1-gii")))); return response; } - private AdminApiHandler.ApiResult> handleWithError( + private LeaveGroupResponse buildResponseWithMemberError(Errors error) { + LeaveGroupResponse response = new LeaveGroupResponse( + new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(singletonList( + new MemberResponse() + .setErrorCode(error.code()) + .setMemberId("m1") + .setGroupInstanceId("m1-gii")))); + return response; + } + + private AdminApiHandler.ApiResult> handleWithGroupError( Errors error ) { RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext); @@ -103,6 +121,14 @@ private AdminApiHandler.ApiResult> h return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); } + private AdminApiHandler.ApiResult> handleWithMemberError( + Errors error + ) { + RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext); + LeaveGroupResponse response = buildResponseWithMemberError(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + } + private void assertUnmapped( AdminApiHandler.ApiResult> result ) { @@ -140,4 +166,16 @@ private void assertFailed( assertEquals(singleton(key), result.failedKeys.keySet()); assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); } + + private void assertMemberFailed( + Errors expectedError, + AdminApiHandler.ApiResult> result + ) { + Map expectedResponseData = Collections.singletonMap(m1, expectedError); + CoordinatorKey key = CoordinatorKey.byGroupId(groupId); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(singleton(key), result.completedKeys.keySet()); + assertEquals(expectedResponseData, result.completedKeys.get(key)); + } }