Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -68,8 +69,18 @@ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<MemberIden
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
}

private void validateKeys(
Set<CoordinatorKey> groupIds
) {
if (!groupIds.equals(Collections.singleton(groupId))) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected only " + Collections.singleton(groupId) + ")");
}
}

@Override
public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);
return new LeaveGroupRequest.Builder(groupId.idValue, members);
}

Expand All @@ -79,54 +90,63 @@ public ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleResponse(
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
validateKeys(groupIds);
final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();

final Errors error = Errors.forCode(response.data().errorCode());
final Errors error = response.topLevelError();
if (error != Errors.NONE) {
handleError(groupId, error, failed, unmapped);
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

handleGroupError(groupId, error, failed, groupsToUnmap);

return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
} else {
final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
for (MemberResponse memberResponse : response.memberResponses()) {
memberErrors.put(new MemberIdentity()
.setMemberId(memberResponse.memberId())
.setGroupInstanceId(memberResponse.groupInstanceId()),
Errors.forCode(memberResponse.errorCode()));

}
completed.put(groupId, memberErrors);

return new ApiResult<>(
Collections.singletonMap(groupId, memberErrors),
Collections.emptyMap(),
Collections.emptyList()
);
}
return new ApiResult<>(completed, failed, unmapped);
}

private void handleError(
private void handleGroupError(
CoordinatorKey groupId,
Errors error, Map<CoordinatorKey,
Throwable> failed,
List<CoordinatorKey> unmapped
Errors error,
Map<CoordinatorKey, Throwable> failed,
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId,
error.exception());
log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;

case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`LeaveGroup` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("LeaveGroup request for group {} returned error {}. Will retry",
groupId, error);
unmapped.add(groupId);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`LeaveGroup` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;

default:
log.error("Received unexpected error for group {} in `LeaveGroup` response",
groupId, error.exception());
failed.put(groupId, error.exception(
"Received unexpected error for group " + groupId + " in `LeaveGroup` response"));
break;
log.error("`LeaveGroup` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3574,7 +3574,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception {
Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2"));

final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup(
"groupId", new RemoveMembersFromConsumerGroupOptions(membersToRemove));
GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove));

TestUtils.assertFutureError(result.all(), TimeoutException.class);
}
Expand Down Expand Up @@ -3641,10 +3641,6 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())));

env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())));
Expand All @@ -3653,6 +3649,8 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
* We need to return two responses here, one for NOT_COORDINATOR call when calling remove member
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
* FindCoordinatorResponse.
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
Expand All @@ -3661,6 +3659,13 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

MemberResponse memberResponse = new MemberResponse()
.setGroupInstanceId("instance-1")
.setErrorCode(Errors.NONE.code());
Expand Down Expand Up @@ -3719,13 +3724,10 @@ public void testRemoveMembersFromGroup() throws Exception {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

// Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

// Retriable errors should be retried
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,72 @@ public void testBuildRequest() {
@Test
public void testSuccessfulHandleResponse() {
Map<MemberIdentity, Errors> responseData = Collections.singletonMap(m1, Errors.NONE);
assertCompleted(handleWithError(Errors.NONE), responseData);
assertCompleted(handleWithGroupError(Errors.NONE), responseData);
}

@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
}

@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}

@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(UnknownServerException.class, handleWithError(Errors.UNKNOWN_SERVER_ERROR));
assertFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(UnknownServerException.class, handleWithGroupError(Errors.UNKNOWN_SERVER_ERROR));
}

@Test
public void testFailedHandleResponseInMemberLevel() {
assertMemberFailed(Errors.FENCED_INSTANCE_ID, handleWithMemberError(Errors.FENCED_INSTANCE_ID));
assertMemberFailed(Errors.UNKNOWN_MEMBER_ID, handleWithMemberError(Errors.UNKNOWN_MEMBER_ID));
}

private LeaveGroupResponse buildResponse(Errors error) {
LeaveGroupResponse response = new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(error.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(error.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
new LeaveGroupResponseData()
.setErrorCode(error.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(Errors.NONE.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleWithError(
private LeaveGroupResponse buildResponseWithMemberError(Errors error) {
LeaveGroupResponse response = new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(error.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleWithGroupError(
Errors error
) {
RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
LeaveGroupResponse response = buildResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleWithMemberError(
Errors error
) {
RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
LeaveGroupResponse response = buildResponseWithMemberError(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

private void assertUnmapped(
AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> result
) {
Expand Down Expand Up @@ -140,4 +166,16 @@ private void assertFailed(
assertEquals(singleton(key), result.failedKeys.keySet());
assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
}

private void assertMemberFailed(
Errors expectedError,
AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> result
) {
Map<MemberIdentity, Errors> expectedResponseData = Collections.singletonMap(m1, expectedError);
CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.completedKeys.keySet());
assertEquals(expectedResponseData, result.completedKeys.get(key));
}
}