Skip to content

Commit b7acfcd

Browse files
committed
KAFKA-13072: refactor code
1 parent b4a3bfa commit b7acfcd

File tree

3 files changed

+24
-52
lines changed

3 files changed

+24
-52
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java

Lines changed: 23 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,18 @@ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<MemberIden
6969
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
7070
}
7171

72+
private void validateKeys(
73+
Set<CoordinatorKey> groupIds
74+
) {
75+
if (!groupIds.equals(Collections.singleton(groupId))) {
76+
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
77+
" (expected only " + Collections.singleton(groupId) + ")");
78+
}
79+
}
80+
7281
@Override
73-
public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
82+
public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
83+
validateKeys(groupIds);
7484
return new LeaveGroupRequest.Builder(groupId.idValue, members);
7585
}
7686

@@ -80,9 +90,11 @@ public ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleResponse(
8090
Set<CoordinatorKey> groupIds,
8191
AbstractResponse abstractResponse
8292
) {
93+
validateKeys(groupIds);
94+
8395
final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
84-
Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
85-
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
96+
final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
97+
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
8698
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
8799
final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
88100

@@ -95,10 +107,6 @@ public ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleResponse(
95107
Errors memberError = Errors.forCode(memberResponse.errorCode());
96108
String memberId = memberResponse.memberId();
97109

98-
if (memberError != Errors.NONE) {
99-
handleMemberError(groupId, memberId, memberError, groupsToUnmap, groupsToRetry);
100-
}
101-
102110
memberErrors.put(new MemberIdentity()
103111
.setMemberId(memberId)
104112
.setGroupInstanceId(memberResponse.groupInstanceId()),
@@ -133,64 +141,31 @@ private void handleGroupError(
133141
) {
134142
switch (error) {
135143
case GROUP_AUTHORIZATION_FAILED:
136-
log.error("Received authorization failure for group {} in `{}` response", groupId,
137-
apiName(), error.exception());
144+
log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId, error);
138145
failed.put(groupId, error.exception());
139146
break;
140147

141148
case COORDINATOR_LOAD_IN_PROGRESS:
142149
// If the coordinator is in the middle of loading, then we just need to retry
143-
log.debug("`{}` request for group {} failed because the coordinator " +
144-
"is still in the process of loading state. Will retry", apiName(), groupId);
150+
log.debug("`LeaveGroup` request for group {} failed because the coordinator " +
151+
"is still in the process of loading state. Will retry", groupId);
145152
groupsToRetry.add(groupId);
146153
break;
147154
case COORDINATOR_NOT_AVAILABLE:
148155
case NOT_COORDINATOR:
149156
// If the coordinator is unavailable or there was a coordinator change, then we unmap
150157
// the key so that we retry the `FindCoordinator` request
151-
log.debug("`{}` request for group {} returned error {}. " +
152-
"Will attempt to find the coordinator again and retry", apiName(), groupId, error);
158+
log.debug("`LeaveGroup` request for group {} returned error {}. " +
159+
"Will attempt to find the coordinator again and retry", groupId, error);
153160
groupsToUnmap.add(groupId);
154161
break;
155162

156163
default:
157-
final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
158-
groupId, apiName());
159-
log.error(unexpectedErrorMsg, error.exception());
164+
final String unexpectedErrorMsg =
165+
String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId, error);
166+
log.error(unexpectedErrorMsg);
160167
failed.put(groupId, error.exception(unexpectedErrorMsg));
161168
}
162169
}
163170

164-
private void handleMemberError(
165-
CoordinatorKey groupId,
166-
String memberId,
167-
Errors error,
168-
Set<CoordinatorKey> groupsToUnmap,
169-
Set<CoordinatorKey> groupsToRetry
170-
) {
171-
switch (error) {
172-
case COORDINATOR_LOAD_IN_PROGRESS:
173-
// If the coordinator is in the middle of loading, then we just need to retry
174-
log.debug("`{}` request for the member {} in group {} failed because the coordinator " +
175-
"is still in the process of loading state. Will retry", apiName(), memberId, groupId);
176-
groupsToRetry.add(groupId);
177-
break;
178-
case COORDINATOR_NOT_AVAILABLE:
179-
case NOT_COORDINATOR:
180-
// If the coordinator is unavailable or there was a coordinator change, then we unmap
181-
// the key so that we retry the `FindCoordinator` request
182-
log.debug("`{}` request for the member {} in group {} returned error {}. " +
183-
"Will attempt to find the coordinator again and retry", apiName(), memberId, groupId, error);
184-
groupsToUnmap.add(groupId);
185-
break;
186-
case FENCED_INSTANCE_ID:
187-
case UNKNOWN_MEMBER_ID:
188-
log.debug("`{}` request for the member {} in group {} returned error {}.", apiName(), memberId, groupId, error);
189-
break;
190-
default:
191-
log.debug("`{}` request for the member {} in group {} returned unexpected error {}.",
192-
apiName(), memberId, groupId, error);
193-
}
194-
}
195-
196171
}

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3574,7 +3574,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception {
35743574
Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2"));
35753575

35763576
final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup(
3577-
"groupId", new RemoveMembersFromConsumerGroupOptions(membersToRemove));
3577+
GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove));
35783578

35793579
TestUtils.assertFutureError(result.all(), TimeoutException.class);
35803580
}

clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,11 @@ public void testSuccessfulHandleResponse() {
7070
public void testUnmappedHandleResponse() {
7171
assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
7272
assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
73-
assertUnmapped(handleWithMemberError(Errors.COORDINATOR_NOT_AVAILABLE));
74-
assertUnmapped(handleWithMemberError(Errors.NOT_COORDINATOR));
7573
}
7674

7775
@Test
7876
public void testRetriableHandleResponse() {
7977
assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
80-
assertRetriable(handleWithMemberError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
8178
}
8279

8380
@Test

0 commit comments

Comments
 (0)