From 61dc7c211ba1d8cab00374e0d576a937ffe5574a Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 16 Jan 2023 09:10:09 +0800 Subject: [PATCH] [COMMON] add assignor and state to ConsumerGroup (#1441) --- .../org/astraea/common/admin/AdminImpl.java | 2 ++ .../astraea/common/admin/ConsumerGroup.java | 18 +++++++++++++++++- .../org/astraea/common/admin/AdminTest.java | 12 ++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/astraea/common/admin/AdminImpl.java b/common/src/main/java/org/astraea/common/admin/AdminImpl.java index b87d92a925..fb8bafabb0 100644 --- a/common/src/main/java/org/astraea/common/admin/AdminImpl.java +++ b/common/src/main/java/org/astraea/common/admin/AdminImpl.java @@ -519,6 +519,8 @@ public CompletionStage> consumerGroups(Set consumerG groupId -> new ConsumerGroup( groupId, + consumerGroupDescriptions.get(groupId).partitionAssignor(), + consumerGroupDescriptions.get(groupId).state().name(), NodeInfo.of(consumerGroupDescriptions.get(groupId).coordinator()), consumerGroupMetadata.get(groupId).entrySet().stream() .collect( diff --git a/common/src/main/java/org/astraea/common/admin/ConsumerGroup.java b/common/src/main/java/org/astraea/common/admin/ConsumerGroup.java index ceba6e0bbb..de50902c08 100644 --- a/common/src/main/java/org/astraea/common/admin/ConsumerGroup.java +++ b/common/src/main/java/org/astraea/common/admin/ConsumerGroup.java @@ -23,16 +23,24 @@ public class ConsumerGroup { private final String groupId; + private final String assignor; + + private final String state; + private final NodeInfo coordinator; private final Map consumeProgress; private final Map> assignment; - public ConsumerGroup( + ConsumerGroup( String groupId, + String assignor, + String state, NodeInfo coordinator, Map consumeProgress, Map> assignment) { this.groupId = Objects.requireNonNull(groupId); + this.assignor = assignor; + this.state = state; this.coordinator = coordinator; this.consumeProgress = Map.copyOf(consumeProgress); this.assignment = Map.copyOf(assignment); @@ -53,4 +61,12 @@ public Map> assignment() { public Map consumeProgress() { return consumeProgress; } + + public String assignor() { + return assignor; + } + + public String state() { + return state; + } } diff --git a/common/src/test/java/org/astraea/common/admin/AdminTest.java b/common/src/test/java/org/astraea/common/admin/AdminTest.java index 614db19347..8385272a81 100644 --- a/common/src/test/java/org/astraea/common/admin/AdminTest.java +++ b/common/src/test/java/org/astraea/common/admin/AdminTest.java @@ -762,6 +762,18 @@ void testConsumerGroups() { Assertions.assertEquals( 1, admin.consumerGroups(Set.of("abc")).toCompletableFuture().join().size()); + admin + .consumerGroups(admin.consumerGroupIds().toCompletableFuture().join()) + .toCompletableFuture() + .join() + .forEach( + c -> { + Assertions.assertNotNull(c.groupId()); + Assertions.assertNotNull(c.coordinator()); + Assertions.assertNotNull(c.assignor()); + Assertions.assertNotNull(c.state()); + }); + // test internal Assertions.assertNotEquals( 0,