Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -72,8 +72,19 @@ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicParti
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
}

private void validateKeys(
Set<CoordinatorKey> groupIds
) {
if (!groupIds.equals(Collections.singleton(groupId))) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected only " + Collections.singleton(groupId) + ")");
}
}

@Override
public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);

final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
new OffsetDeleteRequestTopic()
Expand All @@ -97,54 +108,67 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();
validateKeys(groupIds);

final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
final Errors error = Errors.forCode(response.data().errorCode());

if (error != Errors.NONE) {
handleError(groupId, error, failed, unmapped);
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

handleGroupError(groupId, error, failed, groupsToUnmap);

return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
} else {
final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data().topics().forEach(topic ->
final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
response.data().topics().forEach(topic ->
topic.partitions().forEach(partition -> {
Errors partitionError = Errors.forCode(partition.errorCode());
if (!handleError(groupId, partitionError, failed, unmapped)) {
partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
}

partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
})
);
if (!partitions.isEmpty())
completed.put(groupId, partitions);

return new ApiResult<>(
Collections.singletonMap(groupId, partitionResults),
Collections.emptyMap(),
Collections.emptyList()
);
}
return new ApiResult<>(completed, failed, unmapped);
}

private boolean handleError(
private void handleGroupError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
List<CoordinatorKey> unmapped
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
case GROUP_ID_NOT_FOUND:
case INVALID_GROUP_ID:
log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
error.exception());
case NON_EMPTY_GROUP:
log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId.idValue, error);
failed.put(groupId, error.exception());
return true;
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`OffsetDelete` request for group id {} failed because the coordinator" +
" is still in the process of loading state. Will retry.", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
return true;
case NOT_COORDINATOR:
log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry",
groupId, error);
unmapped.add(groupId);
return true;
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`OffsetDelete` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry.", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
return false;
log.error("`OffsetDelete` request for group id {} failed due to unexpected error {}.", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3290,11 +3290,11 @@ public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

final DeleteConsumerGroupOffsetsResult result = env.adminClient()
.deleteConsumerGroupOffsets("groupId", Stream.of(tp1).collect(Collectors.toSet()));
.deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet()));

TestUtils.assertFutureError(result.all(), TimeoutException.class);
}
Expand Down Expand Up @@ -3322,7 +3322,8 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception {
mockClient.prepareResponse(body -> {
firstAttemptTime.set(time.milliseconds());
return true;
}, prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
}, prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));


mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

Expand Down Expand Up @@ -3401,23 +3402,28 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));

/*
* We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
* FindCoordinatorResponse.
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse("foo", 0, Errors.NONE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -33,6 +34,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
Expand Down Expand Up @@ -67,48 +69,88 @@ public void testBuildRequest() {
@Test
public void testSuccessfulHandleResponse() {
Map<TopicPartition, Errors> responseData = Collections.singletonMap(t0p0, Errors.NONE);
assertCompleted(handleWithError(Errors.NONE), responseData);
assertCompleted(handleWithGroupError(Errors.NONE), responseData);
}

@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
}

@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}

@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
public void testFailedHandleResponseWithGroupError() {
assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND));
assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID));
assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP));
}

private OffsetDeleteResponse buildResponse(Errors error) {
@Test
public void testFailedHandleResponseWithPartitionError() {
assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC),
handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC));
assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED),
handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED));
assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
}

private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
OffsetDeleteResponse response = new OffsetDeleteResponse(
new OffsetDeleteResponseData()
.setErrorCode(error.code()));
if (error == Errors.NONE) {
response.data()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName(t0p0.topic())
.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(t0p0.partition())
.setErrorCode(error.code())
).iterator()))
).iterator()));
}
return response;
}

private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
OffsetDeleteResponse response = new OffsetDeleteResponse(
new OffsetDeleteResponseData()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName("t0")
.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(error.code())
).iterator()))
).iterator())));
new OffsetDeleteResponseData()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName(t0p0.topic())
.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(t0p0.partition())
.setErrorCode(error.code())
).iterator()))
).iterator()))
);
Comment on lines +126 to +137
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent fix

return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithError(
private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithGroupError(
Errors error
) {
DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
OffsetDeleteResponse response = buildResponse(error);
OffsetDeleteResponse response = buildGroupErrorResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithPartitionError(
Errors error
) {
DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
OffsetDeleteResponse response = buildPartitionErrorResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

Expand Down Expand Up @@ -139,7 +181,7 @@ private void assertCompleted(
assertEquals(expected, result.completedKeys.get(key));
}

private void assertFailed(
private void assertGroupFailed(
Class<? extends Throwable> expectedExceptionType,
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
) {
Expand All @@ -149,4 +191,20 @@ private void assertFailed(
assertEquals(singleton(key), result.failedKeys.keySet());
assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
}

private void assertPartitionFailed(
Map<TopicPartition, Errors> expectedResult,
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
) {
CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
assertEquals(singleton(key), result.completedKeys.keySet());

// verify the completed value is expected result
Collection<Map<TopicPartition, Errors>> completeCollection = result.completedKeys.values();
assertEquals(1, completeCollection.size());
assertEquals(expectedResult, result.completedKeys.get(key));

assertEquals(emptyList(), result.unmappedKeys);
assertEquals(emptySet(), result.failedKeys.keySet());
}
}