AlterConsumerGroupOffsetsResult.java
@@ -21,8 +21,6 @@
 import java.util.stream.Collectors;

 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.KafkaFuture.BaseFunction;
-import org.apache.kafka.common.KafkaFuture.BiConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;

@@ -48,23 +46,19 @@ public class AlterConsumerGroupOffsetsResult {
 public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
     final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

-    this.future.whenComplete(new BiConsumer<Map<TopicPartition, Errors>, Throwable>() {
-        @Override
-        public void accept(final Map<TopicPartition, Errors> topicPartitions, final Throwable throwable) {
-            if (throwable != null) {
-                result.completeExceptionally(throwable);
-            } else if (!topicPartitions.containsKey(partition)) {
-                result.completeExceptionally(new IllegalArgumentException(
-                    "Alter offset for partition \"" + partition + "\" was not attempted"));
-            } else {
-                final Errors error = topicPartitions.get(partition);
-                if (error == Errors.NONE) {
-                    result.complete(null);
-                } else {
-                    result.completeExceptionally(error.exception());
-                }
-            }
-        }
-    });
+    this.future.whenComplete((topicPartitions, throwable) -> {
Review thread:
Member: This is just a change to lambda expression, no content change, right?
Member Author: Right.

+        if (throwable != null) {
+            result.completeExceptionally(throwable);
+        } else if (!topicPartitions.containsKey(partition)) {
+            result.completeExceptionally(new IllegalArgumentException(
+                "Alter offset for partition \"" + partition + "\" was not attempted"));
+        } else {
+            final Errors error = topicPartitions.get(partition);
+            if (error == Errors.NONE) {
+                result.complete(null);
+            } else {
+                result.completeExceptionally(error.exception());
+            }
+        }
+    });
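As the thread above confirms, this hunk only swaps an anonymous BiConsumer for a lambda. A minimal standalone sketch of the equivalence, using java.util.concurrent.CompletableFuture rather than KafkaFuture (its whenComplete callback has the same (value, throwable) shape, so the point carries over):

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class LambdaEquivalenceSketch {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.completedFuture("ok");

        // Pre-change style: explicit anonymous class implementing BiConsumer.
        future.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String value, Throwable throwable) {
                System.out.println("anonymous: " + value);
            }
        });

        // Post-change style: lambda. Same callback, identical behavior.
        future.whenComplete((value, throwable) -> System.out.println("lambda: " + value));
    }
}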

@@ -75,22 +69,19 @@
  * Return a future which succeeds if all the alter offsets succeed.
  */
 public KafkaFuture<Void> all() {
-    return this.future.thenApply(new BaseFunction<Map<TopicPartition, Errors>, Void>() {
-        @Override
-        public Void apply(final Map<TopicPartition, Errors> topicPartitionErrorsMap) {
-            List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
-                .stream()
-                .filter(e -> e.getValue() != Errors.NONE)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toList());
-            for (Errors error : topicPartitionErrorsMap.values()) {
-                if (error != Errors.NONE) {
-                    throw error.exception(
-                        "Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
-                }
-            }
-            return null;
-        }
-    });
+    return this.future.thenApply(topicPartitionErrorsMap -> {
+        List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
Review thread:
Member: change to lambda expression?
Member Author: Right.

+            .stream()
+            .filter(e -> e.getValue() != Errors.NONE)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+        for (Errors error : topicPartitionErrorsMap.values()) {
+            if (error != Errors.NONE) {
+                throw error.exception(
+                    "Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
+            }
+        }
+        return null;
+    });
 }
}
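The all() future above fails with the first non-NONE error and names every failed partition in the message. A self-contained sketch of the same pattern, with CompletableFuture and plain strings standing in for KafkaFuture and Errors (all names here are illustrative):

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class AllOrNothingSketch {
    public static void main(String[] args) throws InterruptedException {
        // Per-partition outcome, as the handler would report it: "NONE" means success.
        Map<String, String> results = Map.of(
            "foo-0", "NONE",
            "foo-1", "UNKNOWN_TOPIC_OR_PARTITION");

        // Same shape as all(): collect every failed partition, then fail the
        // combined future with a message naming all of them.
        CompletableFuture<Void> all = CompletableFuture.completedFuture(results)
            .thenApply(map -> {
                List<String> partitionsFailed = map.entrySet().stream()
                    .filter(e -> !e.getValue().equals("NONE"))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
                if (!partitionsFailed.isEmpty()) {
                    throw new IllegalStateException(
                        "Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
                }
                return null;
            });

        try {
            all.get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage()); // names foo-1
        }
    }
}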
KafkaAdminClient.java
@@ -3642,9 +3642,11 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
 }

 @Override
-public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId,
-                                                                 Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                                 AlterConsumerGroupOffsetsOptions options) {
+public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(
+    String groupId,
+    Map<TopicPartition, OffsetAndMetadata> offsets,
+    AlterConsumerGroupOffsetsOptions options
+) {
     SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> future =
         AlterConsumerGroupOffsetsHandler.newFuture(groupId);
     AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext);
AlterConsumerGroupOffsetsHandler.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.clients.admin.internals;

+import static java.util.Collections.singleton;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

@@ -73,29 +75,38 @@ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicParti
     return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
 }

-@Override
-public OffsetCommitRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
-    List<OffsetCommitRequestTopic> topics = new ArrayList<>();
-    Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
-    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-        String topic = entry.getKey().topic();
-        OffsetAndMetadata oam = entry.getValue();
-        OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
-            .setCommittedOffset(oam.offset())
-            .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
-            .setCommittedMetadata(oam.metadata())
-            .setPartitionIndex(entry.getKey().partition());
-        offsetData.computeIfAbsent(topic, key -> new ArrayList<>()).add(partition);
-    }
-    for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) {
-        OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
-            .setName(entry.getKey())
-            .setPartitions(entry.getValue());
-        topics.add(topic);
-    }
+private void validateKeys(Set<CoordinatorKey> groupIds) {
+    if (!groupIds.equals(singleton(groupId))) {
+        throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
+            " (expected only " + singleton(groupId) + ")");
+    }
+}
+
+@Override
+public OffsetCommitRequest.Builder buildRequest(
+    int coordinatorId,
+    Set<CoordinatorKey> groupIds
+) {
+    validateKeys(groupIds);
+
+    Map<String, OffsetCommitRequestTopic> offsetData = new HashMap<>();
+    offsets.forEach((topicPartition, offsetAndMetadata) -> {
+        OffsetCommitRequestTopic topic = offsetData.computeIfAbsent(
+            topicPartition.topic(),
+            key -> new OffsetCommitRequestTopic().setName(topicPartition.topic())
+        );
+
+        topic.partitions().add(new OffsetCommitRequestPartition()
+            .setCommittedOffset(offsetAndMetadata.offset())
+            .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1))
+            .setCommittedMetadata(offsetAndMetadata.metadata())
+            .setPartitionIndex(topicPartition.partition()));
+    });
+
     OffsetCommitRequestData data = new OffsetCommitRequestData()
         .setGroupId(groupId.idValue)
-        .setTopics(topics);
+        .setTopics(new ArrayList<>(offsetData.values()));

     return new OffsetCommitRequest.Builder(data);
 }
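The rewritten buildRequest collapses the old two-pass grouping into a single pass: computeIfAbsent creates one request topic per topic name on first sight, and partitions are appended to that topic's mutable partitions() list in place. The grouping idiom in isolation, with plain strings standing in for the Kafka request types (names here are illustrative):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByTopicSketch {
    public static void main(String[] args) {
        // (topic, partition) pairs standing in for the TopicPartition keys of `offsets`.
        List<String[]> topicPartitions = List.of(
            new String[]{"foo", "0"},
            new String[]{"foo", "1"},
            new String[]{"bar", "0"});

        // computeIfAbsent creates the per-topic bucket the first time a topic is
        // seen, so one pass suffices, mirroring how buildRequest now creates a
        // single OffsetCommitRequestTopic per topic name.
        Map<String, List<String>> byTopic = new HashMap<>();
        for (String[] tp : topicPartitions) {
            byTopic.computeIfAbsent(tp[0], key -> new ArrayList<>()).add(tp[1]);
        }

        System.out.println(byTopic); // e.g. {bar=[0], foo=[0, 1]}
    }
}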

@@ -105,53 +116,88 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
     Set<CoordinatorKey> groupIds,
     AbstractResponse abstractResponse
 ) {
+    validateKeys(groupIds);
+
     final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-    Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-    Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-    List<CoordinatorKey> unmapped = new ArrayList<>();
-
-    Map<TopicPartition, Errors> partitions = new HashMap<>();
+    final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+    final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+    final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+
     for (OffsetCommitResponseTopic topic : response.data().topics()) {
         for (OffsetCommitResponsePartition partition : topic.partitions()) {
-            TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+            TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
             Errors error = Errors.forCode(partition.errorCode());

             if (error != Errors.NONE) {
-                handleError(groupId, error, failed, unmapped);
+                handleError(
+                    groupId,
+                    topicPartition,
+                    error,
+                    partitionResults,
+                    groupsToUnmap,
+                    groupsToRetry
+                );
             } else {
-                partitions.put(tp, error);
+                partitionResults.put(topicPartition, error);
             }
         }
     }
-    if (failed.isEmpty() && unmapped.isEmpty())
-        completed.put(groupId, partitions);

-    return new ApiResult<>(completed, failed, unmapped);
+    if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+        return ApiResult.completed(groupId, partitionResults);
+    } else {
+        return ApiResult.unmapped(new ArrayList<>(groupsToUnmap));
+    }
 }

 private void handleError(
     CoordinatorKey groupId,
+    TopicPartition topicPartition,
     Errors error,
-    Map<CoordinatorKey, Throwable> failed,
-    List<CoordinatorKey> unmapped
+    Map<TopicPartition, Errors> partitionResults,
+    Set<CoordinatorKey> groupsToUnmap,
+    Set<CoordinatorKey> groupsToRetry
 ) {
     switch (error) {
-        case GROUP_AUTHORIZATION_FAILED:
-            log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
-                error.exception());
-            failed.put(groupId, error.exception());
-            break;
+        // If the coordinator is in the middle of loading, then we just need to retry.
+        case COORDINATOR_LOAD_IN_PROGRESS:
+            log.debug("OffsetCommit request for group id {} failed because the coordinator" +
+                " is still in the process of loading state. Will retry.", groupId.idValue);
+            groupsToRetry.add(groupId);
+            break;
+
+        // If the coordinator is not available, then we unmap and retry.
         case COORDINATOR_NOT_AVAILABLE:
         case NOT_COORDINATOR:
-            log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error);
-            unmapped.add(groupId);
+            log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+                groupId.idValue, error);
+            groupsToUnmap.add(groupId);
             break;
+
+        // Group level errors.
+        case INVALID_GROUP_ID:
+        case REBALANCE_IN_PROGRESS:
+        case INVALID_COMMIT_OFFSET_SIZE:
Review thread:
Contributor: Is this a group-level error?
Member Author: I think it is. It basically indicates that we could not write the group metadata to the log, so it concerns the group. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L448

+        case GROUP_AUTHORIZATION_FAILED:
+            log.debug("OffsetCommit request for group id {} failed due to error {}.",
+                groupId.idValue, error);
+            partitionResults.put(topicPartition, error);
Review thread:
Contributor: Should REBALANCE_IN_PROGRESS and GROUP_AUTHORIZATION_FAILED be added to groupsToRetry?
Member Author: Yeah, that's a good question. Prior to KIP-599, we considered them as non-retryable errors, so I stuck to that here. I think it might be a good idea to consider them retryable, but we should do it consistently for all the group handlers. How about filing a Jira for this and tackling it separately?
Contributor: Yes, we can open a JIRA to do it later.
+            break;
+
+        // TopicPartition level errors.
+        case UNKNOWN_TOPIC_OR_PARTITION:
+        case OFFSET_METADATA_TOO_LARGE:
+        case TOPIC_AUTHORIZATION_FAILED:
+            log.debug("OffsetCommit request for group id {} and partition {} failed due" +
+                " to error {}.", groupId.idValue, topicPartition, error);
+            partitionResults.put(topicPartition, error);
+            break;
+
+        // Unexpected errors.
         default:
-            log.error("Received unexpected error for group {} in `OffsetCommit` response",
-                groupId, error.exception());
-            failed.put(groupId, error.exception(
-                "Received unexpected error for group " + groupId + " in `OffsetCommit` response"));
+            log.error("OffsetCommit request for group id {} and partition {} failed due" +
+                " to unexpected error {}.", groupId.idValue, topicPartition, error);
+            partitionResults.put(topicPartition, error);
     }
 }
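The switch above sorts every error into one of three dispositions: retry against the same coordinator, rediscover the coordinator, or record the error against the partition and surface it to the caller. A condensed sketch of that decision; the enum and method names below are illustrative only, not Kafka API:

public class ErrorDispositionSketch {
    enum Disposition { RETRY_SAME_COORDINATOR, FIND_NEW_COORDINATOR, REPORT_TO_CALLER }

    static Disposition classify(String error) {
        switch (error) {
            // Coordinator is still loading state: retry against the same node.
            case "COORDINATOR_LOAD_IN_PROGRESS":
                return Disposition.RETRY_SAME_COORDINATOR;
            // Coordinator moved or is gone: unmap, rediscover, then retry.
            case "COORDINATOR_NOT_AVAILABLE":
            case "NOT_COORDINATOR":
                return Disposition.FIND_NEW_COORDINATOR;
            // Everything else (group-level, partition-level, unexpected) is
            // recorded per partition and reported to the caller.
            default:
                return Disposition.REPORT_TO_CALLER;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("NOT_COORDINATOR"));            // FIND_NEW_COORDINATOR
        System.out.println(classify("UNKNOWN_TOPIC_OR_PARTITION")); // REPORT_TO_CALLER
    }
}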
KafkaAdminClientTest.java
@@ -2543,6 +2543,39 @@ public void testOffsetCommitNumRetries() throws Exception {
     }
 }

+@Test
+public void testOffsetCommitWithMultipleErrors() throws Exception {
+    final Cluster cluster = mockCluster(3, 0);
+    final Time time = new MockTime();
+
+    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+            AdminClientConfig.RETRIES_CONFIG, "0")) {
+        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+        final TopicPartition foo0 = new TopicPartition("foo", 0);
+        final TopicPartition foo1 = new TopicPartition("foo", 1);
+
+        env.kafkaClient().prepareResponse(
+            prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+        Map<TopicPartition, Errors> responseData = new HashMap<>();
+        responseData.put(foo0, Errors.NONE);
+        responseData.put(foo1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, responseData));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(foo0, new OffsetAndMetadata(123L));
+        offsets.put(foo1, new OffsetAndMetadata(456L));
+        final AlterConsumerGroupOffsetsResult result = env.adminClient()
+            .alterConsumerGroupOffsets(GROUP_ID, offsets);
+
+        assertNull(result.partitionResult(foo0).get());
+        TestUtils.assertFutureError(result.partitionResult(foo1), UnknownTopicOrPartitionException.class);
+
+        TestUtils.assertFutureError(result.all(), UnknownTopicOrPartitionException.class);
+    }
+}
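The new test pins down mixed per-partition outcomes: foo-0 succeeds while foo-1 fails, and all() fails as a whole. From a caller's perspective this is why partitionResult matters, not just all(). A hedged usage sketch against the public Admin API; it assumes an already-configured Admin client, and the group and topic names are made up:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AlterOffsetsUsageSketch {
    // `admin` is assumed to be an already-constructed Admin client.
    static void resetOffsets(Admin admin) throws InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(123L));
        offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(456L));

        AlterConsumerGroupOffsetsResult result =
            admin.alterConsumerGroupOffsets("my-group", offsets);

        for (TopicPartition tp : offsets.keySet()) {
            try {
                result.partitionResult(tp).get(); // completes with null on success
                System.out.println(tp + ": committed");
            } catch (ExecutionException e) {
                // e.g. UnknownTopicOrPartitionException for a partition that failed
                System.out.println(tp + ": failed with " + e.getCause());
            }
        }
    }
}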

 @Test
 public void testOffsetCommitRetryBackoff() throws Exception {
     MockTime time = new MockTime();
@@ -4109,9 +4142,6 @@ public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception {
     env.kafkaClient().prepareResponse(
         prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS));

-    env.kafkaClient().prepareResponse(
-        prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-
     env.kafkaClient().prepareResponse(
         prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR));