diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 0e33ceb7085dd..b430790971918 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -30,7 +30,6 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import java.util.Optional; public class OffsetCommitRequest extends AbstractRequest { // default values for the current version @@ -121,8 +120,4 @@ public OffsetCommitResponse getErrorResponse(Throwable e) { public static OffsetCommitRequest parse(ByteBuffer buffer, short version) { return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version); } - - public static Optional groupInstanceId(OffsetCommitRequestData request) { - return Optional.ofNullable(request.groupInstanceId()); - } } diff --git a/clients/src/main/resources/common/message/OffsetCommitResponse.json b/clients/src/main/resources/common/message/OffsetCommitResponse.json index 797c7bfe515c2..dbb23e3d4388c 100644 --- a/clients/src/main/resources/common/message/OffsetCommitResponse.json +++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json @@ -30,7 +30,8 @@ // Version 8 is the first flexible version. // // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is - // the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used. + // the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and + // GROUP_ID_NOT_FOUND when the group does not exist for both protocols. 
"validVersions": "0-9", "flexibleVersions": "8+", // Supported errors: @@ -42,6 +43,7 @@ // - UNKNOWN_MEMBER_ID (version 1+) // - INVALID_COMMIT_OFFSET_SIZE (version 0+) // - FENCED_MEMBER_EPOCH (version 7+) + // - GROUP_ID_NOT_FOUND (version 9+) // - STALE_MEMBER_EPOCH (version 9+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index eda36bf363cb4..1d0bd0124b02a 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -532,6 +532,7 @@ class BrokerServer( config.consumerGroupMaxSize, config.consumerGroupAssignors, config.offsetsTopicSegmentBytes, + config.offsetMetadataMaxSize, config.groupMaxSize, config.groupInitialRebalanceDelay, GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 0975a2c38ed47..33a2d50925807 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.KafkaException; + /** * Interface common for all groups. */ @@ -50,4 +52,18 @@ public String toString() { * @return The group id. */ String groupId(); + + /** + * Validates the OffsetCommit request. + * + * @param memberId The member id. + * @param groupInstanceId The group instance id. + * @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch + * for consumer groups. 
+ */ + void validateOffsetCommit( + String memberId, + String groupInstanceId, + int generationIdOrMemberEpoch + ) throws KafkaException; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 6bba39bc16dab..fe31a24524f03 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -61,6 +61,11 @@ public class GroupCoordinatorConfig { */ public final int offsetsTopicSegmentBytes; + /** + * The maximum size for a metadata entry associated with an offset commit. + */ + public final int offsetMetadataMaxSize; + /** * The generic group maximum size. */ @@ -93,6 +98,7 @@ public GroupCoordinatorConfig( int consumerGroupMaxSize, List consumerGroupAssignors, int offsetsTopicSegmentBytes, + int offsetMetadataMaxSize, int genericGroupMaxSize, int genericGroupInitialRebalanceDelayMs, int genericGroupNewMemberJoinTimeoutMs, @@ -105,6 +111,7 @@ public GroupCoordinatorConfig( this.consumerGroupMaxSize = consumerGroupMaxSize; this.consumerGroupAssignors = consumerGroupAssignors; this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; this.genericGroupMaxSize = genericGroupMaxSize; this.genericGroupInitialRebalanceDelayMs = genericGroupInitialRebalanceDelayMs; this.genericGroupNewMemberJoinTimeoutMs = genericGroupNewMemberJoinTimeoutMs; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 6c8f2fbefbff6..6783eff79f250 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.BufferSupplier; @@ -492,9 +493,49 @@ public CompletableFuture commitOffsets( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + // For backwards compatibility, we support offset commits for the empty groupId. + if (request.groupId() == null) { + return CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse( + request, + Errors.INVALID_GROUP_ID + )); + } + + return runtime.scheduleWriteOperation( + "commit-offset", + topicPartitionFor(request.groupId()), + coordinator -> coordinator.commitOffset(context, request) + ).exceptionally(exception -> { + if (exception instanceof UnknownTopicOrPartitionException || + exception instanceof NotEnoughReplicasException) { + return OffsetCommitRequest.getErrorResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE + ); + } + + if (exception instanceof NotLeaderOrFollowerException || + exception instanceof KafkaStorageException) { + return OffsetCommitRequest.getErrorResponse( + request, + Errors.NOT_COORDINATOR + ); + } + + if (exception instanceof RecordTooLargeException || + exception instanceof RecordBatchTooLargeException || + exception instanceof InvalidFetchSizeException) { + return OffsetCommitRequest.getErrorResponse( + request, + Errors.INVALID_COMMIT_OFFSET_SIZE + ); + } + + return OffsetCommitRequest.getErrorResponse( + request, + 
Errors.forException(exception) + ); + }); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 8f03c3799b4b8..4ed7a4d2bf646 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; @@ -82,7 +83,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -102,6 +102,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.Utils.ofSentinel; import static org.apache.kafka.coordinator.group.generic.GenericGroupMember.EMPTY_ASSIGNMENT; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; @@ -410,6 +411,17 @@ public MetadataImage image() { return metadataImage; } + /** + * @return The group corresponding to the group id or throw GroupIdNotFoundException. 
+ */ + public Group group(String groupId) throws GroupIdNotFoundException { + Group group = groups.get(groupId); + if (group == null) { + throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId)); + } + return group; + } + /** * Gets or maybe creates a consumer group. * @@ -675,10 +687,6 @@ private List fromAssignmentM .collect(Collectors.toList()); } - private OptionalInt ofSentinel(int value) { - return value != -1 ? OptionalInt.of(value) : OptionalInt.empty(); - } - /** * Handles a regular heartbeat from a consumer group member. It mainly consists of * three parts: @@ -1804,102 +1812,102 @@ private CoordinatorResult genericGroupJoinExistingMember( responseFuture ); } else { - Optional memberError = validateExistingMember( - group, - memberId, - groupInstanceId, - "join-group" - ); - - if (memberError.isPresent()) { + try { + group.validateMember( + memberId, + groupInstanceId, + "join-group" + ); + } catch (KafkaException ex) { responseFuture.complete(new JoinGroupResponseData() .setMemberId(memberId) - .setErrorCode(memberError.get().code()) + .setErrorCode(Errors.forException(ex).code()) .setProtocolType(null) .setProtocolName(null) ); - } else { - GenericGroupMember member = group.member(memberId); - if (group.isInState(PREPARING_REBALANCE)) { + return EMPTY_RESULT; + } + + GenericGroupMember member = group.member(memberId); + if (group.isInState(PREPARING_REBALANCE)) { + return updateMemberThenRebalanceOrCompleteJoin( + request, + group, + member, + "Member " + member.memberId() + " is joining group during " + group.stateAsString() + + "; client reason: " + JoinGroupRequest.joinReason(request), + responseFuture + ); + } else if (group.isInState(COMPLETING_REBALANCE)) { + if (member.matches(request.protocols())) { + // Member is joining with the same metadata (which could be because it failed to + // receive the initial JoinGroup response), so just return current group information + // for the current generation. 
+ responseFuture.complete(new JoinGroupResponseData() + .setMembers(group.isLeader(memberId) ? + group.currentGenericGroupMembers() : Collections.emptyList()) + .setMemberId(memberId) + .setGenerationId(group.generationId()) + .setProtocolName(group.protocolName().orElse(null)) + .setProtocolType(group.protocolType().orElse(null)) + .setLeader(group.leaderOrNull()) + .setSkipAssignment(false) + ); + } else { + // Member has changed metadata, so force a rebalance return updateMemberThenRebalanceOrCompleteJoin( request, group, member, - "Member " + member.memberId() + " is joining group during " + group.stateAsString() + + "Updating metadata for member " + memberId + " during " + group.stateAsString() + + "; client reason: " + JoinGroupRequest.joinReason(request), + responseFuture + ); + } + } else if (group.isInState(STABLE)) { + if (group.isLeader(memberId)) { + // Force a rebalance if the leader sends JoinGroup; + // This allows the leader to trigger rebalances for changes affecting assignment + // which do not affect the member metadata (such as topic metadata changes for the consumer) + return updateMemberThenRebalanceOrCompleteJoin( + request, + group, + member, + "Leader " + memberId + " re-joining group during " + group.stateAsString() + + "; client reason: " + JoinGroupRequest.joinReason(request), + responseFuture + ); + } else if (!member.matches(request.protocols())) { + return updateMemberThenRebalanceOrCompleteJoin( + request, + group, + member, + "Updating metadata for member " + memberId + " during " + group.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(request), responseFuture ); - } else if (group.isInState(COMPLETING_REBALANCE)) { - if (member.matches(request.protocols())) { - // Member is joining with the same metadata (which could be because it failed to - // receive the initial JoinGroup response), so just return current group information - // for the current generation. 
- responseFuture.complete(new JoinGroupResponseData() - .setMembers(group.isLeader(memberId) ? - group.currentGenericGroupMembers() : Collections.emptyList()) - .setMemberId(memberId) - .setGenerationId(group.generationId()) - .setProtocolName(group.protocolName().orElse(null)) - .setProtocolType(group.protocolType().orElse(null)) - .setLeader(group.leaderOrNull()) - .setSkipAssignment(false) - ); - } else { - // Member has changed metadata, so force a rebalance - return updateMemberThenRebalanceOrCompleteJoin( - request, - group, - member, - "Updating metadata for member " + memberId + " during " + group.stateAsString() + - "; client reason: " + JoinGroupRequest.joinReason(request), - responseFuture - ); - } - } else if (group.isInState(STABLE)) { - if (group.isLeader(memberId)) { - // Force a rebalance if the leader sends JoinGroup; - // This allows the leader to trigger rebalances for changes affecting assignment - // which do not affect the member metadata (such as topic metadata changes for the consumer) - return updateMemberThenRebalanceOrCompleteJoin( - request, - group, - member, - "Leader " + memberId + " re-joining group during " + group.stateAsString() + - "; client reason: " + JoinGroupRequest.joinReason(request), - responseFuture - ); - } else if (!member.matches(request.protocols())) { - return updateMemberThenRebalanceOrCompleteJoin( - request, - group, - member, - "Updating metadata for member " + memberId + " during " + group.stateAsString() + - "; client reason: " + JoinGroupRequest.joinReason(request), - responseFuture - ); - } else { - // For followers with no actual change to their metadata, just return group information - // for the current generation which will allow them to issue SyncGroup. 
- responseFuture.complete(new JoinGroupResponseData() - .setMembers(Collections.emptyList()) - .setMemberId(memberId) - .setGenerationId(group.generationId()) - .setProtocolName(group.protocolName().orElse(null)) - .setProtocolType(group.protocolType().orElse(null)) - .setLeader(group.leaderOrNull()) - .setSkipAssignment(false) - ); - } } else { - // Group reached unexpected (Empty) state. Let the joining member reset their generation and rejoin. - log.warn("Attempt to add rejoining member {} of group {} in unexpected group state {}", - memberId, group.groupId(), group.stateAsString()); - + // For followers with no actual change to their metadata, just return group information + // for the current generation which will allow them to issue SyncGroup. responseFuture.complete(new JoinGroupResponseData() + .setMembers(Collections.emptyList()) .setMemberId(memberId) - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + .setGenerationId(group.generationId()) + .setProtocolName(group.protocolName().orElse(null)) + .setProtocolType(group.protocolType().orElse(null)) + .setLeader(group.leaderOrNull()) + .setSkipAssignment(false) ); } + } else { + // Group reached unexpected (Empty) state. Let the joining member reset their generation and rejoin. + log.warn("Attempt to add rejoining member {} of group {} in unexpected group state {}", + memberId, group.groupId(), group.stateAsString()); + + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(memberId) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); } } @@ -2149,48 +2157,6 @@ private CoordinatorResult updateMemberThenRebalanceOrCompleteJoin( return maybePrepareRebalanceOrCompleteJoin(group, joinReason); } - /** - * We are validating two things: - * 1. If `groupInstanceId` is present, then it exists and is mapped to `memberId` - * 2. The `memberId` exists in the group - * - * @param group The generic group. - * @param memberId The member id. - * @param groupInstanceId The group instance id. 
- * @param operation The API operation. - * - * @return the error. - */ - private Optional validateExistingMember( - GenericGroup group, - String memberId, - String groupInstanceId, - String operation - ) { - if (groupInstanceId == null) { - if (!group.hasMemberId(memberId)) { - return Optional.of(Errors.UNKNOWN_MEMBER_ID); - } else { - return Optional.empty(); - } - } - - String existingMemberId = group.staticMemberId(groupInstanceId); - if (existingMemberId == null) { - return Optional.of(Errors.UNKNOWN_MEMBER_ID); - } - - if (!existingMemberId.equals(memberId)) { - log.info("Request memberId={} for static member with groupInstanceId={} " + - "is fenced by existing memberId={} during operation {}", - memberId, groupInstanceId, existingMemberId, operation); - - return Optional.of(Errors.FENCED_INSTANCE_ID); - } - - return Optional.empty(); - } - /** * Add a member then rebalance or complete join. * @@ -2444,7 +2410,7 @@ private void propagateAssignment(GenericGroup group, Errors error) { * @param group The group. * @param member The member. */ - private void rescheduleGenericGroupMemberHeartbeat( + public void rescheduleGenericGroupMemberHeartbeat( GenericGroup group, GenericGroupMember member ) { @@ -2825,24 +2791,23 @@ private Optional validateSyncGroup( // finding the correct coordinator and rejoin. 
return Optional.of(COORDINATOR_NOT_AVAILABLE); } else { - Optional memberError = validateExistingMember( - group, - request.memberId(), - request.groupInstanceId(), - "sync-group" - ); - if (memberError.isPresent()) { - return memberError; - } else { - if (request.generationId() != group.generationId()) { - return Optional.of(Errors.ILLEGAL_GENERATION); - } else if (isProtocolInconsistent(request.protocolType(), group.protocolType().orElse(null)) || - isProtocolInconsistent(request.protocolName(), group.protocolName().orElse(null))) { + try { + group.validateMember( + request.memberId(), + request.groupInstanceId(), + "sync-group" + ); + } catch (KafkaException ex) { + return Optional.of(Errors.forException(ex)); + } - return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); - } else { - return Optional.empty(); - } + if (request.generationId() != group.generationId()) { + return Optional.of(Errors.ILLEGAL_GENERATION); + } else if (isProtocolInconsistent(request.protocolType(), group.protocolType().orElse(null)) || + isProtocolInconsistent(request.protocolName(), group.protocolName().orElse(null))) { + return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); + } else { + return Optional.empty(); } } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java index b59933a249d00..cf0b936917d85 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.coordinator.group; -import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; @@ -24,11 +24,13 @@ import 
java.util.OptionalInt; import java.util.OptionalLong; +import static org.apache.kafka.coordinator.group.Utils.ofSentinel; + /** * Represents a committed offset with its metadata. */ public class OffsetAndMetadata { - public static final String NO_METADATA = ""; + private static final String NO_METADATA = ""; /** * The committed offset. @@ -114,12 +116,29 @@ public static OffsetAndMetadata fromRecord( ) { return new OffsetAndMetadata( record.offset(), - record.leaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? - OptionalInt.empty() : OptionalInt.of(record.leaderEpoch()), + ofSentinel(record.leaderEpoch()), record.metadata(), record.commitTimestamp(), - record.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ? - OptionalLong.empty() : OptionalLong.of(record.expireTimestamp()) + ofSentinel(record.expireTimestamp()) + ); + } + + /** + * @return An OffsetAndMetadata created from an OffsetCommitRequestPartition request. + */ + public static OffsetAndMetadata fromRequest( + OffsetCommitRequestData.OffsetCommitRequestPartition partition, + long currentTimeMs, + OptionalLong expireTimestampMs + ) { + return new OffsetAndMetadata( + partition.committedOffset(), + ofSentinel(partition.committedLeaderEpoch()), + partition.committedMetadata() == null ? + OffsetAndMetadata.NO_METADATA : partition.committedMetadata(), + partition.commitTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ? + currentTimeMs : partition.commitTimestamp(), + expireTimestampMs ); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java new file mode 100644 index 0000000000000..8d89f6a980a1e --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.StaleMemberEpochException; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; 
+import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; + +/** + * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains + * a mapping from group id to topic-partition to offset. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions. + */ +public class OffsetMetadataManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private Time time = null; + private GroupMetadataManager groupMetadataManager = null; + private int offsetMetadataMaxSize = 4096; + private MetadataImage metadataImage = null; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withTime(Time time) { + this.time = time; + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; + return this; + } + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; + return this; + } + + public OffsetMetadataManager build() { + if (logContext == null) logContext = new 
LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; + + if (groupMetadataManager == null) { + throw new IllegalArgumentException("GroupMetadataManager cannot be null"); + } + + return new OffsetMetadataManager( + snapshotRegistry, + logContext, + time, + metadataImage, + groupMetadataManager, + offsetMetadataMaxSize + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The system time. + */ + private final Time time; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The group metadata manager. + */ + private final GroupMetadataManager groupMetadataManager; + + /** + * The maximum allowed metadata for any offset commit. + */ + private final int offsetMetadataMaxSize; + + /** + * The offsets keyed by topic-partition and group id. + */ + private final TimelineHashMap> offsetsByGroup; + + OffsetMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + Time time, + MetadataImage metadataImage, + GroupMetadataManager groupMetadataManager, + int offsetMetadataMaxSize + ) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(OffsetMetadataManager.class); + this.time = time; + this.metadataImage = metadataImage; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Validates an OffsetCommit request. + * + * @param context The request context. + * @param request The actual request. 
+ */ + private Group validateOffsetCommit( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group; + try { + group = groupMetadataManager.group(request.groupId()); + } catch (GroupIdNotFoundException ex) { + if (request.generationIdOrMemberEpoch() < 0) { + // If the group does not exist and generation id is -1, the request comes from + // either the admin client or a consumer which does not use the group management + // facility. In this case, a so-called simple group is created and the request + // is accepted. + group = groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true); + } else { + if (context.header.apiVersion() >= 9) { + // Starting from version 9 of the OffsetCommit API, we return GROUP_ID_NOT_FOUND + // if the group does not exist. This error works for both the old and the new + // protocol for clients using this version of the API. + throw ex; + } else { + // For older version, we return ILLEGAL_GENERATION to preserve the backward + // compatibility. + throw Errors.ILLEGAL_GENERATION.exception(); + } + } + } + + try { + group.validateOffsetCommit( + request.memberId(), + request.groupInstanceId(), + request.generationIdOrMemberEpoch() + ); + } catch (StaleMemberEpochException ex) { + // The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When + // it is, the member should be using the OffsetCommit API version >= 9. As we don't + // support upgrading from the old to the new protocol yet, we return UNSUPPORTED_VERSION + // error if an older version is used. We will revise this when the upgrade path is implemented. + if (context.header.apiVersion() >= 9) { + throw ex; + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } + + return group; + } + + /** + * Computes the expiration timestamp based on the retention time provided in the OffsetCommit + * request. + * + * The "default" expiration timestamp is defined as now + retention. 
The retention may be overridden + * in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + * commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. + * + * @param retentionTimeMs The retention time in milliseconds. + * @param currentTimeMs The current time in milliseconds. + * + * @return An optional containing the expiration timestamp if defined; an empty optional otherwise. + */ + private static OptionalLong expireTimestampMs( + long retentionTimeMs, + long currentTimeMs + ) { + return retentionTimeMs == OffsetCommitRequest.DEFAULT_RETENTION_TIME ? + OptionalLong.empty() : OptionalLong.of(currentTimeMs + retentionTimeMs); + } + + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group = validateOffsetCommit(context, request); + + // In the old consumer group protocol, the offset commits maintain the session if + // the group is in Stable or PreparingRebalance state. 
+ if (group.type() == Group.GroupType.GENERIC) { + GenericGroup genericGroup = (GenericGroup) group; + if (genericGroup.isInState(GenericGroupState.STABLE) || genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE)) { + groupMetadataManager.rescheduleGenericGroupMemberHeartbeat( + genericGroup, + genericGroup.member(request.memberId()) + ); + } + } + + final OffsetCommitResponseData response = new OffsetCommitResponseData(); + final List records = new ArrayList<>(); + final long currentTimeMs = time.milliseconds(); + final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs); + + request.topics().forEach(topic -> { + final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name()); + response.topics().add(topicResponse); + + topic.partitions().forEach(partition -> { + if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) { + topicResponse.partitions().add(new OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); + } else { + log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + request.memberId(), partition.committedLeaderEpoch()); + + topicResponse.partitions().add(new OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.NONE.code())); + + final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRequest( + partition, + currentTimeMs, + expireTimestampMs + ); + + records.add(RecordHelpers.newOffsetCommitRecord( + request.groupId(), + topic.name(), + partition.partitionIndex(), + offsetAndMetadata, + metadataImage.features().metadataVersion() + )); + } + }); + }); + + return new CoordinatorResult<>(records, response); + } + + /** + * Replays 
OffsetCommitKey/Value to update or delete the corresponding offsets. + * + * @param key An OffsetCommitKey key. + * @param value An OffsetCommitValue value. + */ + public void replay( + OffsetCommitKey key, + OffsetCommitValue value + ) { + final String groupId = key.group(); + final TopicPartition tp = new TopicPartition(key.topic(), key.partition()); + + if (value != null) { + // The generic or consumer group should exist when offsets are committed or + // replayed. However, it won't if the consumer commits offsets but does not + // use the membership functionality. In this case, we automatically create + // a so-called "simple consumer group". This is an empty generic group + // without a protocol type. + try { + groupMetadataManager.group(groupId); + } catch (GroupIdNotFoundException ex) { + groupMetadataManager.getOrMaybeCreateGenericGroup(groupId, true); + } + + final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRecord(value); + TimelineHashMap offsets = offsetsByGroup.get(groupId); + if (offsets == null) { + offsets = new TimelineHashMap<>(snapshotRegistry, 0); + offsetsByGroup.put(groupId, offsets); + } + + offsets.put(tp, offsetAndMetadata); + } else { + TimelineHashMap offsets = offsetsByGroup.get(groupId); + if (offsets != null) { + offsets.remove(tp); + if (offsets.isEmpty()) { + offsetsByGroup.remove(groupId); + } + } + } + } + + /** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ + public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { + metadataImage = newImage; + } + + /** + * @return The offset for the provided groupId and topic partition or null + * if it does not exist. + * + * package-private for testing. 
+ */ + OffsetAndMetadata offset(String groupId, TopicPartition tp) { + Map offsets = offsetsByGroup.get(groupId); + if (offsets == null) { + return null; + } else { + return offsets.get(tp); + } + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java index 344c6e8fabe03..26a318f42348f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java @@ -21,8 +21,11 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.utils.LogContext; @@ -41,6 +44,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.runtime.Coordinator; import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; @@ -132,21 +137,32 @@ public 
ReplicatedGroupCoordinator build() { if (topicPartition == null) throw new IllegalArgumentException("TopicPartition must be set."); + GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder() + .withLogContext(logContext) + .withSnapshotRegistry(snapshotRegistry) + .withTime(time) + .withTimer(timer) + .withTopicPartition(topicPartition) + .withAssignors(config.consumerGroupAssignors) + .withConsumerGroupMaxSize(config.consumerGroupMaxSize) + .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs) + .withGenericGroupInitialRebalanceDelayMs(config.genericGroupInitialRebalanceDelayMs) + .withGenericGroupNewMemberJoinTimeoutMs(config.genericGroupNewMemberJoinTimeoutMs) + .withGenericGroupMinSessionTimeoutMs(config.genericGroupMinSessionTimeoutMs) + .withGenericGroupMaxSessionTimeoutMs(config.genericGroupMaxSessionTimeoutMs) + .build(); + + OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() + .withLogContext(logContext) + .withSnapshotRegistry(snapshotRegistry) + .withTime(time) + .withGroupMetadataManager(groupMetadataManager) + .withOffsetMetadataMaxSize(config.offsetMetadataMaxSize) + .build(); + return new ReplicatedGroupCoordinator( - new GroupMetadataManager.Builder() - .withLogContext(logContext) - .withSnapshotRegistry(snapshotRegistry) - .withTime(time) - .withTimer(timer) - .withAssignors(config.consumerGroupAssignors) - .withConsumerGroupMaxSize(config.consumerGroupMaxSize) - .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs) - .withTopicPartition(topicPartition) - .withGenericGroupInitialRebalanceDelayMs(config.genericGroupInitialRebalanceDelayMs) - .withGenericGroupNewMemberJoinTimeoutMs(config.genericGroupNewMemberJoinTimeoutMs) - .withGenericGroupMinSessionTimeoutMs(config.genericGroupMinSessionTimeoutMs) - .withGenericGroupMaxSessionTimeoutMs(config.genericGroupMaxSessionTimeoutMs) - .build() + groupMetadataManager, + offsetMetadataManager ); } } @@ 
-156,15 +172,23 @@ public ReplicatedGroupCoordinator build() { */ private final GroupMetadataManager groupMetadataManager; + /** + * The offset metadata manager. + */ + private final OffsetMetadataManager offsetMetadataManager; + /** * Constructor. * - * @param groupMetadataManager The group metadata manager. + * @param groupMetadataManager The group metadata manager. + * @param offsetMetadataManager The offset metadata manager. */ ReplicatedGroupCoordinator( - GroupMetadataManager groupMetadataManager + GroupMetadataManager groupMetadataManager, + OffsetMetadataManager offsetMetadataManager ) { this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataManager = offsetMetadataManager; } /** @@ -225,6 +249,22 @@ public CoordinatorResult genericGroupSync( ); } + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The actual OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponse response and + * a list of records to update the state machine. + */ + public CoordinatorResult commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + return offsetMetadataManager.commitOffset(context, request); + } + /** * The coordinator has been loaded. This is used to apply any * post loading operations (e.g. registering timers). 
@@ -233,7 +273,10 @@ public CoordinatorResult genericGroupSync( */ @Override public void onLoaded(MetadataImage newImage) { - groupMetadataManager.onNewMetadataImage(newImage, new MetadataDelta(newImage)); + MetadataDelta emptyDelta = new MetadataDelta(newImage); + groupMetadataManager.onNewMetadataImage(newImage, emptyDelta); + offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta); + groupMetadataManager.onLoaded(); } @@ -246,6 +289,7 @@ public void onLoaded(MetadataImage newImage) { @Override public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { groupMetadataManager.onNewMetadataImage(newImage, delta); + offsetMetadataManager.onNewMetadataImage(newImage, delta); } /** @@ -271,6 +315,14 @@ public void replay(Record record) throws RuntimeException { ApiMessageAndVersion value = record.value(); switch (key.version()) { + case 0: + case 1: + offsetMetadataManager.replay( + (OffsetCommitKey) key.message(), + (OffsetCommitValue) messageOrNull(value) + ); + break; + case 2: groupMetadataManager.replay( (GroupMetadataKey) key.message(), diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java new file mode 100644 index 0000000000000..7fe92c5bd8cf4 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import java.util.OptionalInt; +import java.util.OptionalLong; + +public class Utils { + private Utils() {} + + /** + * @return An OptionalInt containing the value iff the value is different from + * the sentinel (or default) value -1. + */ + public static OptionalInt ofSentinel(int value) { + return value != -1 ? OptionalInt.of(value) : OptionalInt.empty(); + } + + /** + * @return An OptionalLong containing the value iff the value is different from + * the sentinel (or default) value -1. + */ + public static OptionalLong ofSentinel(long value) { + return value != -1 ? 
OptionalLong.of(value) : OptionalLong.empty(); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 6f682d4e82f87..533c590c36d5d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -17,7 +17,9 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; @@ -497,6 +499,30 @@ public DeadlineAndEpoch metadataRefreshDeadline() { return metadataRefreshDeadline; } + /** + * Validates the OffsetCommit request. + * + * @param memberId The member id. + * @param groupInstanceId The group instance id. + * @param memberEpoch The member epoch. + */ + @Override + public void validateOffsetCommit( + String memberId, + String groupInstanceId, + int memberEpoch + ) throws UnknownMemberIdException, StaleMemberEpochException { + // When the member epoch is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + if (memberEpoch < 0 && members().isEmpty()) return; + + final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); + if (memberEpoch != member.memberEpoch()) { + throw Errors.STALE_MEMBER_EPOCH.exception(); + } + } + /** * Updates the current state of the group. 
*/ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java index 813e2da304d03..d43ecda2cf78e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java @@ -17,6 +17,10 @@ package org.apache.kafka.coordinator.group.generic; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.ListGroupsResponseData; @@ -738,6 +742,84 @@ public String generateMemberId(String clientId, Optional groupInstanceId .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + UUID.randomUUID()); } + /** + * Validates that (1) the group instance id exists and is mapped to the member id + * if the group instance id is provided; and (2) the member id exists in the group. + * + * @param memberId The member id. + * @param groupInstanceId The group instance id. + * @param operation The operation. 
+ * + * @throws UnknownMemberIdException + * @throws FencedInstanceIdException + */ + public void validateMember( + String memberId, + String groupInstanceId, + String operation + ) throws UnknownMemberIdException, FencedInstanceIdException { + if (groupInstanceId != null) { + String existingMemberId = staticMemberId(groupInstanceId); + if (existingMemberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!existingMemberId.equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={} during operation {}", + memberId, groupInstanceId, existingMemberId, operation); + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!hasMemberId(memberId)) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + } + + /** + * Validates the OffsetCommit request. + * + * @param memberId The member id. + * @param groupInstanceId The group instance id. + * @param generationId The generation id. + */ + @Override + public void validateOffsetCommit( + String memberId, + String groupInstanceId, + int generationId + ) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException { + if (isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (generationId < 0 && isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. 
+ return; + } + + if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != null) { + validateMember(memberId, groupInstanceId, "offset-commit"); + + if (generationId != this.generationId) { + throw Errors.ILLEGAL_GENERATION.exception(); + } + } else if (!isInState(EMPTY)) { + // If the request does not contain the member id and the generation id (version 0), + // offset commits are only accepted when the group is empty. + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (isInState(COMPLETING_REBALANCE)) { + // We should not receive a commit request if the group has not completed rebalance; + // but since the consumer's member.id and generation is valid, it means it has received + // the latest group generation information from the JoinResponse. + // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. + throw Errors.REBALANCE_IN_PROGRESS.exception(); + } + } + /** * Verify the member id is up to date for static members. Return true if both conditions met: * 1. 
given member is a known static member to group diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 9025a1ffb5d4b..1d31bd508451d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -35,6 +35,7 @@ public void testConfigs() { 55, Collections.singletonList(assignor), 2222, + 3333, 60, 3000, 5 * 60 * 1000, @@ -48,6 +49,7 @@ public void testConfigs() { assertEquals(55, config.consumerGroupMaxSize); assertEquals(Collections.singletonList(assignor), config.consumerGroupAssignors); assertEquals(2222, config.offsetsTopicSegmentBytes); + assertEquals(3333, config.offsetMetadataMaxSize); assertEquals(60, config.genericGroupMaxSize); assertEquals(3000, config.genericGroupInitialRebalanceDelayMs); assertEquals(5 * 60 * 1000, config.genericGroupNewMemberJoinTimeoutMs); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index e3b598bf168ac..2ceebeceff714 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -91,6 +91,7 @@ private GroupCoordinatorConfig createConfig() { Integer.MAX_VALUE, Collections.singletonList(new RangeAssignor()), 1000, + 4096, Integer.MAX_VALUE, 3000, 5 * 60 * 1000, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java index adac36b8ad16a..946fa701c614d 
100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.server.util.MockTime; import org.junit.jupiter.api.Test; import java.util.OptionalInt; @@ -71,4 +73,64 @@ public void testFromRecord() { OptionalLong.of(5678L) ), OffsetAndMetadata.fromRecord(record)); } + + @Test + public void testFromRequest() { + MockTime time = new MockTime(); + + OffsetCommitRequestData.OffsetCommitRequestPartition partition = + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + .setCommittedLeaderEpoch(-1) + .setCommittedMetadata(null) + .setCommitTimestamp(-1L); + + assertEquals( + new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "", + time.milliseconds(), + OptionalLong.empty() + ), OffsetAndMetadata.fromRequest( + partition, + time.milliseconds(), + OptionalLong.empty() + ) + ); + + partition + .setCommittedLeaderEpoch(10) + .setCommittedMetadata("hello") + .setCommitTimestamp(1234L); + + assertEquals( + new OffsetAndMetadata( + 100L, + OptionalInt.of(10), + "hello", + 1234L, + OptionalLong.empty() + ), OffsetAndMetadata.fromRequest( + partition, + time.milliseconds(), + OptionalLong.empty() + ) + ); + + assertEquals( + new OffsetAndMetadata( + 100L, + OptionalInt.of(10), + "hello", + 1234L, + OptionalLong.of(5678L) + ), OffsetAndMetadata.fromRequest( + partition, + time.milliseconds(), + OptionalLong.of(5678L) + ) + ); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java new file mode 
100644 index 0000000000000..496df95e2ee1d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -0,0 +1,1146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.errors.StaleMemberEpochException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import 
org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.generic.GenericGroupMember; +import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OffsetMetadataManagerTest { + static class OffsetMetadataManagerTestContext { + public static class Builder { + final private MockTime time = new 
MockTime(); + final private MockCoordinatorTimer timer = new MockCoordinatorTimer<>(time); + final private LogContext logContext = new LogContext(); + final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + private MetadataImage metadataImage = null; + private int offsetMetadataMaxSize = 4096; + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + OffsetMetadataManagerTestContext build() { + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + + GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder() + .withTime(time) + .withTimer(timer) + .withSnapshotRegistry(snapshotRegistry) + .withLogContext(logContext) + .withMetadataImage(metadataImage) + .withTopicPartition(new TopicPartition("__consumer_offsets", 0)) + .withAssignors(Collections.singletonList(new RangeAssignor())) + .build(); + + OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() + .withTime(time) + .withLogContext(logContext) + .withSnapshotRegistry(snapshotRegistry) + .withMetadataImage(metadataImage) + .withGroupMetadataManager(groupMetadataManager) + .withOffsetMetadataMaxSize(offsetMetadataMaxSize) + .build(); + + return new OffsetMetadataManagerTestContext( + time, + timer, + snapshotRegistry, + groupMetadataManager, + offsetMetadataManager + ); + } + } + + final MockTime time; + final MockCoordinatorTimer timer; + final SnapshotRegistry snapshotRegistry; + final GroupMetadataManager groupMetadataManager; + final OffsetMetadataManager offsetMetadataManager; + + long lastCommittedOffset = 0L; + long lastWrittenOffset = 0L; + + OffsetMetadataManagerTestContext( + MockTime time, + MockCoordinatorTimer timer, + SnapshotRegistry snapshotRegistry, + GroupMetadataManager groupMetadataManager, + OffsetMetadataManager offsetMetadataManager + ) { + this.time = time; + this.timer = timer; + this.snapshotRegistry = 
snapshotRegistry; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataManager = offsetMetadataManager; + } + + public CoordinatorResult commitOffset( + OffsetCommitRequestData request + ) { + return commitOffset(ApiKeys.OFFSET_COMMIT.latestVersion(), request); + } + + public CoordinatorResult commitOffset( + short version, + OffsetCommitRequestData request + ) { + snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); + + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.OFFSET_COMMIT, + version, + "client", + 0 + ), + "1", + InetAddress.getLoopbackAddress(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + CoordinatorResult result = offsetMetadataManager.commitOffset( + context, + request + ); + + result.records().forEach(this::replay); + return result; + } + + public List> sleep(long ms) { + time.sleep(ms); + List> timeouts = timer.poll(); + timeouts.forEach(timeout -> { + if (timeout.result.replayRecords()) { + timeout.result.records().forEach(this::replay); + } + }); + return timeouts; + } + + private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) { + if (apiMessageAndVersion == null) { + return null; + } else { + return apiMessageAndVersion.message(); + } + } + + private void replay( + Record record + ) { + ApiMessageAndVersion key = record.key(); + ApiMessageAndVersion value = record.value(); + + if (key == null) { + throw new IllegalStateException("Received a null key in " + record); + } + + switch (key.version()) { + case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION: + offsetMetadataManager.replay( + (OffsetCommitKey) key.message(), + (OffsetCommitValue) messageOrNull(value) + ); + break; + + default: + throw new IllegalStateException("Received an unknown record type " + key.version() + + " in " + record); + } + + lastWrittenOffset++; + } + } + + @ParameterizedTest + 
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testOffsetCommitWithUnknownGroup(short version) { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + Class expectedType; + if (version >= 9) { + expectedType = GroupIdNotFoundException.class; + } else { + expectedType = IllegalGenerationException.class; + } + + // Verify that the request is rejected with the correct exception. + assertThrows(expectedType, () -> context.commitOffset( + version, + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithDeadGroup() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create a dead group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + group.transitionTo(GenericGroupState.DEAD); + + // Verify that the request is rejected with the correct exception. 
+ assertThrows(CoordinatorNotAvailableException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithUnknownMemberId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Verify that the request is rejected with the correct exception. + assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithIllegalGeneration() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(mkGenericMember("member", Optional.of("new-instance-id"))); + + // Transition to next generation. + group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + + // Verify that the request is rejected with the correct exception. 
+ assertThrows(IllegalGenerationException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithUnknownInstanceId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member without static id. + group.add(mkGenericMember("member", Optional.empty())); + + // Verify that the request is rejected with the correct exception. + assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGroupInstanceId("instanceid") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithFencedInstanceId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member with static id. + group.add(mkGenericMember("member", Optional.of("new-instance-id"))); + + // Verify that the request is rejected with the correct exception. 
+ assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGroupInstanceId("old-instance-id") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWhileInCompletingRebalanceState() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(mkGenericMember("member", Optional.of("new-instance-id"))); + + // Transition to next generation. + group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + + // Verify that the request is rejected with the correct exception. + assertThrows(RebalanceInProgressException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(1) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithoutMemberIdAndGeneration() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. 
+ GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(mkGenericMember("member", Optional.of("new-instance-id"))); + + // Transition to next generation. + group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + + // Verify that the request is rejected with the correct exception. + assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithRetentionTime() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(mkGenericMember("member", Optional.of("new-instance-id"))); + + // Transition to next generation. 
+ group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + group.transitionTo(GenericGroupState.STABLE); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(1) + .setRetentionTimeMs(1234L) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + Collections.singletonList(RecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 0, + new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "", + context.time.milliseconds(), + OptionalLong.of(context.time.milliseconds() + 1234L) + ), + MetadataImage.EMPTY.features().metadataVersion() + )), + result.records() + ); + } + + @Test + public void testGenericGroupOffsetCommitMaintainsSession() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create a group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + GenericGroupMember member = mkGenericMember("member", Optional.empty()); + group.add(member); + + // Transition to next generation. 
+ group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + group.transitionTo(GenericGroupState.STABLE); + + // Schedule session timeout. This would be normally done when + // the group transitions to stable. + context.groupMetadataManager.rescheduleGenericGroupMemberHeartbeat(group, member); + + // Advance time by half of the session timeout. No timeouts are + // expired. + assertEquals(Collections.emptyList(), context.sleep(5000 / 2)); + + // Commit. + context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(1) + .setRetentionTimeMs(1234L) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ); + + // Advance time by half of the session timeout. No timeouts are + // expired. + assertEquals(Collections.emptyList(), context.sleep(5000 / 2)); + + // Advance time by half of the session timeout again. The timeout should + // expire and the member is removed from the group. 
+ List> timeouts = + context.sleep(5000 / 2); + assertEquals(1, timeouts.size()); + assertFalse(group.hasMemberId(member.memberId())); + } + + @Test + public void testSimpleGroupOffsetCommit() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + Collections.singletonList(RecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 0, + new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "", + context.time.milliseconds(), + OptionalLong.empty() + ), + MetadataImage.EMPTY.features().metadataVersion() + )), + result.records() + ); + + // A generic should have been created. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + false + ); + assertNotNull(group); + assertEquals("foo", group.groupId()); + } + + @Test + public void testSimpleGroupOffsetCommitWithInstanceId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + // Instance id should be ignored. 
+ .setGroupInstanceId("instance-id") + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + Collections.singletonList(RecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 0, + new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "", + context.time.milliseconds(), + OptionalLong.empty() + ), + MetadataImage.EMPTY.features().metadataVersion() + )), + result.records() + ); + } + + @Test + public void testConsumerGroupOffsetCommitWithUnknownMemberId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + context.groupMetadataManager.getOrMaybeCreateConsumerGroup( + "foo", + true + ); + + // Verify that the request is rejected with the correct exception. 
+ assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup( + "foo", + true + ); + + // Add member. + group.updateMember(new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setTargetMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build() + ); + + OffsetCommitRequestData request = new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(9) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )); + + // Verify that a smaller epoch is rejected. + assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request)); + + // Verify that a larger epoch is rejected. + request.setGenerationIdOrMemberEpoch(11); + assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request)); + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short version) { + // All the newer versions are fine. 
+ if (version >= 9) return; + // Version 0 does not support MemberId and GenerationIdOrMemberEpoch fields. + if (version == 0) return; + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup( + "foo", + true + ); + + // Add member. + group.updateMember(new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setTargetMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build() + ); + + // Verify that the request is rejected with the correct exception. + assertThrows(UnsupportedVersionException.class, () -> context.commitOffset( + version, + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(9) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testConsumerGroupOffsetCommitFromAdminClient() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. 
+ context.groupMetadataManager.getOrMaybeCreateConsumerGroup( + "foo", + true + ); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + Collections.singletonList(RecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 0, + new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "", + context.time.milliseconds(), + OptionalLong.empty() + ), + MetadataImage.EMPTY.features().metadataVersion() + )), + result.records() + ); + } + + @Test + public void testConsumerGroupOffsetCommit() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup( + "foo", + true + ); + + // Add member. 
+ group.updateMember(new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setTargetMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build() + ); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + .setCommittedLeaderEpoch(10) + .setCommittedMetadata("metadata") + .setCommitTimestamp(context.time.milliseconds()) + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + Collections.singletonList(RecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 0, + new OffsetAndMetadata( + 100L, + OptionalInt.of(10), + "metadata", + context.time.milliseconds(), + OptionalLong.empty() + ), + MetadataImage.EMPTY.features().metadataVersion() + )), + result.records() + ); + } + + @Test + public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withOffsetMetadataMaxSize(5) + .build(); + + // Create an empty group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup( + "foo", + true + ); + + // Add member. 
+ group.updateMember(new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setTargetMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build() + ); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Arrays.asList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + .setCommittedLeaderEpoch(10) + .setCommittedMetadata("toolarge") + .setCommitTimestamp(context.time.milliseconds()), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(100L) + .setCommittedLeaderEpoch(10) + .setCommittedMetadata("small") + .setCommitTimestamp(context.time.milliseconds()) + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(Collections.singletonList( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName("bar") + .setPartitions(Arrays.asList( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + Collections.singletonList(RecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 1, + new OffsetAndMetadata( + 100L, + OptionalInt.of(10), + "small", + context.time.milliseconds(), + OptionalLong.empty() + ), + MetadataImage.EMPTY.features().metadataVersion() + )), + result.records() + ); + } + + @Test + public void testReplay() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( + 100L, + 
OptionalInt.empty(), + "small", + context.time.milliseconds(), + OptionalLong.empty() + )); + + verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( + 200L, + OptionalInt.of(10), + "small", + context.time.milliseconds(), + OptionalLong.empty() + )); + + verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata( + 200L, + OptionalInt.of(10), + "small", + context.time.milliseconds(), + OptionalLong.empty() + )); + + verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata( + 300L, + OptionalInt.of(10), + "small", + context.time.milliseconds(), + OptionalLong.of(12345L) + )); + } + + @Test + public void testReplayWithTombstone() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Verify replay adds the offset the map. + verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "small", + context.time.milliseconds(), + OptionalLong.empty() + )); + + // Create a tombstone record and replay it to delete the record. + context.replay(RecordHelpers.newOffsetCommitTombstoneRecord( + "foo", + "bar", + 0 + )); + + // Verify that the offset is gone. 
+ assertNull(context.offsetMetadataManager.offset("foo", new TopicPartition("bar", 0))); + } + + private void verifyReplay( + OffsetMetadataManagerTestContext context, + String groupId, + String topic, + int partition, + OffsetAndMetadata offsetAndMetadata + ) { + context.replay(RecordHelpers.newOffsetCommitRecord( + groupId, + topic, + partition, + offsetAndMetadata, + MetadataImage.EMPTY.features().metadataVersion() + )); + + assertEquals(offsetAndMetadata, context.offsetMetadataManager.offset( + groupId, + new TopicPartition(topic, partition) + )); + } + + private GenericGroupMember mkGenericMember( + String memberId, + Optional groupInstanceId + ) { + return new GenericGroupMember( + memberId, + groupInstanceId, + "client-id", + "host", + 5000, + 5000, + "consumer", + new JoinGroupRequestData.JoinGroupRequestProtocolCollection( + Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(new byte[0]) + ).iterator() + ) + ); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java index 280262c02a4bd..2f585d51ab606 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java @@ -18,6 +18,8 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; @@ -34,6 +36,8 @@ 
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -56,8 +60,10 @@ public class ReplicatedGroupCoordinatorTest { @Test public void testConsumerGroupHeartbeat() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT); @@ -75,11 +81,86 @@ public void testConsumerGroupHeartbeat() { assertEquals(result, coordinator.consumerGroupHeartbeat(context, request)); } + @Test + public void testCommitOffset() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.OFFSET_COMMIT); + OffsetCommitRequestData request = new OffsetCommitRequestData(); + CoordinatorResult result = new CoordinatorResult<>( + Collections.emptyList(), + new OffsetCommitResponseData() + ); + + when(coordinator.commitOffset( + context, + request + )).thenReturn(result); + + assertEquals(result, coordinator.commitOffset(context, request)); + } + + @Test + public void testReplayOffsetCommit() { + 
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + groupMetadataManager, + offsetMetadataManager + ); + + OffsetCommitKey key = new OffsetCommitKey(); + OffsetCommitValue value = new OffsetCommitValue(); + + coordinator.replay(new Record( + new ApiMessageAndVersion(key, (short) 0), + new ApiMessageAndVersion(value, (short) 0) + )); + + coordinator.replay(new Record( + new ApiMessageAndVersion(key, (short) 1), + new ApiMessageAndVersion(value, (short) 0) + )); + + verify(offsetMetadataManager, times(2)).replay(key, value); + } + + @Test + public void testReplayOffsetCommitWithNullValue() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + groupMetadataManager, + offsetMetadataManager + ); + + OffsetCommitKey key = new OffsetCommitKey(); + + coordinator.replay(new Record( + new ApiMessageAndVersion(key, (short) 0), + null + )); + + coordinator.replay(new Record( + new ApiMessageAndVersion(key, (short) 1), + null + )); + + verify(offsetMetadataManager, times(2)).replay(key, null); + } + @Test public void testReplayConsumerGroupMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey(); @@ -96,8 +177,10 @@ public void testReplayConsumerGroupMetadata() { @Test public void testReplayConsumerGroupMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey(); @@ -113,8 +196,10 @@ public void testReplayConsumerGroupMetadataWithNullValue() { @Test public void testReplayConsumerGroupPartitionMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey(); @@ -131,8 +216,10 @@ public void testReplayConsumerGroupPartitionMetadata() { @Test public void testReplayConsumerGroupPartitionMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey(); @@ -148,8 +235,10 @@ public void testReplayConsumerGroupPartitionMetadataWithNullValue() { @Test public void testReplayConsumerGroupMemberMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey(); @@ -166,8 +255,10 @@ public void 
testReplayConsumerGroupMemberMetadata() { @Test public void testReplayConsumerGroupMemberMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey(); @@ -183,8 +274,10 @@ public void testReplayConsumerGroupMemberMetadataWithNullValue() { @Test public void testReplayConsumerGroupTargetAssignmentMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey(); @@ -201,8 +294,10 @@ public void testReplayConsumerGroupTargetAssignmentMetadata() { @Test public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey(); @@ -218,8 +313,10 @@ public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() { @Test public void testReplayConsumerGroupTargetAssignmentMember() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = 
new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey(); @@ -236,8 +333,10 @@ public void testReplayConsumerGroupTargetAssignmentMember() { @Test public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey(); @@ -253,8 +352,10 @@ public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() { @Test public void testReplayConsumerGroupCurrentMemberAssignment() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -271,8 +372,10 @@ public void testReplayConsumerGroupCurrentMemberAssignment() { @Test public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -288,8 +391,10 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { 
@Test public void testReplayKeyCannotBeNull() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); assertThrows(NullPointerException.class, () -> coordinator.replay(new Record(null, null))); @@ -298,8 +403,10 @@ public void testReplayKeyCannotBeNull() { @Test public void testReplayWithUnsupportedVersion() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -315,8 +422,10 @@ public void testReplayWithUnsupportedVersion() { public void testOnLoaded() { MetadataImage image = MetadataImage.EMPTY; GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); coordinator.onLoaded(image); @@ -332,8 +441,10 @@ public void testOnLoaded() { @Test public void testReplayGroupMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); GroupMetadataKey key = new GroupMetadataKey(); @@ -350,8 +461,10 @@ public void testReplayGroupMetadata() { @Test public void 
testReplayGroupMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( - groupMetadataManager + groupMetadataManager, + offsetMetadataManager ); GroupMetadataKey key = new GroupMetadataKey(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 7306e3a3c5df3..7b4923c4b17c5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -591,4 +592,31 @@ public void testMetadataRefreshDeadline() { assertEquals(0L, group.metadataRefreshDeadline().deadlineMs); assertEquals(0, group.metadataRefreshDeadline().epoch); } + + @Test + public void testValidateOffsetCommit() { + ConsumerGroup group = createConsumerGroup("group-foo"); + + // Simulate a call from the admin client without member id and member epoch. + // This should pass only if the group is empty. + group.validateOffsetCommit("", "", -1); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("member-id", null, 0)); + + // Create a member. + group.getOrMaybeCreateMember("member-id", true); + + // A call from the admin client should fail as the group is not empty. 
+ assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", "", -1)); + + // The member epoch is stale. + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetCommit("member-id", "", 10)); + + // This should succeed. + group.validateOffsetCommit("member-id", "", 0); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java index ddc5870b8def7..ba4b177f676cd 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java @@ -18,6 +18,11 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import org.apache.kafka.common.message.JoinGroupResponseData; @@ -956,6 +961,71 @@ public void testMaybeElectNewJoinedLeaderChooseExisting() { assertTrue(group.isLeader(memberId)); } + @Test + public void testValidateOffsetCommit() { + // A call from the admin client without any parameters should pass. + group.validateOffsetCommit("", "", -1); + + // Add a member. 
+ group.add(new GenericGroupMember( + "member-id", + Optional.of("instance-id"), + "", + "", + 100, + 100, + "consumer", + new JoinGroupRequestProtocolCollection(Collections.singletonList( + new JoinGroupRequestProtocol() + .setName("roundrobin") + .setMetadata(new byte[0])).iterator()) + )); + + group.transitionTo(PREPARING_REBALANCE); + group.initNextGeneration(); + + // No parameters and the group is not empty. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("", "", -1)); + + // The member id does not exist. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("unknown", "unknown", -1)); + + // The instance id does not exist. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("member-id", "unknown", -1)); + + // The generation id is invalid. + assertThrows(IllegalGenerationException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 0)); + + // Group is in prepare rebalance state. + assertThrows(RebalanceInProgressException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 1)); + + // Group transitions to stable. + group.transitionTo(STABLE); + + // This should work. + group.validateOffsetCommit("member-id", "instance-id", 1); + + // Replace static member. + group.replaceStaticMember("instance-id", "member-id", "new-member-id"); + + // The old instance id should be fenced. + assertThrows(FencedInstanceIdException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 1)); + + // Remove member and transitions to dead. + group.remove("new-instance-id"); + group.transitionTo(DEAD); + + // This should fail with CoordinatorNotAvailableException. + assertThrows(CoordinatorNotAvailableException.class, + () -> group.validateOffsetCommit("member-id", "new-instance-id", 1)); + } + private void assertState(GenericGroup group, GenericGroupState targetState) { Set otherStates = new HashSet<>(); otherStates.add(STABLE);