From 065df08a187ddce4924d990ae857539c25a0783f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 21 Nov 2022 11:01:22 +0100 Subject: [PATCH 1/9] KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface --- .../common/requests/OffsetCommitRequest.java | 5 + .../server/builders/KafkaApisBuilder.java | 2 +- .../group/GroupCoordinatorAdapter.scala | 91 +++++- .../scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 271 +++++++++++------- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../group/GroupCoordinatorAdapterTest.scala | 92 +++++- .../unit/kafka/server/KafkaApisTest.scala | 233 +++++++++++++++ .../coordinator/group/GroupCoordinator.java | 18 ++ 9 files changed, 597 insertions(+), 119 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 9869da5d254b9..8fc884e661430 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -118,6 +118,11 @@ public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setThrottleTimeMs(throttleTimeMs)); } + @Override + public OffsetCommitResponse getErrorResponse(Throwable e) { + return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e); + } + public static OffsetCommitRequest parse(ByteBuffer buffer, short version) { return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version); } diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 95c18a6f3e3f4..18cd42c77cba1 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -179,7 +179,7 @@ public KafkaApis 
build() { metadataSupport, replicaManager, groupCoordinator, - new GroupCoordinatorAdapter(groupCoordinator), + new GroupCoordinatorAdapter(groupCoordinator, time), txnCoordinator, autoTopicCreationManager, brokerId, diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 4e96e4373a73a..156820e14fbbc 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -16,15 +16,20 @@ */ package kafka.coordinator.group +import kafka.common.OffsetAndMetadata import kafka.server.RequestLocal import kafka.utils.Implicits.MapExtensionMethods -import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData} -import org.apache.kafka.common.requests.RequestContext -import org.apache.kafka.common.utils.BufferSupplier +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData, SyncGroupResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext} +import org.apache.kafka.common.utils.{BufferSupplier, Time} import java.util +import java.util.Optional import java.util.concurrent.CompletableFuture -import scala.collection.immutable +import 
scala.collection.{immutable, mutable} import scala.jdk.CollectionConverters._ /** @@ -32,7 +37,8 @@ import scala.jdk.CollectionConverters._ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface. */ class GroupCoordinatorAdapter( - val coordinator: GroupCoordinator + private val coordinator: GroupCoordinator, + private val time: Time ) extends org.apache.kafka.coordinator.group.GroupCoordinator { override def joinGroup( @@ -234,4 +240,79 @@ class GroupCoordinatorAdapter( } CompletableFuture.completedFuture(results) } + + override def commitOffsets( + context: RequestContext, + request: OffsetCommitRequestData, + bufferSupplier: BufferSupplier + ): CompletableFuture[OffsetCommitResponseData] = { + val future = new CompletableFuture[OffsetCommitResponseData]() + + def callback(commitStatus: Map[TopicPartition, Errors]): Unit = { + val response = new OffsetCommitResponseData() + val byTopics = new util.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + + commitStatus.forKeyValue { (tp, error) => + var topic = byTopics.get(tp.topic) + if (topic == null) { + topic = new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName(tp.topic) + byTopics.put(tp.topic, topic) + response.topics.add(topic) + } + topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)) + } + + future.complete(response) + } + + // "default" expiration timestamp is now + retention (and retention may be overridden if v2) + // expire timestamp is computed differently for v1 and v2. + // - If v1 and no explicit commit timestamp is provided we treat it the same as v5. + // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that + // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5. 
+ // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect + val currentTimestamp = time.milliseconds + val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() + + request.topics.forEach { topic => + topic.partitions.forEach { partition => + val tp = new TopicPartition(topic.name, partition.partitionIndex) + partitions += tp -> new OffsetAndMetadata( + offset = partition.committedOffset, + leaderEpoch = partition.committedLeaderEpoch match { + case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] + case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) + }, + metadata = partition.committedMetadata match { + case null => OffsetAndMetadata.NoMetadata + case metadata => metadata + }, + commitTimestamp = partition.commitTimestamp match { + case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp + case customTimestamp => customTimestamp + }, + expireTimestamp = request.retentionTimeMs match { + case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None + case retentionTime => Some(currentTimestamp + retentionTime) + } + ) + } + } + + // call coordinator to handle commit offset + coordinator.handleCommitOffsets( + request.groupId, + request.memberId, + Option(request.groupInstanceId), + request.generationId, + partitions.toMap, + callback, + RequestLocal(bufferSupplier) + ) + + future + } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 2b7561c31f3c7..9ad0d0c60e471 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -406,7 +406,7 @@ class BrokerServer( metadataSupport = raftSupport, replicaManager = replicaManager, groupCoordinator = groupCoordinator, - newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator), + newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time), txnCoordinator = 
transactionCoordinator, autoTopicCreationManager = autoTopicCreationManager, brokerId = config.nodeId, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 776196455ad09..09320420af971 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -412,136 +412,203 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an offset commit request */ def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { - val header = request.header val offsetCommitRequest = request.body[OffsetCommitRequest] - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - // the callback for sending an offset commit response - def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) - combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { - debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") + def sendResponse(response: OffsetCommitResponse): Unit = { + trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " + + s"client ${request.header.clientId}.") + + if (isDebugEnabled) { + response.data.topics.forEach { topic => + topic.partitions.forEach { partition => + if (partition.errorCode != Errors.NONE.code) { + debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " + + s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}") + } } } - 
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) + } + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + response.maybeSetThrottleTimeMs(requestThrottleMs) + response + }) } - // reject the request if not authorized to the group + // Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( - offsetCommitRequest.data.topics, - error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( - new OffsetCommitResponseData() - .setTopics(responseTopicList) - .setThrottleTimeMs(requestThrottleMs) - )) + sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. 
- val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { - for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION + sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + } else { + val offsetCommitResponseData = new OffsetCommitResponseData() + val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + offsetCommitRequest.data.topics.asScala + )(_.name) + + def makePartitionResponse( + partitionIndex: Int, + error: Errors + ): OffsetCommitResponseData.OffsetCommitResponsePartition = { + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code) + } + + def addTopicToResponse( + topic: OffsetCommitRequestData.OffsetCommitRequestTopic, + error: Errors + ): Unit = { + val topicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name) + offsetCommitResponseData.topics.add(topicResponse) + topic.partitions.forEach { partition => + topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error)) } } - sendResponseCallback(errorMap.toMap) - } else { - val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] - val topics = offsetCommitRequest.data.topics.asScala - val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) - for (topicData <- topics) { - for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - if (!authorizedTopics.contains(topicData.name)) - 
unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED) - else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION) - else - authorizedTopicRequestInfoBldr += (topicPartition -> partitionData) + val authorizedTopicsRequest = new util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => + if (!authorizedTopics.contains(topic.name)) { + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. + addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) + } else if (!metadataCache.contains(topic.name)) { + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION) + } else { + // Otherwise, we check all partitions to ensure that they all exist. + var topicRequestWithValidPartitions: OffsetCommitRequestData.OffsetCommitRequestTopic = null + var topicResponseWithInvalidPartitions: OffsetCommitResponseData.OffsetCommitResponseTopic = null + + topic.partitions.forEach { partition => + if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) { + // If the partition exists, we copy it into the valid set. + if (topicRequestWithValidPartitions == null) { + topicRequestWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name) + } + topicRequestWithValidPartitions.partitions.add(partition) + } else { + // Otherwise, we add it to the response with UNKNOWN_TOPIC_OR_PARTITION. 
+ if (topicResponseWithInvalidPartitions == null) { + topicResponseWithInvalidPartitions = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name) + } + topicResponseWithInvalidPartitions.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)) + } + } + + // Known partitions are added to the authorized set. + if (topicRequestWithValidPartitions != null) { + authorizedTopicsRequest.add(topicRequestWithValidPartitions) + } + + // Unknown partitions are added to the response. + if (topicResponseWithInvalidPartitions != null) { + offsetCommitResponseData.topics.add(topicResponseWithInvalidPartitions) + // We keep track of topics with both known and unknown partitions such + // that we can merge the response from the coordinator into it later on. + if (topicRequestWithValidPartitions != null) { + topicsPendingPartitions += topic.name -> topicResponseWithInvalidPartitions + } + } } } - val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result() + if (authorizedTopicsRequest.isEmpty) { + sendResponse(new OffsetCommitResponse(offsetCommitResponseData)) + } else if (request.header.apiVersion == 0) { + // For version 0, always store offsets to ZK. + val zkSupport = metadataSupport.requireZkOrThrow( + KafkaApis.unsupported("Version 0 offset commit requests")) + + authorizedTopicsRequest.forEach { topic => + val topicResponse = topicsPendingPartitions.get(topic.name) match { + case None => + // If there is no pending topic response, we create + // a new one and add it to the response. + val newTopicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name) + offsetCommitResponseData.topics.add(newTopicResponse) + newTopicResponse + + case Some(existingTopicResponse) => + // Otherwise, we use the existing one. 
+ existingTopicResponse + } - if (authorizedTopicRequestInfo.isEmpty) - sendResponseCallback(Map.empty) - else if (header.apiVersion == 0) { - // for version 0 always store offsets to ZK - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit requests")) - val responseInfo = authorizedTopicRequestInfo.map { - case (topicPartition, partitionData) => + topic.partitions.forEach { partition => try { - if (partitionData.committedMetadata() != null - && partitionData.committedMetadata().length > config.offsetMetadataMaxSize) - (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE) - else { + if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) { + topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.OFFSET_METADATA_TOO_LARGE)) + } else { zkSupport.zkClient.setOrCreateConsumerOffset( offsetCommitRequest.data.groupId, - topicPartition, - partitionData.committedOffset) - (topicPartition, Errors.NONE) + new TopicPartition(topic.name, partition.partitionIndex), + partition.committedOffset + ) + topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.NONE)) } } catch { - case e: Throwable => (topicPartition, Errors.forException(e)) + case e: Throwable => + topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.forException(e))) } + } } - sendResponseCallback(responseInfo) - } else { - // for version 1 and beyond store offsets in offset manager - - // "default" expiration timestamp is now + retention (and retention may be overridden if v2) - // expire timestamp is computed differently for v1 and v2. - // - If v1 and no explicit commit timestamp is provided we treat it the same as v5. - // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that - // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5. 
- // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect - val currentTimestamp = time.milliseconds - val partitionData = authorizedTopicRequestInfo.map { case (k, partitionData) => - val metadata = if (partitionData.committedMetadata == null) - OffsetAndMetadata.NoMetadata - else - partitionData.committedMetadata - val leaderEpochOpt = if (partitionData.committedLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) - Optional.empty[Integer] - else - Optional.of[Integer](partitionData.committedLeaderEpoch) - - k -> new OffsetAndMetadata( - offset = partitionData.committedOffset, - leaderEpoch = leaderEpochOpt, - metadata = metadata, - commitTimestamp = partitionData.commitTimestamp match { - case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp - case customTimestamp => customTimestamp - }, - expireTimestamp = offsetCommitRequest.data.retentionTimeMs match { - case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None - case retentionTime => Some(currentTimestamp + retentionTime) + sendResponse(new OffsetCommitResponse(offsetCommitResponseData)) + } else { + // Create a new request data with the authorized topic-partitions. + val offsetCommitRequestData = new OffsetCommitRequestData() + .setGroupId(offsetCommitRequest.data.groupId) + .setMemberId(offsetCommitRequest.data.memberId) + .setGenerationId(offsetCommitRequest.data.generationId) + .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) + .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) + .setTopics(authorizedTopicsRequest) + + newGroupCoordinator.commitOffsets( + request.context, + offsetCommitRequestData, + requestLocal.bufferSupplier + ).handle[Unit] { (results, exception) => + if (exception != null) { + sendResponse(offsetCommitRequest.getErrorResponse(exception)) + } else { + if (offsetCommitResponseData.topics.isEmpty) { + // If the pre-processed response is empty, we can directly + // send back the new one. 
+ sendResponse(new OffsetCommitResponse(results)) + } else { + // Otherwise, we have to add the topic-partitions from new + // one into the pre-processed one. + results.topics.forEach { topic => + topicsPendingPartitions.get(topic.name) match { + case None => + // If there is no pending topic response, we just + // add the one we've got to the response. + offsetCommitResponseData.topics.add(topic) + + case Some(existingTopicResponse) => + // Otherwise, we copy partitions to the pending + // topic response. + existingTopicResponse.partitions.addAll(topic.partitions) + } + } + sendResponse(new OffsetCommitResponse(offsetCommitResponseData)) } - ) + } + }.exceptionally { exception => + error(s"Unexpected error handling request ${request.requestDesc(true)} " + + s"with context ${request.context}", exception) + requestHelper.handleError(request, exception) } - - // call coordinator to handle commit offset - groupCoordinator.handleCommitOffsets( - offsetCommitRequest.data.groupId, - offsetCommitRequest.data.memberId, - Option(offsetCommitRequest.data.groupInstanceId), - offsetCommitRequest.data.generationId, - partitionData, - sendResponseCallback, - requestLocal) } } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f8e449d8d1e00..dc5702d8497d5 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -494,7 +494,7 @@ class KafkaServer( metadataSupport = zkSupport, replicaManager = replicaManager, groupCoordinator = groupCoordinator, - newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator), + newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time), txnCoordinator = transactionCoordinator, autoTopicCreationManager = autoTopicCreationManager, brokerId = config.brokerId, diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 323547474c98d..232008a38ce66 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -16,16 +16,19 @@ */ package kafka.coordinator.group +import kafka.common.OffsetAndMetadata import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import kafka.server.RequestLocal -import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData} +import kafka.utils.MockTime +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData, SyncGroupResponseData} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.common.utils.BufferSupplier +import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.junit.jupiter.api.Assertions.{assertEquals, 
assertFalse, assertTrue} import org.junit.jupiter.api.Test @@ -34,6 +37,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.Mockito.{mock, verify, when} import java.net.InetAddress +import java.util.Optional import scala.jdk.CollectionConverters._ class GroupCoordinatorAdapterTest { @@ -58,7 +62,7 @@ class GroupCoordinatorAdapterTest { @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) def testJoinGroup(version: Short): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.JOIN_GROUP, version) val request = new JoinGroupRequestData() @@ -146,7 +150,7 @@ class GroupCoordinatorAdapterTest { @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP) def testSyncGroup(version: Short): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.SYNC_GROUP, version) val data = new SyncGroupRequestData() @@ -213,7 +217,7 @@ class GroupCoordinatorAdapterTest { @Test def testHeartbeat(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion) val data = new HeartbeatRequestData() @@ -244,7 +248,7 @@ class GroupCoordinatorAdapterTest { def testLeaveGroup(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.LEAVE_GROUP, ApiKeys.LEAVE_GROUP.latestVersion) val data = new LeaveGroupRequestData() @@ -313,7 +317,7 @@ class 
GroupCoordinatorAdapterTest { expectedStatesFilter: Set[String] ): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.LIST_GROUPS, ApiKeys.LIST_GROUPS.latestVersion) val data = new ListGroupsRequestData() @@ -348,7 +352,7 @@ class GroupCoordinatorAdapterTest { @Test def testDescribeGroup(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val groupId1 = "group-1" val groupId2 = "group-2" @@ -405,7 +409,7 @@ class GroupCoordinatorAdapterTest { @Test def testDeleteGroups(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.DELETE_GROUPS, ApiKeys.DELETE_GROUPS.latestVersion) val groupIds = List("group-1", "group-2", "group-3") @@ -436,4 +440,74 @@ class GroupCoordinatorAdapterTest { assertEquals(expectedResults, future.get()) } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + def testCommitOffsets(version: Short): Unit = { + val groupCoordinator = mock(classOf[GroupCoordinator]) + val time = new MockTime() + val adapter = new GroupCoordinatorAdapter(groupCoordinator, time) + val now = time.milliseconds() + + val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version) + val data = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setGenerationId(10) + .setRetentionTimeMs(1000) + .setTopics(List( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100) + 
.setCommitTimestamp(now) + .setCommittedLeaderEpoch(1) + ).asJava) + ).asJava) + val bufferSupplier = BufferSupplier.create() + + val future = adapter.commitOffsets(ctx, data, bufferSupplier) + assertFalse(future.isDone) + + val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit]) + + verify(groupCoordinator).handleCommitOffsets( + ArgumentMatchers.eq(data.groupId), + ArgumentMatchers.eq(data.memberId), + ArgumentMatchers.eq(None), + ArgumentMatchers.eq(data.generationId), + ArgumentMatchers.eq(Map( + new TopicPartition("foo", 0) -> new OffsetAndMetadata( + offset = 100, + leaderEpoch = Optional.of[Integer](1), + metadata = "", + commitTimestamp = now, + expireTimestamp = Some(now + 1000L) + ) + )), + capturedCallback.capture(), + ArgumentMatchers.eq(RequestLocal(bufferSupplier)) + ) + + capturedCallback.getValue.apply(Map( + new TopicPartition("foo", 0) -> Errors.NONE + )) + + val expectedResponseData = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code) + ).asJava) + ).asJava) + + assertTrue(future.isDone) + assertEquals(expectedResponseData, future.get()) + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 848f35425495d..e8f1cfee43b2c 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1263,6 +1263,239 @@ class KafkaApisTest { ) } + @Test + def testHandleOffsetCommitRequest(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val offsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new 
OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + + val future = new CompletableFuture[OffsetCommitResponseData]() + when(newGroupCoordinator.commitOffsets( + requestChannelRequest.context, + offsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + // This is the response returned by the group coordinator. + val offsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(offsetCommitResponse) + val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) + assertEquals(offsetCommitResponse, response.data) + } + + @Test + def testHandleOffsetCommitRequestFutureFailed(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val offsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + + val future = new CompletableFuture[OffsetCommitResponseData]() + when(newGroupCoordinator.commitOffsets( + requestChannelRequest.context, + offsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + 
)).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val expectedOffsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava) + + future.completeExceptionally(Errors.NOT_COORDINATOR.exception) + val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) + assertEquals(expectedOffsetCommitResponse, response.data) + } + + @Test + def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 2) + addTopicToMetadataCache("bar", numPartitions = 2) + + val offsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(2) + .setCommittedOffset(30)).asJava), + // bar exists. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava), + // zar does not exist. 
+ new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("zar") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(60), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(70)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + + // This is the request expected by the group coordinator. + val expectedOffsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20)).asJava), + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava)).asJava) + + val future = new CompletableFuture[OffsetCommitResponseData]() + when(newGroupCoordinator.commitOffsets( + requestChannelRequest.context, + expectedOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + // This is the response returned by the group coordinator. 
+ val offsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + val expectedOffsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + // foo-2 is first because partitions failing the validation + // are put in the response first. + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(2) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + // zar is before bar because topics failing the validation are + // put in the response first. 
+ new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("zar") + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)).asJava), + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(offsetCommitResponse) + val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) + assertEquals(expectedOffsetCommitResponse, response.data) + } + @Test def testOffsetCommitWithInvalidPartition(): Unit = { val topic = "topic" diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 1faa8a07fac9a..e6cfe81eb9e8f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.ListGroupsRequestData; @@ -52,6 +54,7 @@ 
CompletableFuture joinGroup( ); /** +<<<<<<< HEAD * Sync a Generic Group. * * @param context The coordinator request context. * @@ -132,5 +135,20 @@ CompletableFuture delet List groupIds, BufferSupplier bufferSupplier ); + + /** + * Commit offsets for a given Group. + * + * @param context The request context. + * @param request The OffsetCommitRequest data. + * @param bufferSupplier The buffer supplier tied to the request thread. + * + * @return A future yielding the response or an exception. + */ + CompletableFuture commitOffsets( + RequestContext context, + OffsetCommitRequestData request, + BufferSupplier bufferSupplier + ); } From dfe1561768e691f175b224d924d24f7a5e4ab129 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 21 Dec 2022 17:44:18 +0100 Subject: [PATCH 2/9] initial refactor --- .../group/GroupCoordinatorAdapter.scala | 9 ++-- .../main/scala/kafka/server/KafkaApis.scala | 50 ++++++------------- 2 files changed, 20 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 156820e14fbbc..5e3f5b34607c0 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -250,14 +250,13 @@ class GroupCoordinatorAdapter( def callback(commitStatus: Map[TopicPartition, Errors]): Unit = { val response = new OffsetCommitResponseData() - val byTopics = new util.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() commitStatus.forKeyValue { (tp, error) => - var topic = byTopics.get(tp.topic) + var topic = byTopics(tp.topic) if (topic == null) { - topic = new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName(tp.topic) - byTopics.put(tp.topic, topic) + topic = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic) + byTopics += tp.topic -> topic response.topics.add(topic) } topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition() diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 09320420af971..1c2b7b12b269f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -188,7 +188,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal) case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request) - case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal) + case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request) case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal).exceptionally(handleError) @@ -411,38 +411,22 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset commit request */ - def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleOffsetCommitRequest( + request: RequestChannel.Request, + requestLocal: RequestLocal + ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] - def sendResponse(response: OffsetCommitResponse): Unit = { - trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " + - s"client ${request.header.clientId}.") - - if (isDebugEnabled) { - response.data.topics.forEach { topic => - topic.partitions.forEach { partition => - if (partition.errorCode != Errors.NONE.code) { - debug(s"Offset commit 
request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " + - s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}") - } - } - } - } - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - response.maybeSetThrottleTimeMs(requestThrottleMs) - response - }) - } - // Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. 
- sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) } else { val offsetCommitResponseData = new OffsetCommitResponseData() val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() @@ -523,7 +507,8 @@ class KafkaApis(val requestChannel: RequestChannel, } if (authorizedTopicsRequest.isEmpty) { - sendResponse(new OffsetCommitResponse(offsetCommitResponseData)) + requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(offsetCommitResponseData)) + CompletableFuture.completedFuture[Unit](()) } else if (request.header.apiVersion == 0) { // For version 0, always store offsets to ZK. val zkSupport = metadataSupport.requireZkOrThrow( @@ -562,7 +547,8 @@ class KafkaApis(val requestChannel: RequestChannel, } } - sendResponse(new OffsetCommitResponse(offsetCommitResponseData)) + requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(offsetCommitResponseData)) + CompletableFuture.completedFuture[Unit](()) } else { // Create a new request data with the authorized topic-partitions. val offsetCommitRequestData = new OffsetCommitRequestData() @@ -579,12 +565,12 @@ class KafkaApis(val requestChannel: RequestChannel, requestLocal.bufferSupplier ).handle[Unit] { (results, exception) => if (exception != null) { - sendResponse(offsetCommitRequest.getErrorResponse(exception)) + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) } else { if (offsetCommitResponseData.topics.isEmpty) { // If the pre-processed response is empty, we can directly // send back the new one. 
- sendResponse(new OffsetCommitResponse(results)) + requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(results)) } else { // Otherwise, we have to add the topic-partitions from new // one into the pre-processed one. @@ -601,13 +587,9 @@ class KafkaApis(val requestChannel: RequestChannel, existingTopicResponse.partitions.addAll(topic.partitions) } } - sendResponse(new OffsetCommitResponse(offsetCommitResponseData)) + requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(offsetCommitResponseData)) } } - }.exceptionally { exception => - error(s"Unexpected error handling request ${request.requestDesc(true)} " + - s"with context ${request.context}", exception) - requestHelper.handleError(request, exception) } } } From 569e8afdc86e90debb7bfd5edf2ee339c6f72792 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 21 Dec 2022 20:05:19 +0100 Subject: [PATCH 3/9] refactor logic in kafka apis to be simpler --- .../common/requests/OffsetCommitResponse.java | 78 +++++++++++++ .../main/scala/kafka/server/KafkaApis.scala | 106 +++--------------- 2 files changed, 94 insertions(+), 90 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 713b68974a11d..86163922612c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -27,7 +27,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Function; /** * Possible error codes: @@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { public boolean shouldClientThrottle(short version) { return version >= 4; } + + public static class Builder { + OffsetCommitResponseData data = new OffsetCommitResponseData(); + HashMap 
byTopicName = new HashMap<>(); + + private OffsetCommitResponseTopic getOrCreateTopic( + String topicName + ) { + OffsetCommitResponseTopic topic = byTopicName.get(topicName); + if (topic == null) { + topic = new OffsetCommitResponseTopic().setName(topicName); + data.topics().add(topic); + byTopicName.put(topicName, topic); + } + return topic; + } + + public Builder addPartition( + String topicName, + int partitionIndex, + Errors error + ) { + final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + topicResponse.partitions().add(new OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code())); + + return this; + } + + public

Builder addPartitions( + String topicName, + List

partitions, + Function partitionIndex, + Errors error + ) { + final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + partitions.forEach(partition -> { + topicResponse.partitions().add(new OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex.apply(partition)) + .setErrorCode(error.code())); + }); + + return this; + } + + public Builder merge( + OffsetCommitResponseData newData + ) { + if (data.topics().isEmpty()) { + // If the current data is empty, we can discard it and use the new data. + data = newData; + } else { + // Otherwise, we have to merge them together. + newData.topics().forEach(newTopic -> { + OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name()); + if (existingTopic == null) { + // If no topic exists, we can directly copy the new topic data. + data.topics().add(newTopic); + byTopicName.put(newTopic.name(), newTopic); + } else { + // Otherwise, we add the partitions to the existing one. + existingTopic.partitions().addAll(newTopic.partitions()); + } + }); + } + + return this; + } + + public OffsetCommitResponse build() { + return new OffsetCommitResponse(data); + } + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1c2b7b12b269f..65f334d8462ca 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -428,9 +428,6 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { - val offsetCommitResponseData = new OffsetCommitResponseData() - val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() - val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, @@ -438,76 +435,39 @@ class KafkaApis(val requestChannel: RequestChannel, 
offsetCommitRequest.data.topics.asScala )(_.name) - def makePartitionResponse( - partitionIndex: Int, - error: Errors - ): OffsetCommitResponseData.OffsetCommitResponsePartition = { - new OffsetCommitResponseData.OffsetCommitResponsePartition() - .setPartitionIndex(partitionIndex) - .setErrorCode(error.code) - } - - def addTopicToResponse( - topic: OffsetCommitRequestData.OffsetCommitRequestTopic, - error: Errors - ): Unit = { - val topicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name) - offsetCommitResponseData.topics.add(topicResponse) - topic.partitions.forEach { partition => - topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error)) - } - } - + val responseBuilder = new OffsetCommitResponse.Builder() val authorizedTopicsRequest = new util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => if (!authorizedTopics.contains(topic.name)) { // If the topic is not authorized, we add the topic and all its partitions // to the response with TOPIC_AUTHORIZATION_FAILED. - addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) } else if (!metadataCache.contains(topic.name)) { // If the topic is unknown, we add the topic and all its partitions // to the response with UNKNOWN_TOPIC_OR_PARTITION. - addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION) + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } else { // Otherwise, we check all partitions to ensure that they all exist. 
- var topicRequestWithValidPartitions: OffsetCommitRequestData.OffsetCommitRequestTopic = null - var topicResponseWithInvalidPartitions: OffsetCommitResponseData.OffsetCommitResponseTopic = null + val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name) topic.partitions.forEach { partition => if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) { - // If the partition exists, we copy it into the valid set. - if (topicRequestWithValidPartitions == null) { - topicRequestWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name) - } - topicRequestWithValidPartitions.partitions.add(partition) + topicWithValidPartitions.partitions.add(partition) } else { - // Otherwise, we add it to the response with UNKNOWN_TOPIC_OR_PARTITION. - if (topicResponseWithInvalidPartitions == null) { - topicResponseWithInvalidPartitions = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name) - } - topicResponseWithInvalidPartitions.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)) + responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } } - // Known partitions are added to the authorized set. - if (topicRequestWithValidPartitions != null) { - authorizedTopicsRequest.add(topicRequestWithValidPartitions) - } - - // Unknown partitions are added to the response. - if (topicResponseWithInvalidPartitions != null) { - offsetCommitResponseData.topics.add(topicResponseWithInvalidPartitions) - // We keep track of topics with both known and unknown partitions such - // that we can merge the response from the coordinator into it later on. 
- if (topicRequestWithValidPartitions != null) { - topicsPendingPartitions += topic.name -> topicResponseWithInvalidPartitions - } + if (!topicWithValidPartitions.partitions.isEmpty) { + authorizedTopicsRequest.add(topicWithValidPartitions) } } } if (authorizedTopicsRequest.isEmpty) { - requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(offsetCommitResponseData)) + requestHelper.sendMaybeThrottle(request, responseBuilder.build()) CompletableFuture.completedFuture[Unit](()) } else if (request.header.apiVersion == 0) { // For version 0, always store offsets to ZK. @@ -515,39 +475,26 @@ class KafkaApis(val requestChannel: RequestChannel, KafkaApis.unsupported("Version 0 offset commit requests")) authorizedTopicsRequest.forEach { topic => - val topicResponse = topicsPendingPartitions.get(topic.name) match { - case None => - // If there is no pending topic response, we create - // a new one and add it to the response. - val newTopicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name) - offsetCommitResponseData.topics.add(newTopicResponse) - newTopicResponse - - case Some(existingTopicResponse) => - // Otherwise, we use the existing one. 
- existingTopicResponse - } - topic.partitions.forEach { partition => try { if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) { - topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.OFFSET_METADATA_TOO_LARGE)) + responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.OFFSET_METADATA_TOO_LARGE) } else { zkSupport.zkClient.setOrCreateConsumerOffset( offsetCommitRequest.data.groupId, new TopicPartition(topic.name, partition.partitionIndex), partition.committedOffset ) - topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.NONE)) + responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.NONE) } } catch { case e: Throwable => - topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.forException(e))) + responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.forException(e)) } } } - requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(offsetCommitResponseData)) + requestHelper.sendMaybeThrottle(request, responseBuilder.build()) CompletableFuture.completedFuture[Unit](()) } else { // Create a new request data with the authorized topic-partitions. @@ -567,28 +514,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) } else { - if (offsetCommitResponseData.topics.isEmpty) { - // If the pre-processed response is empty, we can directly - // send back the new one. - requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(results)) - } else { - // Otherwise, we have to add the topic-partitions from new - // one into the pre-processed one. - results.topics.forEach { topic => - topicsPendingPartitions.get(topic.name) match { - case None => - // If there is no pending topic response, we just - // add the one we've got to the response. 
- offsetCommitResponseData.topics.add(topic) - - case Some(existingTopicResponse) => - // Otherwise, we copy partitions to the pending - // topic response. - existingTopicResponse.partitions.addAll(topic.partitions) - } - } - requestHelper.sendMaybeThrottle(request, new OffsetCommitResponse(offsetCommitResponseData)) - } + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build()) } } } From e22ad75a6f9958882e773ef6d6cc010a32d2a3e4 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 21 Dec 2022 20:37:36 +0100 Subject: [PATCH 4/9] refactor to simplify the code --- .../main/scala/kafka/server/KafkaApis.scala | 120 +++++++++++------- 1 file changed, 76 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 65f334d8462ca..816d49c642b0e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -436,7 +436,7 @@ class KafkaApis(val requestChannel: RequestChannel, )(_.name) val responseBuilder = new OffsetCommitResponse.Builder() - val authorizedTopicsRequest = new util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]() + val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => if (!authorizedTopics.contains(topic.name)) { // If the topic is not authorized, we add the topic and all its partitions @@ -461,62 +461,94 @@ class KafkaApis(val requestChannel: RequestChannel, } if (!topicWithValidPartitions.partitions.isEmpty) { - authorizedTopicsRequest.add(topicWithValidPartitions) + authorizedTopicsRequest += topicWithValidPartitions } } } if (authorizedTopicsRequest.isEmpty) { requestHelper.sendMaybeThrottle(request, responseBuilder.build()) - CompletableFuture.completedFuture[Unit](()) + CompletableFuture.completedFuture(()) } else if (request.header.apiVersion == 0) { // For version 0, 
always store offsets to ZK. - val zkSupport = metadataSupport.requireZkOrThrow( - KafkaApis.unsupported("Version 0 offset commit requests")) + commitOffsetsToZookeeper( + request, + offsetCommitRequest, + authorizedTopicsRequest, + responseBuilder + ) + } else { + // For version > 0, store offsets to Coordinator. + commitOffsetsToCoordinator( + request, + offsetCommitRequest, + authorizedTopicsRequest, + responseBuilder, + requestLocal + ) + } + } + } - authorizedTopicsRequest.forEach { topic => - topic.partitions.forEach { partition => - try { - if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) { - responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.OFFSET_METADATA_TOO_LARGE) - } else { - zkSupport.zkClient.setOrCreateConsumerOffset( - offsetCommitRequest.data.groupId, - new TopicPartition(topic.name, partition.partitionIndex), - partition.committedOffset - ) - responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.NONE) - } - } catch { - case e: Throwable => - responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.forException(e)) - } + private def commitOffsetsToZookeeper( + request: RequestChannel.Request, + offsetCommitRequest: OffsetCommitRequest, + authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic], + responseBuilder: OffsetCommitResponse.Builder + ): CompletableFuture[Unit] = { + val zkSupport = metadataSupport.requireZkOrThrow( + KafkaApis.unsupported("Version 0 offset commit requests")) + + authorizedTopicsRequest.foreach { topic => + topic.partitions.forEach { partition => + val error = try { + if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) { + Errors.OFFSET_METADATA_TOO_LARGE + } else { + zkSupport.zkClient.setOrCreateConsumerOffset( + offsetCommitRequest.data.groupId, + new TopicPartition(topic.name, partition.partitionIndex), + 
partition.committedOffset + ) + Errors.NONE } + } catch { + case e: Throwable => + Errors.forException(e) } - requestHelper.sendMaybeThrottle(request, responseBuilder.build()) - CompletableFuture.completedFuture[Unit](()) + responseBuilder.addPartition(topic.name, partition.partitionIndex, error) + } + } + + requestHelper.sendMaybeThrottle(request, responseBuilder.build()) + CompletableFuture.completedFuture[Unit](()) + } + + private def commitOffsetsToCoordinator( + request: RequestChannel.Request, + offsetCommitRequest: OffsetCommitRequest, + authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic], + responseBuilder: OffsetCommitResponse.Builder, + requestLocal: RequestLocal + ): CompletableFuture[Unit] = { + val offsetCommitRequestData = new OffsetCommitRequestData() + .setGroupId(offsetCommitRequest.data.groupId) + .setMemberId(offsetCommitRequest.data.memberId) + .setGenerationId(offsetCommitRequest.data.generationId) + .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) + .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) + .setTopics(authorizedTopicsRequest.asJava) + + newGroupCoordinator.commitOffsets( + request.context, + offsetCommitRequestData, + requestLocal.bufferSupplier + ).handle[Unit] { (results, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) } else { - // Create a new request data with the authorized topic-partitions. 
- val offsetCommitRequestData = new OffsetCommitRequestData() - .setGroupId(offsetCommitRequest.data.groupId) - .setMemberId(offsetCommitRequest.data.memberId) - .setGenerationId(offsetCommitRequest.data.generationId) - .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) - .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) - .setTopics(authorizedTopicsRequest) - - newGroupCoordinator.commitOffsets( - request.context, - offsetCommitRequestData, - requestLocal.bufferSupplier - ).handle[Unit] { (results, exception) => - if (exception != null) { - requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) - } else { - requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build()) - } - } + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build()) } } } From fbcd8b100d12d22252ead5f50b3b8d46b1bb5a83 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 26 Dec 2022 10:01:33 +0100 Subject: [PATCH 5/9] cleanup --- .../org/apache/kafka/coordinator/group/GroupCoordinator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index e6cfe81eb9e8f..8e7ef38c55793 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -54,7 +54,6 @@ CompletableFuture joinGroup( ); /** -<<<<<<< HEAD * Sync a Generic Group. * * @param context The coordinator request context. 
From fd44988a1c8b662baa8cea77216c578dac63f61d Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 26 Dec 2022 10:05:34 +0100 Subject: [PATCH 6/9] cleanup --- .../kafka/coordinator/group/GroupCoordinatorAdapter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 5e3f5b34607c0..fa6bb78802c98 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -270,9 +270,9 @@ class GroupCoordinatorAdapter( // "default" expiration timestamp is now + retention (and retention may be overridden if v2) // expire timestamp is computed differently for v1 and v2. // - If v1 and no explicit commit timestamp is provided we treat it the same as v5. - // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that + // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that. // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5. - // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect + // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect. 
val currentTimestamp = time.milliseconds val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() From b7ad4379bd375d7e0dfcf4facb6388733bcf9a71 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 10 Jan 2023 11:32:28 +0100 Subject: [PATCH 7/9] fix bug and improve comment --- .../group/GroupCoordinatorAdapter.scala | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index fa6bb78802c98..94a715c36008a 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -246,6 +246,7 @@ class GroupCoordinatorAdapter( request: OffsetCommitRequestData, bufferSupplier: BufferSupplier ): CompletableFuture[OffsetCommitResponseData] = { + val currentTimeMs = time.milliseconds val future = new CompletableFuture[OffsetCommitResponseData]() def callback(commitStatus: Map[TopicPartition, Errors]): Unit = { @@ -253,12 +254,16 @@ class GroupCoordinatorAdapter( val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() commitStatus.forKeyValue { (tp, error) => - var topic = byTopics(tp.topic) - if (topic == null) { - topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic) - byTopics += tp.topic -> topic - response.topics.add(topic) + val topic = byTopics.get(tp.topic) match { + case Some(existingTopic) => + existingTopic + case None => + val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic) + byTopics += tp.topic -> newTopic + response.topics.add(newTopic) + newTopic } + topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(tp.partition) .setErrorCode(error.code)) @@ -267,15 +272,15 @@ class GroupCoordinatorAdapter( 
future.complete(response) } - // "default" expiration timestamp is now + retention (and retention may be overridden if v2) - // expire timestamp is computed differently for v1 and v2. - // - If v1 and no explicit commit timestamp is provided we treat it the same as v5. - // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that. - // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5. - // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect. - val currentTimestamp = time.milliseconds - val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() + // "default" expiration timestamp is defined as now + retention. The retention may be overridden + // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. + val expireTimeMs = request.retentionTimeMs match { + case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None + case retentionTimeMs => Some(currentTimeMs + retentionTimeMs) + } + val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() request.topics.forEach { topic => topic.partitions.forEach { partition => val tp = new TopicPartition(topic.name, partition.partitionIndex) @@ -290,18 +295,14 @@ class GroupCoordinatorAdapter( case metadata => metadata }, commitTimestamp = partition.commitTimestamp match { - case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp + case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs case customTimestamp => customTimestamp }, - expireTimestamp = request.retentionTimeMs match { - case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None - case retentionTime => Some(currentTimestamp + retentionTime) - } + expireTimestamp = expireTimeMs ) } } - // call coordinator to handle commit offset coordinator.handleCommitOffsets( request.groupId, 
request.memberId, From 9d147e6c1fcfb8e16eb19b0d6a430939343630e5 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 11 Jan 2023 14:37:12 +0100 Subject: [PATCH 8/9] address minor comments --- core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4173320ed5e87..23b3b286234e1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -470,7 +470,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, responseBuilder.build()) CompletableFuture.completedFuture(()) } else if (request.header.apiVersion == 0) { - // For version 0, always store offsets to ZK. + // For version 0, always store offsets in ZK. commitOffsetsToZookeeper( request, offsetCommitRequest, @@ -478,7 +478,7 @@ class KafkaApis(val requestChannel: RequestChannel, responseBuilder ) } else { - // For version > 0, store offsets to Coordinator. + // For version > 0, store offsets in Coordinator. 
commitOffsetsToCoordinator( request, offsetCommitRequest, From b51ea5ea73756f28f0604af3529da261177adfb0 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 12 Jan 2023 15:11:57 +0100 Subject: [PATCH 9/9] address minor comment --- .../apache/kafka/common/requests/OffsetCommitResponse.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 86163922612c6..8c0bb9c182d42 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -181,7 +181,9 @@ public Builder merge( data.topics().add(newTopic); byTopicName.put(newTopic.name(), newTopic); } else { - // Otherwise, we add the partitions to the existing one. + // Otherwise, we add the partitions to the existing one. Note we + // expect non-overlapping partitions here as we don't verify + // if the partition is already in the list before adding it. existingTopic.partitions().addAll(newTopic.partitions()); } });