Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -69,8 +70,18 @@ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
return lookupStrategy;
}

private void validateKeys(
Set<CoordinatorKey> groupIds
) {
if (!groupIds.equals(Collections.singleton(groupId))) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected only " + Collections.singleton(groupId) + ")");
}
}

@Override
public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);
// Set the flag to false as for admin client request,
// we don't need to wait for any pending offset state to clear.
return new OffsetFetchRequest.Builder(groupId.idValue, false, partitions, false);
Expand All @@ -82,14 +93,19 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
validateKeys(groupIds);

final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();

Errors responseError = response.groupLevelError(groupId.idValue);
if (responseError != Errors.NONE) {
handleError(groupId, responseError, failed, unmapped);
// the groupError will contain the group level error for v0-v8 OffsetFetchResponse
Errors groupError = response.groupLevelError(groupId.idValue);
if (groupError != Errors.NONE) {
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

handleGroupError(groupId, groupError, failed, groupsToUnmap);

return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
} else {
final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionDataMap =
Expand All @@ -113,38 +129,44 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR
log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
}
}
completed.put(groupId, groupOffsetsListing);

return new ApiResult<>(
Collections.singletonMap(groupId, groupOffsetsListing),
Collections.emptyMap(),
Collections.emptyList()
);
}
return new ApiResult<>(completed, failed, unmapped);
}

private void handleError(
private void handleGroupError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey,
Throwable> failed,
List<CoordinatorKey> unmapped
Map<CoordinatorKey, Throwable> failed,
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
error.exception());
log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;

case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`OffsetFetch` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("OffsetFetch request for group {} returned error {}. Will retry",
groupId, error);
unmapped.add(groupId);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`OffsetFetch` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;

default:
log.error("Received unexpected error for group {} in `OffsetFetch` response",
groupId, error.exception());
failed.put(groupId, error.exception(
"Received unexpected error for group " + groupId + " in `OffsetFetch` response"));
log.error("`OffsetFetch` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2960,23 +2960,27 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));

env.kafkaClient().prepareResponse(
new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));

/*
* We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
* FindCoordinatorResponse.
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
env.kafkaClient().prepareResponse(
new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));

Expand Down Expand Up @@ -3020,17 +3024,21 @@ public void testListConsumerGroupOffsets() throws Exception {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

// Retriable errors should be retried
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));

/*
* We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
* FindCoordinatorResponse.
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -67,15 +69,26 @@ public void testSuccessfulHandleResponse() {
assertCompleted(handleWithError(Errors.NONE), expected);
}


@Test
public void testSuccessfulHandleResponseWithOnePartitionError() {
Map<TopicPartition, OffsetAndMetadata> expectedResult = Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));

// expected that there's only 1 partition result returned because the other partition is skipped with error
assertCompleted(handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult);
assertCompleted(handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED), expectedResult);
assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult);
}

@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
}

@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
}

@Test
Expand All @@ -91,6 +104,24 @@ private OffsetFetchResponse buildResponse(Errors error) {
return response;
}

private OffsetFetchResponse buildResponseWithPartitionError(Errors error) {

Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));

OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE, responseData);
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionError(
Errors error
) {
ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
OffsetFetchResponse response = buildResponseWithPartitionError(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithError(
Errors error
) {
Expand Down