@showuon showuon commented Jul 12, 2021

Make ListConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error

For reference, this is the old response-handling logic:

void handleResponse(AbstractResponse abstractResponse) {
      final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
      final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();

      // If coordinator changed since we fetched it, retry
      // here, we'll check all errors, including partition errors, to see if we need to retry
      if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
          Call call = getListConsumerGroupOffsetsCall(context);
          rescheduleFindCoordinatorTask(context, () -> call, this);
          return;
      }

      if (handleGroupRequestError(response.error(), context.future()))
          return;

      for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
          response.responseData().entrySet()) {
          final TopicPartition topicPartition = entry.getKey();
          OffsetFetchResponse.PartitionData partitionData = entry.getValue();
          final Errors error = partitionData.error;

          if (error == Errors.NONE) {
              final Long offset = partitionData.offset;
              final String metadata = partitionData.metadata;
              final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
              // Negative offset indicates that the group has no committed offset for this partition
              if (offset < 0) {
                  groupOffsetsListing.put(topicPartition, null);
              } else {
                  groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
              }
          } else {
              log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
          }
      }
      context.future().complete(groupOffsetsListing);
  }

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

showuon commented Jul 12, 2021

@dajac, please take a look. Thanks.

@dajac dajac left a comment

Left a few comments. Thanks for the PR.


if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
return new ApiResult<>(
completed,
Member

We could get rid of completed and use Collections.singletonMap(groupId, groupOffsetsListing), no?

Member Author

No, we can't do that, because completed can be an empty map here. Collections.singletonMap(groupId, groupOffsetsListing) would never be empty. Thanks.
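
To illustrate the point in this reply, here is a minimal sketch. The class and the helper completedFor are hypothetical and are not the handler's actual code; offsets are simplified to a Map<String, Long>. It assumes completed only contains the group when the group did not fail:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class CompletedMapSketch {
    // Hypothetical helper (not part of Kafka): builds a "completed" map that
    // contains the group only when no group-level failure occurred.
    static Map<String, Map<String, Long>> completedFor(
            String groupId, Map<String, Long> offsets, boolean groupFailed) {
        Map<String, Map<String, Long>> completed = new HashMap<>();
        if (!groupFailed) {
            completed.put(groupId, offsets);
        }
        return completed;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        // A failed group leaves the map empty.
        System.out.println(completedFor("g", offsets, true).isEmpty()); // prints true
        // singletonMap always holds exactly one entry, so it is never empty.
        System.out.println(Collections.singletonMap("g", offsets).isEmpty()); // prints false
    }
}
```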

Member

@showuon I think that there is a case that we don't handle correctly.

Imagine that GROUP_AUTHORIZATION_FAILED is returned as a partition error. In this case, we ignore it in handlePartitionError and therefore don't add the failed group to failed. I think that we should also handle all the group level errors in handlePartitionError.

The second thing is that if there is a group failure, we should not add the group to completed at L131. Otherwise, this will complete the group future with an empty list.

Could you check this out and add a test for it?
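
A rough sketch of the behaviour requested above, under stated assumptions: the Err enum, the Result holder and the handle method are hypothetical stand-ins (not Kafka's Errors enum or the real handler), and treating GROUP_AUTHORIZATION_FAILED as the only group-level code is an assumption made for illustration. A group-level error seen on a partition marks the whole group as failed, and a failed group is never added to completed:

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class PartitionErrorSketch {
    // Hypothetical stand-in for a few error codes; not Kafka's Errors enum.
    enum Err { NONE, GROUP_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_OR_PARTITION }

    // Assumption for this sketch: which codes count as group-level failures.
    static final Set<Err> GROUP_LEVEL = EnumSet.of(Err.GROUP_AUTHORIZATION_FAILED);

    static final class Result {
        final Map<String, Map<String, Long>> completed = new HashMap<>();
        final Map<String, Err> failed = new HashMap<>();
    }

    static Result handle(String groupId,
                         Map<String, Err> partitionErrors,
                         Map<String, Long> partitionOffsets) {
        Result result = new Result();
        Map<String, Long> listing = new HashMap<>();
        for (Map.Entry<String, Err> entry : partitionErrors.entrySet()) {
            Err error = entry.getValue();
            if (error == Err.NONE) {
                listing.put(entry.getKey(), partitionOffsets.get(entry.getKey()));
            } else if (GROUP_LEVEL.contains(error)) {
                // Group-level error reported on a partition: fail the whole group.
                result.failed.put(groupId, error);
            }
            // Any other partition error: skip that partition only.
        }
        // Only complete the group when no group-level failure was recorded.
        if (!result.failed.containsKey(groupId)) {
            result.completed.put(groupId, listing);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Err> errors = new HashMap<>();
        errors.put("topic-0", Err.NONE);
        errors.put("topic-1", Err.GROUP_AUTHORIZATION_FAILED);
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        Result result = handle("g", errors, offsets);
        System.out.println(result.failed.containsKey("g")); // prints true
        System.out.println(result.completed.isEmpty());     // prints true
    }
}
```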

Member Author

Good suggestion! Will do it tomorrow (my time). Thanks.

}

}
}
Member

nit: Could we bring this back?

@dajac dajac left a comment

@showuon Thanks for the update. I left a few more comments.


switch (error) {
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group {} failed because the coordinator " +
Member

Could we also update the log messages here and below to follow what you did in handleGroupError?

Member Author

Oh, sorry, I forgot the partitionError section. Will do.

final String unexpectedErrorMsg =
String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error);
log.error(unexpectedErrorMsg);
failed.put(groupId, error.exception(unexpectedErrorMsg));
Member

Could we also remove providing the error message here like we did for the others?

Member Author

Updated. Thanks.

showuon commented Jul 16, 2021

Failed tests are unrelated, thanks.

    Build / JDK 16 and Scala 2.13 / kafka.api.TransactionsTest.testCommitTransactionTimeout()
    Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
    Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
    Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
    Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()

@dajac dajac changed the title from "KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests" to "KAFKA-13064: Make ListConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error" on Jul 16, 2021
@dajac dajac left a comment

LGTM

dajac commented Jul 16, 2021

Failures are not related:

Build / JDK 16 and Scala 2.13 / testCommitTransactionTimeout() – kafka.api.TransactionsTest
Build / JDK 11 and Scala 2.13 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest
Build / JDK 11 and Scala 2.13 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest
Build / JDK 8 and Scala 2.12 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest
Build / JDK 8 and Scala 2.12 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest

@dajac dajac merged commit 4fd6d2b into apache:trunk Jul 16, 2021
dajac pushed a commit that referenced this pull request Jul 16, 2021
…OR_NOT_AVAILABLE error (#11026)

This patch improves the error handling in `ListConsumerGroupOffsetsHandler` and ensures that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>
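
As a hedged illustration of the distinction this change relies on, the sketch below classifies a few error codes into actions. The Err and Action enums and the classify method are hypothetical and are not the handler's real API; only the three error names come from this conversation. "Unmap" here means dropping the cached coordinator so that it is looked up again, while "retry" re-sends the request to the same coordinator:

```java
class CoordinatorErrorSketch {
    // Hypothetical stand-ins; not Kafka's Errors enum or handler types.
    enum Err { NONE, COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, GROUP_AUTHORIZATION_FAILED }
    enum Action { COMPLETE, RETRY, UNMAP, FAIL }

    static Action classify(Err error) {
        switch (error) {
            case NONE:
                return Action.COMPLETE;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // The coordinator is still loading: retry against the same node.
                return Action.RETRY;
            case COORDINATOR_NOT_AVAILABLE:
                // Forget the cached coordinator so that it is looked up again.
                return Action.UNMAP;
            default:
                // Anything else is treated as a failure of the group in this sketch.
                return Action.FAIL;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(Err.COORDINATOR_NOT_AVAILABLE));    // prints UNMAP
        System.out.println(classify(Err.COORDINATOR_LOAD_IN_PROGRESS)); // prints RETRY
    }
}
```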

dajac commented Jul 16, 2021

Merged to trunk and 3.0.

dajac commented Jul 16, 2021

@showuon Thanks for the patches. Could you update the description of this PR and the others to ensure that the description reflects the changes?

showuon commented Jul 16, 2021

@dajac, all checked and updated. Thank you very much for patiently reviewing all these PRs! After these updates, we are more confident in these new handlers. :)

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…OR_NOT_AVAILABLE error (apache#11026)

This patch improves the error handling in `ListConsumerGroupOffsetsHandler` and ensures that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>