Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OffsetCommitRequest extends AbstractRequest {
// default values for the current version
Expand Down Expand Up @@ -121,8 +120,4 @@ public OffsetCommitResponse getErrorResponse(Throwable e) {
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version);
}

public static Optional<String> groupInstanceId(OffsetCommitRequestData request) {
return Optional.ofNullable(request.groupInstanceId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
// Version 8 is the first flexible version.
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used.
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and
// GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
"validVersions": "0-9",
"flexibleVersions": "8+",
// Supported errors:
Expand All @@ -42,6 +43,7 @@
// - UNKNOWN_MEMBER_ID (version 1+)
// - INVALID_COMMIT_OFFSET_SIZE (version 0+)
// - FENCED_MEMBER_EPOCH (version 7+)
// - GROUP_ID_NOT_FOUND (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ class BrokerServer(
config.consumerGroupMaxSize,
config.consumerGroupAssignors,
config.offsetsTopicSegmentBytes,
config.offsetMetadataMaxSize,
config.groupMaxSize,
config.groupInitialRebalanceDelay,
GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.KafkaException;

/**
* Interface common for all groups.
*/
Expand Down Expand Up @@ -50,4 +52,18 @@ public String toString() {
* @return The group id.
*/
String groupId();

/**
* Validates the OffsetCommit request.
*
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch
* for consumer groups.
*/
void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationIdOrMemberEpoch
) throws KafkaException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class GroupCoordinatorConfig {
*/
public final int offsetsTopicSegmentBytes;

/**
* The maximum size for a metadata entry associated with an offset commit.
*/
public final int offsetMetadataMaxSize;

/**
* The generic group maximum size.
*/
Expand Down Expand Up @@ -93,6 +98,7 @@ public GroupCoordinatorConfig(
int consumerGroupMaxSize,
List<PartitionAssignor> consumerGroupAssignors,
int offsetsTopicSegmentBytes,
int offsetMetadataMaxSize,
int genericGroupMaxSize,
int genericGroupInitialRebalanceDelayMs,
int genericGroupNewMemberJoinTimeoutMs,
Expand All @@ -105,6 +111,7 @@ public GroupCoordinatorConfig(
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupAssignors = consumerGroupAssignors;
this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
this.offsetMetadataMaxSize = offsetMetadataMaxSize;
this.genericGroupMaxSize = genericGroupMaxSize;
this.genericGroupInitialRebalanceDelayMs = genericGroupInitialRebalanceDelayMs;
this.genericGroupNewMemberJoinTimeoutMs = genericGroupNewMemberJoinTimeoutMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
Expand Down Expand Up @@ -492,9 +493,49 @@ public CompletableFuture<OffsetCommitResponseData> commitOffsets(
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}

return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
// For backwards compatibility, we support offset commits for the empty groupId.
if (request.groupId() == null) {
return CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
request,
Errors.INVALID_GROUP_ID
));
}

return runtime.scheduleWriteOperation(
"commit-offset",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.commitOffset(context, request)
).exceptionally(exception -> {
if (exception instanceof UnknownTopicOrPartitionException ||
exception instanceof NotEnoughReplicasException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.COORDINATOR_NOT_AVAILABLE
);
}

if (exception instanceof NotLeaderOrFollowerException ||
exception instanceof KafkaStorageException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.NOT_COORDINATOR
);
}

if (exception instanceof RecordTooLargeException ||
exception instanceof RecordBatchTooLargeException ||
exception instanceof InvalidFetchSizeException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.INVALID_COMMIT_OFFSET_SIZE
);
}

return OffsetCommitRequest.getErrorResponse(
request,
Errors.forException(exception)
);
});
}

/**
Expand Down
Loading