@@ -118,6 +118,11 @@ public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
.setThrottleTimeMs(throttleTimeMs));
}

@Override
public OffsetCommitResponse getErrorResponse(Throwable e) {
return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
}

public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version);
}
@@ -27,7 +27,9 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Possible error codes:
@@ -116,4 +118,82 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
public boolean shouldClientThrottle(short version) {
return version >= 4;
}

public static class Builder {
OffsetCommitResponseData data = new OffsetCommitResponseData();
HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();

private OffsetCommitResponseTopic getOrCreateTopic(
[Review comment — Contributor]
nit: getOrAddTopic makes more sense to me

[Reply — Member (Author)]
That could work as well, but I personally prefer getOrCreate in this case. getOrCreate is used pretty extensively in the code base as well.

String topicName
) {
OffsetCommitResponseTopic topic = byTopicName.get(topicName);
if (topic == null) {
topic = new OffsetCommitResponseTopic().setName(topicName);
data.topics().add(topic);
byTopicName.put(topicName, topic);
}
return topic;
}
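As a side note, the get-or-create pattern above can also be collapsed into a single call with Map.computeIfAbsent. A minimal sketch with plain JDK types, not the real response classes (the Topic record and all names here are hypothetical stand-ins):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GetOrCreateDemo {
    // Hypothetical stand-in for OffsetCommitResponseTopic.
    record Topic(String name, List<Integer> partitions) {}

    static final Map<String, Topic> byTopicName = new HashMap<>();
    static final List<Topic> topics = new ArrayList<>();

    static Topic getOrCreateTopic(String name) {
        // computeIfAbsent runs the factory only when the key is missing,
        // so each topic is created and registered exactly once.
        return byTopicName.computeIfAbsent(name, n -> {
            Topic topic = new Topic(n, new ArrayList<>());
            topics.add(topic);
            return topic;
        });
    }

    public static void main(String[] args) {
        Topic a1 = getOrCreateTopic("topic-a");
        getOrCreateTopic("topic-b");
        Topic a2 = getOrCreateTopic("topic-a");
        // Two distinct topics were created; repeated lookups return the same instance.
        System.out.println(topics.size() + " " + (a1 == a2));
    }
}
```

Whether the explicit null-check form in the patch or computeIfAbsent reads better is a style call; both register the topic in the backing list exactly once.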

public Builder addPartition(
String topicName,
int partitionIndex,
Errors error
) {
final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);

topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex)
.setErrorCode(error.code()));

return this;
}

public <P> Builder addPartitions(
String topicName,
List<P> partitions,
Function<P, Integer> partitionIndex,
Errors error
) {
final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);

partitions.forEach(partition -> {
topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex.apply(partition))
.setErrorCode(error.code()));
});

return this;
}

public Builder merge(
OffsetCommitResponseData newData
) {
if (data.topics().isEmpty()) {
// If the current data is empty, we can discard it and use the new data.
data = newData;
} else {
// Otherwise, we have to merge them together.
newData.topics().forEach(newTopic -> {
OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
if (existingTopic == null) {
// If no topic exists, we can directly copy the new topic data.
data.topics().add(newTopic);
byTopicName.put(newTopic.name(), newTopic);
} else {
// Otherwise, we add the partitions to the existing one. Note we
// expect non-overlapping partitions here as we don't verify
// if the partition is already in the list before adding it.
existingTopic.partitions().addAll(newTopic.partitions());
[Review comment — Contributor]
Q: From the code it seems that existingTopic can only include partitions that failed in some way. We are assuming that there will be no overlap between existing partitions and newTopic partitions. Should we add a check?

[Reply — Member (Author)]
That's right. I thought about adding a check, but it is costly because the only way to check is to iterate over the existing partitions to see whether the new one is there. Given that partitions are not supposed to be duplicated by the user of this class, I thought it was not necessary. What do you think?

[Reply — Member]
I think it is ok to keep as is, but maybe add a comment that we assume there are no overlapping partitions?

As a side note, if there was overlap, we would just have two of the same partition in the response, right? One with the error and one without?

[Reply — Contributor]
That makes sense. @jolshan, that seems to be the case, though since a failed partition is added first, the non-error state may overwrite the error when the consumer parses the response.

Also a +1 on leaving a small note that the code assumes no overlap.

[Reply — Member (Author)]
Updated the comment.

}
});
}

return this;
}

public OffsetCommitResponse build() {
return new OffsetCommitResponse(data);
}
}
}
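To make the assumption discussed in the review thread concrete, here is a toy model of merge's append behavior using plain JDK lists rather than the real response classes (the values are illustrative only): addAll keeps both copies, so an overlapping partition would appear twice in the serialized response.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeOverlapDemo {
    public static void main(String[] args) {
        // Simplified model of OffsetCommitResponse.Builder#merge: partitions
        // are appended with addAll and never de-duplicated, so the caller
        // must guarantee the two sides are disjoint.
        List<Integer> existingPartitions = new ArrayList<>(List.of(0, 1)); // e.g. failed partitions, added first
        List<Integer> mergedPartitions = List.of(1, 2);                    // overlaps on partition 1

        existingPartitions.addAll(mergedPartitions);
        System.out.println(existingPartitions);
    }
}
```

This is why the in-code comment states that callers must not pass overlapping partitions: the builder trades a per-partition membership scan for the guarantee being upheld upstream.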
@@ -179,7 +179,7 @@ public KafkaApis build() {
metadataSupport,
replicaManager,
groupCoordinator,
-new GroupCoordinatorAdapter(groupCoordinator),
+new GroupCoordinatorAdapter(groupCoordinator, time),
txnCoordinator,
autoTopicCreationManager,
brokerId,
@@ -16,15 +16,18 @@
*/
package kafka.coordinator.group

import kafka.common.OffsetAndMetadata
import kafka.server.RequestLocal
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
+import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.RequestContext
-import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
+import org.apache.kafka.common.utils.{BufferSupplier, Time}

import java.util
import java.util.Optional
import java.util.concurrent.CompletableFuture
import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._
@@ -34,7 +37,8 @@ import scala.jdk.CollectionConverters._
* that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
*/
class GroupCoordinatorAdapter(
-  val coordinator: GroupCoordinator
+  private val coordinator: GroupCoordinator,
+  private val time: Time
) extends org.apache.kafka.coordinator.group.GroupCoordinator {

override def joinGroup(
@@ -312,4 +316,79 @@

future
}

override def commitOffsets(
context: RequestContext,
[Review comment — Contributor]
I noticed this isn't used here, as well as for the other coordinator APIs (other than joinGroup). What's the reason for having this parameter? Are we expecting to use it in the new group coordinator?

[Reply — Member (Author)]
I have put it everywhere for consistency. We may need it for other methods in the new group coordinator.

request: OffsetCommitRequestData,
bufferSupplier: BufferSupplier
): CompletableFuture[OffsetCommitResponseData] = {
val currentTimeMs = time.milliseconds
val future = new CompletableFuture[OffsetCommitResponseData]()

def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
val response = new OffsetCommitResponseData()
val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()

commitStatus.forKeyValue { (tp, error) =>
val topic = byTopics.get(tp.topic) match {
case Some(existingTopic) =>
existingTopic
case None =>
val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
byTopics += tp.topic -> newTopic
response.topics.add(newTopic)
newTopic
}

topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(tp.partition)
.setErrorCode(error.code))
}

future.complete(response)
}

// "default" expiration timestamp is defined as now + retention. The retention may be overridden
// in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit
// commit timestamp is provided (v1 only), the expiration timestamp is computed based on that.
val expireTimeMs = request.retentionTimeMs match {
case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
}

val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
request.topics.forEach { topic =>
topic.partitions.forEach { partition =>
val tp = new TopicPartition(topic.name, partition.partitionIndex)
partitions += tp -> new OffsetAndMetadata(
offset = partition.committedOffset,
leaderEpoch = partition.committedLeaderEpoch match {
case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch)
},
metadata = partition.committedMetadata match {
case null => OffsetAndMetadata.NoMetadata
case metadata => metadata
},
commitTimestamp = partition.commitTimestamp match {
case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
case customTimestamp => customTimestamp
},
[Review comment on lines +373 to +376 — Contributor]
I wasn't able to find where we validate the commit timestamp. How do we handle timestamps that are less than -1? I am also curious about retention time ms.

[Reply — Member (Author)]
It seems that they are not validated anywhere; we basically store whatever we get. As a result, if the provided retention or commit timestamp is negative, the offset will be expired immediately. This is in line with the behavior prior to this patch. We could improve it (if we want) separately.

expireTimestamp = expireTimeMs
)
}
}

coordinator.handleCommitOffsets(
request.groupId,
request.memberId,
Option(request.groupInstanceId),
request.generationId,
partitions.toMap,
callback,
RequestLocal(bufferSupplier)
)

future
}
}
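The retention-to-expiration mapping in commitOffsets can be sketched in isolation. This is a simplified model, not the adapter code itself: the -1 sentinel mirrors OffsetCommitRequest.DEFAULT_RETENTION_TIME, the method name is hypothetical, and an empty result stands in for "use the broker-configured retention".

```java
import java.util.OptionalLong;

public class ExpireTimeDemo {
    // Sentinel meaning "no client-supplied retention"; mirrors
    // OffsetCommitRequest.DEFAULT_RETENTION_TIME (an assumption of this sketch).
    static final long DEFAULT_RETENTION_TIME = -1L;

    // Empty result: the broker-side retention config applies.
    // Otherwise: the offset expires at commit time plus the requested retention.
    static OptionalLong expireTimeMs(long retentionTimeMs, long currentTimeMs) {
        return retentionTimeMs == DEFAULT_RETENTION_TIME
            ? OptionalLong.empty()
            : OptionalLong.of(currentTimeMs + retentionTimeMs);
    }

    public static void main(String[] args) {
        System.out.println(expireTimeMs(-1L, 1_000L));
        System.out.println(expireTimeMs(500L, 1_000L));
    }
}
```

Note how this model also shows the behavior raised in the review thread above: a negative (non-sentinel) retention yields an expiration timestamp in the past, i.e. the offset is expired immediately.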
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
@@ -410,7 +410,7 @@ class BrokerServer(
metadataSupport = raftSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
-newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
+newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.nodeId,