KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly. #11016
Conversation
This updates `AlterConsumerGroupOffsetsHandler` to handle errors correctly.
I am reviewing the other handlers introduced by KIP-699 and will follow up with other PRs if necessary.
@dajac, thanks for your nice catch and quick fix! I confirmed that `handleResponse` in `AlterConsumerGroupOffsetsHandler` has the same logic as before. That is, we always return each partition result (even with an error), except for coordinator-related errors (which need a retry or an unmap).
Here is the previous code for other reviewers' reference:
```java
void handleResponse(AbstractResponse abstractResponse) {
    final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
    Map<Errors, Integer> errorCounts = response.errorCounts();
    // 1) If coordinator changed since we fetched it, retry
    // 2) If there is a coordinator error, retry
    if (ConsumerGroupOperationContext.hasCoordinatorMoved(errorCounts) ||
        ConsumerGroupOperationContext.shouldRefreshCoordinator(errorCounts)) {
        Call call = getAlterConsumerGroupOffsetsCall(context, offsets);
        rescheduleFindCoordinatorTask(context, () -> call, this);
        return;
    }
    final Map<TopicPartition, Errors> partitions = new HashMap<>();
    for (OffsetCommitResponseTopic topic : response.data().topics()) {
        for (OffsetCommitResponsePartition partition : topic.partitions()) {
            TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
            Errors error = Errors.forCode(partition.errorCode());
            partitions.put(tp, error);
        }
    }
    context.future().complete(partitions);
}
```
Thank you.
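For readers outside the Kafka codebase, the gist of the fix can be sketched in a self-contained way. This is a minimal illustration only: the `"foo-0"` strings stand in for Kafka's `TopicPartition`, the tiny `Errors` enum is a subset of the real one, and `collectPartitionResults` is a hypothetical helper name. The point is that every partition's error lands in the result map and the future completes normally, instead of the whole future being failed as soon as one partition reports an error.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-alone sketch: "foo-0" style strings stand in for Kafka's
// TopicPartition, and the enum below is a tiny subset of Kafka's Errors.
public class PartitionErrorSketch {
    enum Errors { NONE, UNKNOWN_TOPIC_OR_PARTITION }

    // Core idea of the fix: record each partition's error in the result map
    // and complete the future normally, rather than failing the entire future
    // on the first partition-level error.
    static Map<String, Errors> collectPartitionResults(Map<String, Errors> responsePartitions) {
        Map<String, Errors> partitions = new HashMap<>();
        for (Map.Entry<String, Errors> entry : responsePartitions.entrySet()) {
            partitions.put(entry.getKey(), entry.getValue());
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, Errors> response = new HashMap<>();
        response.put("foo-0", Errors.NONE);
        response.put("foo-1", Errors.UNKNOWN_TOPIC_OR_PARTITION);

        CompletableFuture<Map<String, Errors>> future = new CompletableFuture<>();
        future.complete(collectPartitionResults(response));

        // The future is not exceptional; callers inspect per-partition results.
        System.out.println(future.join().size());
    }
}
```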
```java
} else if (!topicPartitions.containsKey(partition)) {
    result.completeExceptionally(new IllegalArgumentException(
        "Alter offset for partition \"" + partition + "\" was not attempted"));
this.future.whenComplete((topicPartitions, throwable) -> {
```
This is just a change to a lambda expression, no content change, right?
Right.
```java
    "Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
}
return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
```
Change to a lambda expression?
Right.
rajinisivaram left a comment:
@dajac Thanks for the PR. Left a few minor comments/questions; apart from that, LGTM.
```java
topics.add(topic);
private void validateKeys(
    Set<CoordinatorKey> groupIds
) {
```
I think we put args on the same line unless the list is too big.
```java
case GROUP_AUTHORIZATION_FAILED:
    log.debug("OffsetCommit request for group id {} failed due to error {}.",
        groupId.idValue, error);
    partitionResults.put(topicPartition, error);
```
Should REBALANCE_IN_PROGRESS and GROUP_AUTHORIZATION_FAILED be added to groupsToRetry?
Yeah, that's a good question. Prior to KIP-699, we considered them non-retryable errors, so I stuck to that here. I think it might be a good idea to treat them as retryable errors, but we should do it consistently across all the group handlers. How about filing a Jira for this and tackling it separately?
Yes, we can open a JIRA to do it later.
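To make the retry-versus-unmap discussion concrete, here is an illustrative sketch, not the actual Kafka code, of the bucketing a handler performs: coordinator-load errors go to a retry set, coordinator-moved errors to an unmap set, and everything else (including group-level errors like `GROUP_AUTHORIZATION_FAILED`, per the current behavior discussed above) is surfaced in the per-partition result map. All names here are simplified stand-ins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch (not the real Kafka handler): classify a response error
// for one partition into results / groups-to-retry / groups-to-unmap.
public class ErrorClassificationSketch {
    enum Errors { NONE, COORDINATOR_LOAD_IN_PROGRESS, NOT_COORDINATOR,
                  REBALANCE_IN_PROGRESS, GROUP_AUTHORIZATION_FAILED }

    static void classify(String groupId, String partition, Errors error,
                         Map<String, Errors> partitionResults,
                         Set<String> groupsToRetry, Set<String> groupsToUnmap) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Coordinator is still loading: retry the request later.
                groupsToRetry.add(groupId);
                break;
            case NOT_COORDINATOR:
                // Coordinator moved: rediscover it before retrying.
                groupsToUnmap.add(groupId);
                break;
            default:
                // Group- and partition-level errors (and NONE) are surfaced
                // per partition rather than failing the whole operation.
                partitionResults.put(partition, error);
        }
    }

    public static void main(String[] args) {
        Map<String, Errors> results = new HashMap<>();
        Set<String> retry = new HashSet<>();
        Set<String> unmap = new HashSet<>();
        classify("grp", "foo-0", Errors.NOT_COORDINATOR, results, retry, unmap);
        classify("grp", "foo-1", Errors.GROUP_AUTHORIZATION_FAILED, results, retry, unmap);
        System.out.println(unmap.contains("grp")
            && results.get("foo-1") == Errors.GROUP_AUTHORIZATION_FAILED);
    }
}
```

Making `REBALANCE_IN_PROGRESS` or `GROUP_AUTHORIZATION_FAILED` retryable would simply mean moving those cases into the `groupsToRetry` branch, which is the change deferred to a separate Jira.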
```java
// Group level errors.
case INVALID_GROUP_ID:
case REBALANCE_IN_PROGRESS:
case INVALID_COMMIT_OFFSET_SIZE:
```
Is this a group-level error?
I think it is. It basically indicates that we could not write the group metadata to the log, so it concerns the group. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L448
```java
return new ApiResult<>(completed, failed, unmapped);
if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
    return new ApiResult<>(
```
We can use `ApiResult.completed()`.
```java
        Collections.emptyList()
    );
} else {
    return new ApiResult<>(
```
We can use `ApiResult.unmapped()`.
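The suggested factory helpers can be sketched as follows. This mirrors the general shape of `ApiResult` in Kafka's admin internals but is a simplified, self-contained illustration; the field names and the class layout here are assumptions for the sketch, not the real implementation.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified sketch of ApiResult with the completed()/unmapped() factory
// helpers the reviewer suggests, instead of spelling out the constructor
// with empty maps/lists at each call site.
public class ApiResultSketch {
    static class ApiResult<K, V> {
        final Map<K, V> completedKeys;
        final Map<K, Throwable> failedKeys;
        final List<K> unmappedKeys;

        ApiResult(Map<K, V> completed, Map<K, Throwable> failed, List<K> unmapped) {
            this.completedKeys = completed;
            this.failedKeys = failed;
            this.unmappedKeys = unmapped;
        }

        // Shorthand for a single successfully completed key.
        static <K, V> ApiResult<K, V> completed(K key, V value) {
            return new ApiResult<>(Collections.singletonMap(key, value),
                Collections.emptyMap(), Collections.emptyList());
        }

        // Shorthand for keys whose coordinator must be looked up again.
        static <K, V> ApiResult<K, V> unmapped(List<K> keys) {
            return new ApiResult<>(Collections.emptyMap(),
                Collections.emptyMap(), keys);
        }
    }

    public static void main(String[] args) {
        ApiResult<String, Integer> ok = ApiResult.completed("grp", 42);
        ApiResult<String, Integer> moved =
            ApiResult.unmapped(Collections.singletonList("grp"));
        System.out.println(ok.completedKeys.get("grp") + " " + moved.unmappedKeys.size());
    }
}
```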
@rajinisivaram Thanks for your comments. I have replied to and/or addressed them.
rajinisivaram left a comment:
@dajac Thanks for the updates, LGTM
Failed tests are not related.
Commit: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly. (#11016)

This patch updates `AlterConsumerGroupOffsetsHandler` to handle partition errors correctly. The issue is that any partition error fails the entire future instead of being passed as an error for its corresponding partition.

Reviewers: Luke Chen <showuon@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Merged to trunk and 3.0.
While reviewing #10973, I have noticed that `AlterConsumerGroupOffsetsHandler` does not handle partition errors correctly. The issue is that any partition error fails the entire future instead of being passed as an error for its corresponding partition. `KafkaAdminClientTest#testOffsetCommitWithMultipleErrors` reproduces the bug. The regression was introduced by KIP-699.
Context: #10973 (comment).