From d2811ded80f0b29f9106f295f5d32ddfc90b48a6 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 14 Oct 2021 02:19:31 +0800 Subject: [PATCH 1/2] List memebrs of partitions --- .../org/astraea/offset/OffsetExplorer.java | 8 +- .../java/org/astraea/topic/TopicAdmin.java | 168 +++++++++++++----- .../astraea/offset/OffsetExplorerTest.java | 12 +- 3 files changed, 134 insertions(+), 54 deletions(-) diff --git a/app/src/main/java/org/astraea/offset/OffsetExplorer.java b/app/src/main/java/org/astraea/offset/OffsetExplorer.java index 28c93468fd..9f80e8c13f 100644 --- a/app/src/main/java/org/astraea/offset/OffsetExplorer.java +++ b/app/src/main/java/org/astraea/offset/OffsetExplorer.java @@ -29,8 +29,12 @@ static class Result { this.partition = partition; this.earliestOffset = startOffset; this.latestOffset = endOffset; - this.groups = groups; - this.replicas = replicas; + this.groups = + groups.stream().sorted(Comparator.comparing(g -> g.groupId)).collect(Collectors.toList()); + this.replicas = + replicas.stream() + .sorted(Comparator.comparing(r -> r.broker)) + .collect(Collectors.toList()); } @Override diff --git a/app/src/main/java/org/astraea/topic/TopicAdmin.java b/app/src/main/java/org/astraea/topic/TopicAdmin.java index a8837338e8..1ad62917b9 100644 --- a/app/src/main/java/org/astraea/topic/TopicAdmin.java +++ b/app/src/main/java/org/astraea/topic/TopicAdmin.java @@ -43,38 +43,69 @@ public void reassign(String topicName, int partition, Set brokers) { @Override public Map> groups(Set topics) { - return Utils.handleException( - () -> - admin.listConsumerGroups().valid().get().stream() - .map(ConsumerGroupListing::groupId) - .collect(Collectors.toSet())) - .stream() - .flatMap( - group -> - Utils.handleException( - () -> - admin - .listConsumerGroupOffsets(group) - .partitionsToOffsetAndMetadata() - .get()) - .entrySet() - .stream() - .filter(e -> topics.contains(e.getKey().topic())) - .map(e -> Map.entry(group, e))) - .collect(Collectors.groupingBy(e -> e.getValue().getKey())) - .entrySet() - .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - e.getValue().stream() - .map( - groupOffset -> - new Group( - groupOffset.getKey(), - groupOffset.getValue().getValue().offset())) - .collect(Collectors.toList()))); + var groups = + Utils.handleException(() -> admin.listConsumerGroups().valid().get()).stream() + .map(ConsumerGroupListing::groupId) + .collect(Collectors.toList()); + + var allPartitions = partitions(topics); + + var result = new HashMap>(); + Utils.handleException(() -> admin.describeConsumerGroups(groups).all().get()) + .forEach( + (groupId, groupDescription) -> { + var partitionOffsets = + Utils.handleException( + () -> + admin + .listConsumerGroupOffsets(groupId) + .partitionsToOffsetAndMetadata() + .get()); + + var partitionMembers = + groupDescription.members().stream() + .flatMap( + m -> + m.assignment().topicPartitions().stream() + .map(tp -> Map.entry(tp, m))) + .collect(Collectors.groupingBy(Map.Entry::getKey)) + .entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + e.getValue().stream() + .map(Map.Entry::getValue) + .collect(Collectors.toList()))); + + allPartitions.forEach( + tp -> { + var offset = + partitionOffsets.containsKey(tp) + ? OptionalLong.of(partitionOffsets.get(tp).offset()) + : OptionalLong.empty(); + var members = + partitionMembers.getOrDefault(tp, List.of()).stream() + .map( + m -> + new GroupMember( + m.consumerId(), + m.groupInstanceId(), + m.clientId(), + m.host())) + .collect(Collectors.toList()); + // This group is related to the partition only if it has either member or + // offset. + if (offset.isPresent() || !members.isEmpty()) { + result + .computeIfAbsent(tp, ignore -> new ArrayList<>()) + .add(new Group(groupId, offset, members)); + } + }); + }); + + return result; } private Map earliestOffset(Set partitions) { @@ -114,15 +145,7 @@ public Set topics() { @Override public Map offset(Set topics) { - var partitions = - Utils.handleException( - () -> - admin.describeTopics(topics).all().get().entrySet().stream() - .flatMap( - e -> - e.getValue().partitions().stream() - .map(p -> new TopicPartition(e.getKey(), p.partition()))) - .collect(Collectors.toSet())); + var partitions = partitions(topics); var earliest = earliestOffset(partitions); var latest = latestOffset(partitions); return earliest.entrySet().stream() @@ -132,6 +155,17 @@ public Map offset(Set topics) { Map.Entry::getKey, e -> new Offset(e.getValue(), latest.get(e.getKey())))); } + private Set partitions(Set topics) { + return Utils.handleException( + () -> + admin.describeTopics(topics).all().get().entrySet().stream() + .flatMap( + e -> + e.getValue().partitions().stream() + .map(p -> new TopicPartition(e.getKey(), p.partition()))) + .collect(Collectors.toSet())); + } + @Override public Map> replicas(Set topics) { var lags = @@ -224,17 +258,59 @@ public Map> replicas(Set topics) { void reassign(String topicName, int partition, Set brokers); class Group { - public final String id; - public final long offset; + public final String groupId; + public final OptionalLong offset; + public final List members; - public Group(String id, long offset) { - this.id = id; + public Group(String groupId, OptionalLong offset, List members) { + this.groupId = groupId; this.offset = offset; + this.members = members; } @Override public String toString() { - return "Group{" + "id='" + id + '\'' + ", offset=" + offset + '}'; + return "Group{" + + "groupId='" + + groupId + + '\'' + + ", offset=" + + (offset.isEmpty() ? "none" : offset.getAsLong()) + + ", members=" + + members + + '}'; + } + } + + class GroupMember { + private final String memberId; + private final Optional groupInstanceId; + private final String clientId; + private final String host; + + public GroupMember( + String memberId, Optional groupInstanceId, String clientId, String host) { + this.memberId = memberId; + this.groupInstanceId = groupInstanceId; + this.clientId = clientId; + this.host = host; + } + + @Override + public String toString() { + return "GroupMember{" + + "memberId='" + + memberId + + '\'' + + ", groupInstanceId=" + + groupInstanceId + + ", clientId='" + + clientId + + '\'' + + ", host='" + + host + + '\'' + + '}'; } } diff --git a/app/src/test/java/org/astraea/offset/OffsetExplorerTest.java b/app/src/test/java/org/astraea/offset/OffsetExplorerTest.java index ef9e8703c7..fd2769fe12 100644 --- a/app/src/test/java/org/astraea/offset/OffsetExplorerTest.java +++ b/app/src/test/java/org/astraea/offset/OffsetExplorerTest.java @@ -1,8 +1,6 @@ package org.astraea.offset; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.kafka.common.TopicPartition; import org.astraea.topic.TopicAdmin; import org.junit.jupiter.api.Assertions; @@ -38,7 +36,9 @@ public Map offset(Set topics) { @Override public Map> groups(Set topics) { - return Map.of(topicPartition, List.of(new Group(groupId, groupOffset))); + return Map.of( + topicPartition, + List.of(new Group(groupId, OptionalLong.of(groupOffset), List.of()))); } @Override @@ -66,8 +66,8 @@ public void close() {} Assertions.assertEquals(earliestOffset, result.get(0).earliestOffset); Assertions.assertEquals(latestOffset, result.get(0).latestOffset); Assertions.assertEquals(1, result.get(0).groups.size()); - Assertions.assertEquals(groupId, result.get(0).groups.get(0).id); - Assertions.assertEquals(groupOffset, result.get(0).groups.get(0).offset); + Assertions.assertEquals(groupId, result.get(0).groups.get(0).groupId); + Assertions.assertEquals(groupOffset, result.get(0).groups.get(0).offset.getAsLong()); Assertions.assertEquals(1, result.get(0).replicas.size()); Assertions.assertEquals(brokerId, result.get(0).replicas.get(0).broker); Assertions.assertEquals(lag, result.get(0).replicas.get(0).lag); From 0e62fb53be1aae60fb046f56dbb1d2b4dde4a69e Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 14 Oct 2021 02:24:47 +0800 Subject: [PATCH 2/2] Rename GroupMember to Member --- app/src/main/java/org/astraea/topic/TopicAdmin.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/app/src/main/java/org/astraea/topic/TopicAdmin.java b/app/src/main/java/org/astraea/topic/TopicAdmin.java index 1ad62917b9..a3f21485b8 100644 --- a/app/src/main/java/org/astraea/topic/TopicAdmin.java +++ b/app/src/main/java/org/astraea/topic/TopicAdmin.java @@ -89,7 +89,7 @@ public Map> groups(Set topics) { partitionMembers.getOrDefault(tp, List.of()).stream() .map( m -> - new GroupMember( + new Member( m.consumerId(), m.groupInstanceId(), m.clientId(), @@ -260,9 +260,9 @@ public Map> replicas(Set topics) { class Group { public final String groupId; public final OptionalLong offset; - public final List members; + public final List members; - public Group(String groupId, OptionalLong offset, List members) { + public Group(String groupId, OptionalLong offset, List members) { this.groupId = groupId; this.offset = offset; this.members = members; @@ -282,14 +282,13 @@ public String toString() { } } - class GroupMember { + class Member { private final String memberId; private final Optional groupInstanceId; private final String clientId; private final String host; - public GroupMember( - String memberId, Optional groupInstanceId, String clientId, String host) { + public Member(String memberId, Optional groupInstanceId, String clientId, String host) { this.memberId = memberId; this.groupInstanceId = groupInstanceId; this.clientId = clientId; @@ -298,7 +297,7 @@ public GroupMember( @Override public String toString() { - return "GroupMember{" + return "Member{" + "memberId='" + memberId + '\''