Skip to content

Commit

Permalink
KAFKA-17750: Extend kafka-consumer-groups command line tool to suppor…
Browse files Browse the repository at this point in the history
…t new consumer group (part 1)

Signed-off-by: PoAn Yang <payang@apache.org>
  • Loading branch information
FrankYang0529 committed Nov 28, 2024
1 parent e1ba01d commit be89eba
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
"type": "request",
"listeners": ["broker"],
"name": "ConsumerGroupDescribeRequest",
"validVersions": "0",
// Version 1 adds IsClassic field to ConsumerGroupDescribeResponse (KIP-1099).
// For ConsumerGroupDescribeRequest, version 1 is same as version 0.
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"apiKey": 69,
"type": "response",
"name": "ConsumerGroupDescribeResponse",
"validVersions": "0",
// Version 1 adds IsClassic field (KIP-1099).
"validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
Expand Down Expand Up @@ -69,7 +70,9 @@
{ "name": "Assignment", "type": "Assignment", "versions": "0+",
"about": "The current assignment." },
{ "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
"about": "The target assignment." }
"about": "The target assignment." },
{ "name": "IsClassic", "type": "bool", "versions": "1+", "ignorable": true,
"about": "True for classic member." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(
.setInstanceId(instanceId)
.setRackId(rackId)
.setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames))
.setSubscribedTopicRegex(subscribedTopicRegex);
.setSubscribedTopicRegex(subscribedTopicRegex)
.setIsClassic(useClassicProtocol());
}

private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.kafka.image.MetadataImage;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -246,8 +248,9 @@ public void testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), member.partitionsPendingRevocation());
}

@Test
public void testAsConsumerGroupDescribeMember() {
@ParameterizedTest(name = "{displayName}.withClassicMemberMetadata={0}")
@ValueSource(booleans = {true, false})
public void testAsConsumerGroupDescribeMember(boolean withClassicMemberMetadata) {
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
Uuid topicId3 = Uuid.randomUuid();
Expand Down Expand Up @@ -287,6 +290,8 @@ public void testAsConsumerGroupDescribeMember() {
.setClientHost(clientHost)
.setSubscribedTopicNames(subscribedTopicNames)
.setSubscribedTopicRegex(subscribedTopicRegex)
.setClassicMemberMetadata(withClassicMemberMetadata ? new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSupportedProtocols(toClassicProtocolCollection("range")) : null)
.build();

ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(targetAssignment, metadataImage.topics());
Expand Down Expand Up @@ -315,7 +320,8 @@ public void testAsConsumerGroupDescribeMember() {
.setTopicName("topic4")
.setPartitions(new ArrayList<>(item.getValue()))
).collect(Collectors.toList()))
);
)
.setIsClassic(withClassicMemberMetadata);

assertEquals(expected, actual);
}
Expand Down

0 comments on commit be89eba

Please sign in to comment.