Member

@showuon showuon commented Jul 12, 2021

Issues found in DeleteConsumerGroupOffsetsHandler:

  1. If a coordinator error is returned for a topic partition together with an Errors.NONE for another, the call fails with IllegalArgumentException: Partition foo was not included in the original request. This scenario is covered by the newly added test case testDeleteConsumerGroupOffsetsResponseIncludeCoordinatorErrorAndNoneError.
  2. Not all possible exceptions were handled, so an "expected" exception could be logged as an "unexpected exception".
  3. DeleteConsumerGroupOffsetsHandlerTest built every error, including group errors, into the partition results. The group error tests and the partition error tests are now split.

This is the old response handling logic, for reference:

void handleResponse(AbstractResponse abstractResponse) {
    final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;

    // If coordinator changed since we fetched it, retry
    // note: we use `errorCounts` to collect all errors in the response, including partition errors.
    if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
        Call call = getDeleteConsumerGroupOffsetsCall(context, partitions);
        rescheduleFindCoordinatorTask(context, () -> call, this);
        return;
    }

    // If the error is an error at the group level, the future is failed with it
    final Errors groupError = Errors.forCode(response.data().errorCode());
    if (handleGroupRequestError(groupError, context.future()))
        return;

    final Map<TopicPartition, Errors> partitions = new HashMap<>();
    response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
        new TopicPartition(topic.name(), partition.partitionIndex()),
        Errors.forCode(partition.errorCode())))
    );

    context.future().complete(partitions);
}

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Member Author

Put every error into partitionResults, as the old logic did.

Comment on lines 126 to 133
Member Author

As in #11016, we don't return any completed or failed results if we need to retry.

public void testDeleteConsumerGroupOffsets() throws Exception {
// Happy path

public void testDeleteConsumerGroupOffsetsResponseIncludeCoordinatorErrorAndNoneError() throws Exception {
Member Author

Added a test in which the partition results contain a coordinator error alongside Errors.NONE entries. We should retry in this case, too.

Member Author

showuon commented Jul 12, 2021

@dajac @rajinisivaram @mimaison, please help take a look. Thanks.

partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
response.data().topics().forEach(topic ->
topic.partitions().forEach(partitionoffsetDeleteResponse -> {
Member

nit: Should we keep partition instead of partitionoffsetDeleteResponse? It is a bit more concise.

Member Author

Updated. Thanks.

Member

@dajac dajac left a comment

@showuon Thanks. Left a few comments.

Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode());
TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
if (partitionError != Errors.NONE) {
handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry);
Member

I am actually not sure about this. Looking at the code on the broker side, it seems that group errors are always returned in the top level error field. I think that we could simply return the partition errors without checking them.

Member Author

Yes, I originally did it the way you suggested, but a test failed because of that change: testDeleteConsumerGroupOffsetsNumRetries in KafkaAdminClientTest. It puts NOT_COORDINATOR in a partition error and expects a retry. That's why I changed it to this.
What do you think?

Member

I see. I think that it used to work because ConsumerGroupOperationContext.hasCoordinatorMoved relied on response.errorCount(). I think that the unit test is incorrect in this case.
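
To make the point above concrete, here is a minimal sketch of why a NOT_COORDINATOR placed in a partition result could still trigger a retry in the old code. It assumes, based on the code comment in the old handleResponse, that the error counts cover both the top-level error and every partition error. The Errors enum and the method names below are hypothetical stand-ins, not Kafka's classes.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class ErrorCountsSketch {
    /** Hypothetical stand-in for a few values of Kafka's Errors enum. */
    public enum Errors { NONE, NOT_COORDINATOR, GROUP_SUBSCRIBED_TO_TOPIC }

    /** Counts the top-level error together with every partition-level error. */
    public static Map<Errors, Integer> errorCounts(Errors topLevel, List<Errors> partitionErrors) {
        Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
        counts.merge(topLevel, 1, Integer::sum);
        for (Errors error : partitionErrors) {
            counts.merge(error, 1, Integer::sum);
        }
        return counts;
    }

    /** Assumed shape of the old check: NOT_COORDINATOR anywhere in the counts. */
    public static boolean movedAccordingToCounts(Map<Errors, Integer> counts) {
        return counts.containsKey(Errors.NOT_COORDINATOR);
    }

    /** A check that only consults the top-level error field. */
    public static boolean movedAccordingToTopLevel(Errors topLevel) {
        return topLevel == Errors.NOT_COORDINATOR;
    }

    public static void main(String[] args) {
        // Top-level error is NONE, but one partition carries NOT_COORDINATOR.
        Errors topLevel = Errors.NONE;
        List<Errors> partitionErrors = Arrays.asList(Errors.NOT_COORDINATOR, Errors.NONE);

        Map<Errors, Integer> counts = errorCounts(topLevel, partitionErrors);
        System.out.println("counts-based check: " + movedAccordingToCounts(counts));
        System.out.println("top-level check:    " + movedAccordingToTopLevel(topLevel));
    }
}
```

With a counts-based check the response looks like a coordinator move, while a check on the top-level field alone does not. That matches the behaviour the unit test was depending on.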

Comment on lines 154 to 155
log.error("Received non retriable error for group {} in `{}` response", groupId,
apiName(), error.exception());
Member

Could we try to make the error messages uniform? For instance: "OffsetDelete request for group id {} failed due to error {}." I would also log it at debug level, and we don't need to pass the exception to the logger. The exception doesn't bring much here.

groupsToUnmap.add(groupId);
break;
default:
final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
Member

unexpectedErrorMsg is not necessary as it is used only once. I would also follow the same pattern that we use for other messages.

case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group {} failed because the coordinator" +
" is still in the process of loading state. Will retry.", apiName(), groupId);
Member

I am not a fan of using apiName() here because the name offsetDelete does not start with a capital letter.

Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
Member

Not related to this line. Is it worth verifying that groupIds only contains the expected groupId here and in buildRequest? I did it here: https://github.com/apache/kafka/pull/11016/files#diff-72f508d8e6b9b7f8fde5de8b75bedb6e7985824b71d00fb172338ec9c4782651R121.

Member

@dajac dajac left a comment

@showuon Thanks for the update. I left a few comments.

final Errors error = Errors.forCode(response.data().errorCode());
if (error != Errors.NONE) {
handleError(groupId, error, failed, unmapped);
handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
Member

It seems that groupsToRetry is not really necessary in this case. Moreover, we could directly return in the branch as we don't expect errors in the partitions.

if (error != Errors.NONE) {
  final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
  final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

  handleGroupError(groupId, error, failed, groupsToUnmap);

  return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
}

groupId will be either in failed or in groupsToUnmap after the call to handleGroupError.
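
The early-return branch suggested above can be sketched in isolation as follows. This is a simplified illustration pieced together from the snippets in this thread, not the merged implementation: Errors, Result and onGroupError are hypothetical stand-ins, group ids are plain strings instead of CoordinatorKey, and logging is omitted. For COORDINATOR_LOAD_IN_PROGRESS the sketch leaves both collections empty, on the assumption that the caller then retries the same request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GroupErrorSketch {
    /** Hypothetical stand-in for a subset of Kafka's Errors enum. */
    public enum Errors {
        NONE, NON_EMPTY_GROUP, COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, UNKNOWN_SERVER_ERROR
    }

    /** Hypothetical stand-in for ApiResult, keyed by plain group id strings. */
    public static final class Result {
        public final Map<String, Map<String, Errors>> completed;
        public final Map<String, Throwable> failed;
        public final List<String> unmapped;

        public Result(Map<String, Map<String, Errors>> completed,
                      Map<String, Throwable> failed,
                      List<String> unmapped) {
            this.completed = completed;
            this.failed = failed;
            this.unmapped = unmapped;
        }
    }

    static void handleGroupError(String groupId, Errors error,
                                 Map<String, Throwable> failed, Set<String> groupsToUnmap) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Coordinator is still loading: touch neither collection so the
                // caller can simply retry the same request.
                break;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Coordinator unavailable or moved: unmap the key so that a new
                // FindCoordinator lookup happens before the retry.
                groupsToUnmap.add(groupId);
                break;
            default:
                // Anything else fails the group in this sketch.
                failed.put(groupId, new IllegalStateException(
                    "OffsetDelete request for group id " + groupId + " failed due to error " + error));
        }
    }

    /** The early-return branch taken when the top-level error is not NONE. */
    public static Result onGroupError(String groupId, Errors error) {
        Map<String, Throwable> failed = new HashMap<>();
        Set<String> groupsToUnmap = new HashSet<>();
        handleGroupError(groupId, error, failed, groupsToUnmap);
        return new Result(Collections.<String, Map<String, Errors>>emptyMap(),
                          failed, new ArrayList<>(groupsToUnmap));
    }
}
```

Calling onGroupError("g", Errors.NOT_COORDINATOR) yields a result whose unmapped list contains only "g", while Errors.NON_EMPTY_GROUP puts "g" into failed instead. In both cases completed stays empty, which is why partition errors never need to be inspected on this path.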

Member Author

Good suggestion! Updated!

if (!partitions.isEmpty())
completed.put(groupId, partitions);

completed.put(groupId, partitionResults);
Member

Could we directly return here as well?

return new ApiResult<>(Collections.singletonMap(groupId, partitionResults), Collections.emptyMap(), Collections.emptyList());

I think that it will make the error handling a bit more explicit.
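
For the success path, a rough sketch of what returning directly could look like is shown below. It flattens the partition entries into a map without interpreting their error codes and returns that map as the only completed result for the group. PartitionEntry, the "topic-partition" string keys and the method names are hypothetical simplifications of OffsetDeleteResponsePartition, TopicPartition and the handler's code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionResultsSketch {
    /** Hypothetical stand-in for two values of Kafka's Errors enum. */
    public enum Errors { NONE, GROUP_SUBSCRIBED_TO_TOPIC }

    /** Hypothetical stand-in for one partition entry of an OffsetDelete response. */
    public static final class PartitionEntry {
        public final String topic;
        public final int partition;
        public final Errors error;

        public PartitionEntry(String topic, int partition, Errors error) {
            this.topic = topic;
            this.partition = partition;
            this.error = error;
        }
    }

    /** Maps "topic-partition" to its error without checking what the error is. */
    public static Map<String, Errors> partitionResults(List<PartitionEntry> entries) {
        Map<String, Errors> results = new HashMap<>();
        for (PartitionEntry entry : entries) {
            results.put(entry.topic + "-" + entry.partition, entry.error);
        }
        return results;
    }

    /** The completed map returned directly: one group id, all of its partition results. */
    public static Map<String, Map<String, Errors>> completed(String groupId,
                                                             List<PartitionEntry> entries) {
        return Collections.singletonMap(groupId, partitionResults(entries));
    }

    public static void main(String[] args) {
        // Same shape as the test data in this thread: foo-0 succeeds, bar-0 fails.
        List<PartitionEntry> entries = Arrays.asList(
            new PartitionEntry("foo", 0, Errors.NONE),
            new PartitionEntry("bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC));
        System.out.println(completed("my-group", entries));
    }
}
```

Partition-level errors such as GROUP_SUBSCRIBED_TO_TOPIC are handed back to the caller as data here, so the failed map and the unmapped list can both be empty on this path.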

Member Author

Updated

log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
error.exception());
case NON_EMPTY_GROUP:
log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error);
Member

nit: groupId -> groupId.idValue. There are a few other cases.

Member Author

Nice catch! I'll also update other PRs.

break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`OffsetDelete` request for group {} failed because the coordinator" +
Member

nit: group -> group id?

Member Author

Updated. I'll also update other PRs.

return true;
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`OffsetDelete` request for group {} returned error {}. " +
Member

nit: group -> group id?

Comment on lines 3363 to 3379
new OffsetDeleteResponseData()
.setTopics(new OffsetDeleteResponseTopicCollection(Stream.of(
new OffsetDeleteResponseTopic()
.setName("foo")
.setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
).iterator())),
new OffsetDeleteResponseTopic()
.setName("bar")
.setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
).iterator()))
).collect(Collectors.toList()).iterator()))
Member

nit: Is it really better like this? Personally, I prefer the previous indentation.

Member Author

Sorry, I accidentally did it.

.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName("t0")
Member

nit: Could we rely on t0p0 here for the name and the partition?

.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName("t0")
Member

ditto.

Comment on lines 203 to 206
Collection<Map<TopicPartition, Errors>> completeCollection = result.completedKeys.values();
assertEquals(1, completeCollection.size());
Map<TopicPartition, Errors> completeMap = completeCollection.iterator().next();
assertEquals(expectedResult, completeMap);
Member

You already assert that completedKeys only contains key so it seems that we could just verify that result.completedKeys.get(key) is equal to expectedResult, no?

Member Author

Good suggestion! Updated.

assertEquals(emptyList(), result.unmappedKeys);
assertEquals(emptySet(), result.failedKeys.keySet());
}
}
Member

nit: Could we add the empty line back?

Comment on lines +126 to +137
new OffsetDeleteResponseData()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName(t0p0.topic())
.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(t0p0.partition())
.setErrorCode(error.code())
).iterator()))
).iterator()))
);
Member Author

indent fix

@showuon showuon changed the title from "KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests" to "KAFKA-13059: Make DeleteConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error and fix issue" on Jul 15, 2021
Member

@dajac dajac left a comment

LGTM, thanks for the patch!

Member

dajac commented Jul 15, 2021

Failures are not related:

Build / JDK 11 and Scala 2.13 / testCommitTransactionTimeout() – kafka.api.TransactionsTest
Build / JDK 11 and Scala 2.13 / shouldBeAbleToQueryFilterState – org.apache.kafka.streams.integration.QueryableStateIntegrationTest

@dajac dajac merged commit 46c91f4 into apache:trunk Jul 15, 2021
dajac pushed a commit that referenced this pull request Jul 15, 2021
…ATOR_NOT_AVAILABLE error (#11019)

This patch improves the error handling in `DeleteConsumerGroupOffsetsHandler`. `COORDINATOR_NOT_AVAILABLE` is now unmapped to trigger a new find coordinator request to be sent out.

Reviewers: David Jacot <djacot@confluent.io>

Member

dajac commented Jul 15, 2021

Merged to trunk and to 3.0. cc @kkonstantine

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021