diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 9869da5d254b9..8fc884e661430 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -118,6 +118,11 @@ public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
             .setThrottleTimeMs(throttleTimeMs));
     }
 
+    @Override
+    public OffsetCommitResponse getErrorResponse(Throwable e) {
+        return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
+    }
+
     public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
         return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 713b68974a11d..8c0bb9c182d42 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -27,7 +27,9 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * Possible error codes:
@@ -116,4 +118,82 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one. Note we
+                        // expect non-overlapping partitions here as we don't verify
+                        // if the partition is already in the list before adding it.
+                        existingTopic.partitions().addAll(newTopic.partitions());
+                    }
+                });
+            }
+
+            return this;
+        }
+
+        public OffsetCommitResponse build() {
+            return new OffsetCommitResponse(data);
+        }
+    }
 }
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 95c18a6f3e3f4..18cd42c77cba1 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -179,7 +179,7 @@ public KafkaApis build() {
                              metadataSupport,
                              replicaManager,
                              groupCoordinator,
-                             new GroupCoordinatorAdapter(groupCoordinator),
+                             new GroupCoordinatorAdapter(groupCoordinator, time),
                              txnCoordinator,
                              autoTopicCreationManager,
                              brokerId,
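Reviewer note: a minimal sketch of how the new `OffsetCommitResponse.Builder` is meant to be composed. The topic names, partition indexes, and the standalone class are hypothetical; only the `addPartition`/`addPartitions`/`merge`/`build` calls are the API introduced above.

```java
import java.util.Arrays;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitResponse;

public class BuilderUsageSketch {
    public static void main(String[] args) {
        // Pre-populate the response with partitions that failed validation up front.
        OffsetCommitResponse.Builder builder = new OffsetCommitResponse.Builder()
            // One partition that failed topic authorization.
            .addPartition("foo", 2, Errors.TOPIC_AUTHORIZATION_FAILED)
            // A batch of partitions; the Function argument extracts the partition index.
            .addPartitions("bar", Arrays.asList(0, 1), p -> p, Errors.UNKNOWN_TOPIC_OR_PARTITION);

        // Later, merge in the data produced by the group coordinator. As the comment
        // in merge() notes, partitions are expected not to overlap with the ones
        // already added, since no de-duplication is performed.
        OffsetCommitResponseData coordinatorData = new OffsetCommitResponseData();
        OffsetCommitResponse response = builder.merge(coordinatorData).build();
        System.out.println(response.data());
    }
}
```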
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 29242a8774a21..1a97079ce3923 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -16,15 +16,18 @@
  */
 package kafka.coordinator.group
 
+import kafka.common.OffsetAndMetadata
 import kafka.server.RequestLocal
 import kafka.utils.Implicits.MapExtensionMethods
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
+import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.RequestContext
-import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 
 import java.util
+import java.util.Optional
 import java.util.concurrent.CompletableFuture
 import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
@@ -34,7 +37,8 @@ import scala.jdk.CollectionConverters._
  * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
  */
 class GroupCoordinatorAdapter(
-  val coordinator: GroupCoordinator
+  private val coordinator: GroupCoordinator,
+  private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
 
   override def joinGroup(
@@ -312,4 +316,79 @@ class GroupCoordinatorAdapter(
 
     future
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val currentTimeMs = time.milliseconds
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        val topic = byTopics.get(tp.topic) match {
+          case Some(existingTopic) =>
+            existingTopic
+          case None =>
+            val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+            byTopics += tp.topic -> newTopic
+            response.topics.add(newTopic)
+            newTopic
+        }
+
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // The "default" expiration timestamp is defined as now + retention. The retention may be
+    // overridden in versions from v2 to v4. Otherwise, the retention defined on the broker is used.
+    // If an explicit commit timestamp is provided (v1 only), the expiration timestamp is computed
+    // based on that.
+    val expireTimeMs = request.retentionTimeMs match {
+      case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+      case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
+    }
+
+    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        partitions += tp -> new OffsetAndMetadata(
+          offset = partition.committedOffset,
+          leaderEpoch = partition.committedLeaderEpoch match {
+            case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
+            case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch)
+          },
+          metadata = partition.committedMetadata match {
+            case null => OffsetAndMetadata.NoMetadata
+            case metadata => metadata
+          },
+          commitTimestamp = partition.commitTimestamp match {
+            case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
+            case customTimestamp => customTimestamp
+          },
+          expireTimestamp = expireTimeMs
+        )
+      }
+    }
+
+    coordinator.handleCommitOffsets(
+      request.groupId,
+      request.memberId,
+      Option(request.groupInstanceId),
+      request.generationId,
+      partitions.toMap,
+      callback,
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
 }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3235eecea41ce..4ebbba11786b0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -410,7 +410,7 @@ class BrokerServer(
       metadataSupport = raftSupport,
       replicaManager = replicaManager,
       groupCoordinator = groupCoordinator,
-      newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
+      newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time),
       txnCoordinator = transactionCoordinator,
       autoTopicCreationManager = autoTopicCreationManager,
       brokerId = config.nodeId,
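Reviewer note: the timestamp mapping in `GroupCoordinatorAdapter.commitOffsets` above is the part most likely to regress, so here is the same rule set as a small, self-contained Java sketch. The `-1` sentinel values are an assumption matching the usual `OffsetCommitRequest.DEFAULT_RETENTION_TIME` / `DEFAULT_TIMESTAMP` constants, and the class and method names are illustrative only.

```java
import java.util.OptionalLong;

// Illustrative only: mirrors the expiry/commit-timestamp rules implemented in
// GroupCoordinatorAdapter.commitOffsets, assuming the -1 sentinels that
// OffsetCommitRequest uses for "not set".
final class OffsetTimestampRules {
    static final long DEFAULT_RETENTION_TIME = -1L; // assumption: matches OffsetCommitRequest
    static final long DEFAULT_TIMESTAMP = -1L;      // assumption: matches OffsetCommitRequest

    // No retention in the request (v5+) means no per-partition expiration is
    // stored; the broker-level retention applies when offsets are expired.
    static OptionalLong expireTimestampMs(long retentionTimeMs, long nowMs) {
        return retentionTimeMs == DEFAULT_RETENTION_TIME
            ? OptionalLong.empty()
            : OptionalLong.of(nowMs + retentionTimeMs);
    }

    // An explicit commit timestamp can only be provided by v1 requests; all
    // other versions fall back to the broker's current time.
    static long commitTimestampMs(long commitTimestamp, long nowMs) {
        return commitTimestamp == DEFAULT_TIMESTAMP ? nowMs : commitTimestamp;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(expireTimestampMs(DEFAULT_RETENTION_TIME, now)); // empty
        System.out.println(expireTimestampMs(500L, now));                   // 1500
        System.out.println(commitTimestampMs(DEFAULT_TIMESTAMP, now));      // 1000
    }
}
```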
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 20cfe766eb2ce..42768fa98f9fb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -187,7 +187,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
       case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
       case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
-      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
+      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
       case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError)
       case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
       case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal).exceptionally(handleError)
@@ -410,137 +410,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-    // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-          .setTopics(responseTopicList)
-          .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {
+              topicWithValidPartitions.partitions.add(partition)
+            } else {
+              responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            }
+          }
+
+          if (!topicWithValidPartitions.partitions.isEmpty) {
+            authorizedTopicsRequest += topicWithValidPartitions
+          }
         }
       }
 
-      val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
-
-      if (authorizedTopicRequestInfo.isEmpty)
-        sendResponseCallback(Map.empty)
-      else if (header.apiVersion == 0) {
-        // for version 0 always store offsets to ZK
-        val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit requests"))
-        val responseInfo = authorizedTopicRequestInfo.map {
-          case (topicPartition, partitionData) =>
-            try {
-              if (partitionData.committedMetadata() != null
-                && partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-              else {
-                zkSupport.zkClient.setOrCreateConsumerOffset(
-                  offsetCommitRequest.data.groupId,
-                  topicPartition,
-                  partitionData.committedOffset)
-                (topicPartition, Errors.NONE)
-              }
-            } catch {
-              case e: Throwable => (topicPartition, Errors.forException(e))
-            }
-        }
-        sendResponseCallback(responseInfo)
+      if (authorizedTopicsRequest.isEmpty) {
+        requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+        CompletableFuture.completedFuture(())
+      } else if (request.header.apiVersion == 0) {
+        // For version 0, always store offsets in ZK.
+        commitOffsetsToZookeeper(
+          request,
+          offsetCommitRequest,
+          authorizedTopicsRequest,
+          responseBuilder
+        )
       } else {
-        // for version 1 and beyond store offsets in offset manager
-
-        // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
-        // expire timestamp is computed differently for v1 and v2.
-        //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
-        //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that
-        //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
-        //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
-        val currentTimestamp = time.milliseconds
-        val partitionData = authorizedTopicRequestInfo.map { case (k, partitionData) =>
-          val metadata = if (partitionData.committedMetadata == null)
-            OffsetAndMetadata.NoMetadata
-          else
-            partitionData.committedMetadata
+        // For version > 0, store offsets in Coordinator.
+        commitOffsetsToCoordinator(
+          request,
+          offsetCommitRequest,
+          authorizedTopicsRequest,
+          responseBuilder,
+          requestLocal
+        )
+      }
+    }
+  }
 
-          val leaderEpochOpt = if (partitionData.committedLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
-            Optional.empty[Integer]
-          else
-            Optional.of[Integer](partitionData.committedLeaderEpoch)
-
-          k -> new OffsetAndMetadata(
-            offset = partitionData.committedOffset,
-            leaderEpoch = leaderEpochOpt,
-            metadata = metadata,
-            commitTimestamp = partitionData.commitTimestamp match {
-              case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
-              case customTimestamp => customTimestamp
-            },
-            expireTimestamp = offsetCommitRequest.data.retentionTimeMs match {
-              case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
-              case retentionTime => Some(currentTimestamp + retentionTime)
-            }
-          )
+  private def commitOffsetsToZookeeper(
+    request: RequestChannel.Request,
+    offsetCommitRequest: OffsetCommitRequest,
+    authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
+    responseBuilder: OffsetCommitResponse.Builder
+  ): CompletableFuture[Unit] = {
+    val zkSupport = metadataSupport.requireZkOrThrow(
+      KafkaApis.unsupported("Version 0 offset commit requests"))
+
+    authorizedTopicsRequest.foreach { topic =>
+      topic.partitions.forEach { partition =>
+        val error = try {
+          if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) {
+            Errors.OFFSET_METADATA_TOO_LARGE
+          } else {
+            zkSupport.zkClient.setOrCreateConsumerOffset(
+              offsetCommitRequest.data.groupId,
+              new TopicPartition(topic.name, partition.partitionIndex),
+              partition.committedOffset
+            )
+            Errors.NONE
+          }
+        } catch {
+          case e: Throwable =>
+            Errors.forException(e)
         }
 
-        // call coordinator to handle commit offset
-        groupCoordinator.handleCommitOffsets(
-          offsetCommitRequest.data.groupId,
-          offsetCommitRequest.data.memberId,
-          Option(offsetCommitRequest.data.groupInstanceId),
-          offsetCommitRequest.data.generationId,
-          partitionData,
-          sendResponseCallback,
-          requestLocal)
+        responseBuilder.addPartition(topic.name, partition.partitionIndex, error)
       }
     }
-  }
+
+    requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+    CompletableFuture.completedFuture[Unit](())
+  }
+
+  private def commitOffsetsToCoordinator(
+    request: RequestChannel.Request,
+    offsetCommitRequest: OffsetCommitRequest,
+    authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
+    responseBuilder: OffsetCommitResponse.Builder,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val offsetCommitRequestData = new OffsetCommitRequestData()
+      .setGroupId(offsetCommitRequest.data.groupId)
+      .setMemberId(offsetCommitRequest.data.memberId)
+      .setGenerationId(offsetCommitRequest.data.generationId)
+      .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
+      .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
+      .setTopics(authorizedTopicsRequest.asJava)
+
+    newGroupCoordinator.commitOffsets(
+      request.context,
+      offsetCommitRequestData,
+      requestLocal.bufferSupplier
+    ).handle[Unit] { (results, exception) =>
+      if (exception != null) {
+        requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception))
+      } else {
+        requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build())
+      }
+    }
+  }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f734c730d44f5..37f6fea912321 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -507,7 +507,7 @@ class KafkaServer(
       metadataSupport = zkSupport,
       replicaManager = replicaManager,
       groupCoordinator = groupCoordinator,
-      newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
+      newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time),
      txnCoordinator = transactionCoordinator,
       autoTopicCreationManager = autoTopicCreationManager,
       brokerId = config.brokerId,
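Reviewer note: the handler now returns a `CompletableFuture[Unit]` and the dispatch table chains `.exceptionally(handleError)`, matching the pattern already used for OFFSET_FETCH and JOIN_GROUP. A minimal Java sketch of that pattern, with placeholder names (`sendResponse`, `handleRequest`) standing in for the `requestHelper`/`KafkaApis` wiring:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the future-based handler pattern: the handler returns a future,
// and the dispatcher attaches one error handler so an exception thrown at any
// stage still produces a response.
final class HandlerPatternSketch {
    // Placeholder for requestHelper.sendMaybeThrottle(...).
    static void sendResponse(String response) {
        System.out.println("sending: " + response);
    }

    static CompletableFuture<Void> handleRequest(CompletableFuture<String> coordinatorResult) {
        // Equivalent of newGroupCoordinator.commitOffsets(...).handle[Unit] { ... }:
        // both the success and failure paths end with a response being sent.
        return coordinatorResult.handle((result, exception) -> {
            if (exception != null) {
                sendResponse("error: " + exception.getMessage());
            } else {
                sendResponse(result);
            }
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<Void> done = handleRequest(result)
            // Equivalent of .exceptionally(handleError) in the dispatch table.
            .exceptionally(t -> null);
        result.complete("ok");
        done.join();
    }
}
```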
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 1fbdf333a9659..9ecbdd54e064c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -16,17 +16,19 @@
  */
 package kafka.coordinator.group
 
+import kafka.common.OffsetAndMetadata
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
 import kafka.server.RequestLocal
+import kafka.utils.MockTime
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
+import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
@@ -60,7 +62,7 @@ class GroupCoordinatorAdapterTest {
   @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
   def testJoinGroup(version: Short): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.JOIN_GROUP, version)
     val request = new JoinGroupRequestData()
@@ -148,7 +150,7 @@ class GroupCoordinatorAdapterTest {
   @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
   def testSyncGroup(version: Short): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.SYNC_GROUP, version)
     val data = new SyncGroupRequestData()
@@ -215,7 +217,7 @@ class GroupCoordinatorAdapterTest {
   @Test
   def testHeartbeat(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion)
     val data = new HeartbeatRequestData()
@@ -246,7 +248,7 @@ class GroupCoordinatorAdapterTest {
 
   def testLeaveGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.LEAVE_GROUP, ApiKeys.LEAVE_GROUP.latestVersion)
     val data = new LeaveGroupRequestData()
@@ -315,7 +317,7 @@ class GroupCoordinatorAdapterTest {
     expectedStatesFilter: Set[String]
   ): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.LIST_GROUPS, ApiKeys.LIST_GROUPS.latestVersion)
     val data = new ListGroupsRequestData()
@@ -350,7 +352,7 @@ class GroupCoordinatorAdapterTest {
   @Test
   def testDescribeGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val groupId1 = "group-1"
     val groupId2 = "group-2"
@@ -407,7 +409,7 @@ class GroupCoordinatorAdapterTest {
   @Test
   def testDeleteGroups(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.DELETE_GROUPS, ApiKeys.DELETE_GROUPS.latestVersion)
     val groupIds = List("group-1", "group-2", "group-3")
@@ -446,7 +448,7 @@ class GroupCoordinatorAdapterTest {
     val bar1 = new TopicPartition("bar", 1)
 
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     when(groupCoordinator.handleFetchOffsets(
       "group",
@@ -527,7 +529,7 @@ class GroupCoordinatorAdapterTest {
     val bar1 = new TopicPartition("bar", 1)
 
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     when(groupCoordinator.handleFetchOffsets(
       "group",
@@ -608,4 +610,74 @@ class GroupCoordinatorAdapterTest {
       future.get().asScala.toList.sortWith(_.name > _.name)
     )
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommitTimestamp(now)
+              .setCommittedLeaderEpoch(1)
+          ).asJava)
+      ).asJava)
+    val bufferSupplier = BufferSupplier.create()
+
+    val future = adapter.commitOffsets(ctx, data, bufferSupplier)
+    assertFalse(future.isDone)
+
+    val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
+
+    verify(groupCoordinator).handleCommitOffsets(
+      ArgumentMatchers.eq(data.groupId),
+      ArgumentMatchers.eq(data.memberId),
+      ArgumentMatchers.eq(None),
+      ArgumentMatchers.eq(data.generationId),
+      ArgumentMatchers.eq(Map(
+        new TopicPartition("foo", 0) -> new OffsetAndMetadata(
+          offset = 100,
+          leaderEpoch = Optional.of[Integer](1),
+          metadata = "",
+          commitTimestamp = now,
+          expireTimestamp = Some(now + 1000L)
+        )
+      )),
+      capturedCallback.capture(),
+      ArgumentMatchers.eq(RequestLocal(bufferSupplier))
+    )
+
+    capturedCallback.getValue.apply(Map(
+      new TopicPartition("foo", 0) -> Errors.NONE
+    ))
+
+    val expectedResponseData = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code)
+          ).asJava)
+      ).asJava)
+
+    assertTrue(future.isDone)
+    assertEquals(expectedResponseData, future.get())
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 27b2338a8e774..81c2c9ffbe2f4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1264,6 +1264,239 @@ class KafkaApisTest {
     )
   }
 
+  @Test
+  def testHandleOffsetCommitRequest(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 1)
+
+    val offsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+    when(newGroupCoordinator.commitOffsets(
+      requestChannelRequest.context,
+      offsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    // This is the response returned by the group coordinator.
+    val offsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    future.complete(offsetCommitResponse)
+    val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+    assertEquals(offsetCommitResponse, response.data)
+  }
+
+  @Test
+  def testHandleOffsetCommitRequestFutureFailed(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 1)
+
+    val offsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+    when(newGroupCoordinator.commitOffsets(
+      requestChannelRequest.context,
+      offsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val expectedOffsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava)
+
+    future.completeExceptionally(Errors.NOT_COORDINATOR.exception)
+    val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+    assertEquals(expectedOffsetCommitResponse, response.data)
+  }
+
+  @Test
+  def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 2)
+    addTopicToMetadataCache("bar", numPartitions = 2)
+
+    val offsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        // foo exists but only has 2 partitions.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(20),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(2)
+              .setCommittedOffset(30)).asJava),
+        // bar exists.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(40),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(50)).asJava),
+        // zar does not exist.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("zar")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(60),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(70)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+    // This is the request expected by the group coordinator.
+    val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        // foo exists but only has 2 partitions.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(20)).asJava),
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(40),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(50)).asJava)).asJava)
+
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+    when(newGroupCoordinator.commitOffsets(
+      requestChannelRequest.context,
+      expectedOffsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    // This is the response returned by the group coordinator.
+    val offsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava),
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    val expectedOffsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            // foo-2 is first because partitions failing the validation
+            // are put in the response first.
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(2)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava),
+        // zar is before bar because topics failing the validation are
+        // put in the response first.
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("zar")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)).asJava),
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    future.complete(offsetCommitResponse)
+    val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+    assertEquals(expectedOffsetCommitResponse, response.data)
+  }
+
   @Test
   def testOffsetCommitWithInvalidPartition(): Unit = {
     val topic = "topic"
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 97bd19024602d..30d4841082216 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -22,6 +22,8 @@
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -164,5 +166,20 @@ CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchAllOffsets(
         String groupId,
         boolean requireStable
     );
+
+    /**
+     * Commit offsets for a given group.
+     *
+     * @param context           The request context.
+     * @param request           The OffsetCommitRequest data.
+     * @param bufferSupplier    The buffer supplier tied to the request thread.
+     *
+     * @return A future yielding the response or an exception.
+     */
+    CompletableFuture<OffsetCommitResponseData> commitOffsets(
+        RequestContext context,
+        OffsetCommitRequestData request,
+        BufferSupplier bufferSupplier
+    );
 }
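Reviewer note: to make the new interface method concrete, a hedged sketch of how a caller consumes it. The `commitAndLog` helper and its wiring are hypothetical; only the `commitOffsets` call shape comes from the interface added above.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.coordinator.group.GroupCoordinator;

// `commitAndLog` is a hypothetical helper; the coordinator instance, context,
// and request are placeholders standing in for the wiring KafkaApis performs.
public class CommitOffsetsCaller {
    public static CompletableFuture<Void> commitAndLog(
        GroupCoordinator coordinator,
        RequestContext context,
        OffsetCommitRequestData request
    ) {
        return coordinator.commitOffsets(context, request, BufferSupplier.create())
            .handle((OffsetCommitResponseData response, Throwable exception) -> {
                if (exception != null) {
                    // KafkaApis maps this back to a full error response via
                    // OffsetCommitRequest#getErrorResponse(Throwable).
                    System.err.println("commit failed: " + exception);
                } else {
                    System.out.println("commit result: " + response);
                }
                return null;
            });
    }
}
```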