Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,24 @@ public boolean requireStable() {
return data.requireStable();
}

/**
 * Returns the request's groups in the group-based representation introduced in
 * version 8 (KIP-709).
 *
 * For versions before 8, the request addresses exactly one group at the top
 * level, so a single synthetic group is built from the top-level group id and
 * topics.
 *
 * @return the list of requested groups; a singleton list for versions < 8.
 */
public List<OffsetFetchRequestData.OffsetFetchRequestGroup> groups() {
    if (version() < 8) {
        // Pre-v8 requests carry one implicit group; wrap it in the v8+ shape.
        OffsetFetchRequestData.OffsetFetchRequestGroup syntheticGroup =
            new OffsetFetchRequestData.OffsetFetchRequestGroup()
                .setGroupId(data.groupId());

        data.topics().forEach(requestTopic -> {
            OffsetFetchRequestTopics convertedTopic = new OffsetFetchRequestTopics();
            convertedTopic.setName(requestTopic.name());
            convertedTopic.setPartitionIndexes(requestTopic.partitionIndexes());
            syntheticGroup.topics().add(convertedTopic);
        });

        return Collections.singletonList(syntheticGroup);
    }
    return data.groups();
}

public Map<String, List<TopicPartition>> groupIdsToPartitions() {
Map<String, List<TopicPartition>> groupIdsToPartitions = new HashMap<>();
for (OffsetFetchRequestGroup group : data.groups()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map.Entry;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow-up, I would like to clean up this class. There are way too many ways to construct this object and the logic is pretty complicated. It is the same for the request. Let's do this separately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like a good plan. Thanks!

import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
Expand Down Expand Up @@ -119,12 +120,6 @@ public int hashCode() {
}
}

/**
 * Constructs a response that wraps the given response data directly.
 * No top-level error is pre-computed; {@code error} is left null.
 *
 * @param data the already-populated response data to wrap
 */
public OffsetFetchResponse(OffsetFetchResponseData data) {
    super(ApiKeys.OFFSET_FETCH);
    this.data = data;
    this.error = null;
}

/**
* Constructor without throttle time.
* @param error Potential coordinator or group level error code (for api version 2 and later)
Expand Down Expand Up @@ -208,6 +203,59 @@ public OffsetFetchResponse(int throttleTimeMs,
this.error = null;
}

/**
 * Constructs a response from per-group results, down-converting to the
 * pre-version-8 wire format when necessary.
 *
 * For version >= 8 the groups are used as-is and the group-level errors are
 * indexed by group id. For older versions exactly one group is allowed: its
 * error code becomes the top-level error and its topics/partitions are copied
 * into the top-level fields.
 *
 * @param groups  the per-group results; must contain exactly one group for
 *                versions before 8
 * @param version the request version the response must be serialized with
 * @throws UnsupportedVersionException if {@code version < 8} and more than one
 *                                     group is supplied
 */
public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) {
    super(ApiKeys.OFFSET_FETCH);
    data = new OffsetFetchResponseData();

    if (version >= 8) {
        data.setGroups(groups);
        error = null;

        for (OffsetFetchResponseGroup group : data.groups()) {
            this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
        }
    } else {
        if (groups.size() != 1) {
            throw new UnsupportedVersionException(
                "Version " + version + " of OffsetFetchResponse only supports one group."
            );
        }

        OffsetFetchResponseGroup group = groups.get(0);
        data.setErrorCode(group.errorCode());
        error = Errors.forCode(group.errorCode());

        group.topics().forEach(topic -> {
            OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name());
            data.topics().add(newTopic);

            topic.partitions().forEach(partition -> {
                OffsetFetchResponsePartition newPartition;

                if (version < 2 && group.errorCode() != Errors.NONE.code()) {
                    // Versions prior to version 2 do not support a top level error. Therefore,
                    // we put it at the partition level.
                    newPartition = new OffsetFetchResponsePartition()
                        .setPartitionIndex(partition.partitionIndex())
                        .setErrorCode(group.errorCode())
                        .setCommittedOffset(INVALID_OFFSET)
                        .setMetadata(NO_METADATA)
                        .setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
                } else {
                    // Partition-level errors are still possible on version >= 2
                    // (e.g. UNSTABLE_OFFSET_COMMIT), and in that case the
                    // offset/metadata fields are already correctly set, so the
                    // partition is copied as-is, including its error code.
                    newPartition = new OffsetFetchResponsePartition()
                        .setPartitionIndex(partition.partitionIndex())
                        .setErrorCode(partition.errorCode())
                        .setCommittedOffset(partition.committedOffset())
                        .setMetadata(partition.metadata())
                        .setCommittedLeaderEpoch(partition.committedLeaderEpoch());
                }

                newTopic.partitions().add(newPartition);
            });
        });
    }
}

public OffsetFetchResponse(OffsetFetchResponseData data, short version) {
super(ApiKeys.OFFSET_FETCH);
this.data = data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
]},
{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true,
"about": "The top-level error code, or 0 if there was no error." },
{"name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
{ "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
"about": "The responses per group id.", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package kafka.coordinator.group

import kafka.server.RequestLocal
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.utils.BufferSupplier

import java.util
import java.util.concurrent.CompletableFuture
import scala.collection.immutable
import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -234,4 +236,80 @@ class GroupCoordinatorAdapter(
}
CompletableFuture.completedFuture(results)
}

/**
 * Fetches the committed offsets for every partition tracked for the group.
 * Delegates to [[handleFetchOffset]] with no partition filter.
 */
override def fetchAllOffsets(
  context: RequestContext,
  groupId: String,
  requireStable: Boolean
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] =
  handleFetchOffset(groupId, requireStable, partitions = None)

/**
 * Fetches the committed offsets for the requested topic partitions of the
 * group. The requested topics are flattened into individual
 * [[TopicPartition]] instances before delegating to [[handleFetchOffset]].
 */
override def fetchOffsets(
  context: RequestContext,
  groupId: String,
  topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
  requireStable: Boolean
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
  // One TopicPartition per (topic name, partition index) pair in the request.
  val requestedPartitions = topics.asScala.toSeq.flatMap { topic =>
    topic.partitionIndexes.asScala.map(index => new TopicPartition(topic.name, index))
  }

  handleFetchOffset(groupId, requireStable, Some(requestedPartitions))
}

/**
 * Invokes the group coordinator's offset fetch and converts the result into
 * the response-topics representation.
 *
 * @param groupId       the group to fetch offsets for
 * @param requireStable whether only stable (non-pending) offsets may be returned
 * @param partitions    the partitions to fetch, or None to fetch all
 * @return a future completed with the per-topic offsets, or completed
 *         exceptionally when the coordinator reports an error
 */
private def handleFetchOffset(
  groupId: String,
  requireStable: Boolean,
  partitions: Option[Seq[TopicPartition]]
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
  val (error, partitionOffsets) = coordinator.handleFetchOffsets(
    groupId,
    requireStable,
    partitions
  )

  val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
  if (error != Errors.NONE) {
    future.completeExceptionally(error.exception)
  } else {
    // Group the flat (TopicPartition -> offset) results by topic name while
    // keeping first-seen topic order in the response list.
    val responseTopics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]()
    val topicsByName = new mutable.HashMap[String, OffsetFetchResponseData.OffsetFetchResponseTopics]()

    partitionOffsets.forKeyValue { (tp, offset) =>
      val responseTopic = topicsByName.getOrElseUpdate(tp.topic, {
        val created = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(tp.topic)
        responseTopics.add(created)
        created
      })

      responseTopic.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
        .setPartitionIndex(tp.partition)
        .setMetadata(offset.metadata)
        .setCommittedOffset(offset.offset)
        .setCommittedLeaderEpoch(offset.leaderEpoch.orElse(-1))
        .setErrorCode(offset.error.code))
    }

    future.complete(responseTopics)
  }

  future
}
}
Loading