Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List members of partitions #54

Merged
merged 2 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions app/src/main/java/org/astraea/offset/OffsetExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ static class Result {
this.partition = partition;
this.earliestOffset = startOffset;
this.latestOffset = endOffset;
this.groups = groups;
this.replicas = replicas;
this.groups =
groups.stream().sorted(Comparator.comparing(g -> g.groupId)).collect(Collectors.toList());
this.replicas =
replicas.stream()
.sorted(Comparator.comparing(r -> r.broker))
.collect(Collectors.toList());
}

@Override
Expand Down
167 changes: 121 additions & 46 deletions app/src/main/java/org/astraea/topic/TopicAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,69 @@ public void reassign(String topicName, int partition, Set<Integer> brokers) {

@Override
public Map<TopicPartition, List<Group>> groups(Set<String> topics) {
return Utils.handleException(
() ->
admin.listConsumerGroups().valid().get().stream()
.map(ConsumerGroupListing::groupId)
.collect(Collectors.toSet()))
.stream()
.flatMap(
group ->
Utils.handleException(
() ->
admin
.listConsumerGroupOffsets(group)
.partitionsToOffsetAndMetadata()
.get())
.entrySet()
.stream()
.filter(e -> topics.contains(e.getKey().topic()))
.map(e -> Map.entry(group, e)))
.collect(Collectors.groupingBy(e -> e.getValue().getKey()))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.map(
groupOffset ->
new Group(
groupOffset.getKey(),
groupOffset.getValue().getValue().offset()))
.collect(Collectors.toList())));
var groups =
Utils.handleException(() -> admin.listConsumerGroups().valid().get()).stream()
.map(ConsumerGroupListing::groupId)
.collect(Collectors.toList());

var allPartitions = partitions(topics);

var result = new HashMap<TopicPartition, List<Group>>();
Utils.handleException(() -> admin.describeConsumerGroups(groups).all().get())
.forEach(
(groupId, groupDescription) -> {
var partitionOffsets =
Utils.handleException(
() ->
admin
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get());

var partitionMembers =
groupDescription.members().stream()
.flatMap(
m ->
m.assignment().topicPartitions().stream()
.map(tp -> Map.entry(tp, m)))
.collect(Collectors.groupingBy(Map.Entry::getKey))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.map(Map.Entry::getValue)
.collect(Collectors.toList())));

allPartitions.forEach(
tp -> {
var offset =
partitionOffsets.containsKey(tp)
? OptionalLong.of(partitionOffsets.get(tp).offset())
: OptionalLong.empty();
var members =
partitionMembers.getOrDefault(tp, List.of()).stream()
.map(
m ->
new Member(
m.consumerId(),
m.groupInstanceId(),
m.clientId(),
m.host()))
.collect(Collectors.toList());
// This group is related to the partition only if it has either member or
// offset.
if (offset.isPresent() || !members.isEmpty()) {
result
.computeIfAbsent(tp, ignore -> new ArrayList<>())
.add(new Group(groupId, offset, members));
}
});
});

return result;
}

private Map<TopicPartition, Long> earliestOffset(Set<TopicPartition> partitions) {
Expand Down Expand Up @@ -114,15 +145,7 @@ public Set<String> topics() {

@Override
public Map<TopicPartition, Offset> offset(Set<String> topics) {
var partitions =
Utils.handleException(
() ->
admin.describeTopics(topics).all().get().entrySet().stream()
.flatMap(
e ->
e.getValue().partitions().stream()
.map(p -> new TopicPartition(e.getKey(), p.partition())))
.collect(Collectors.toSet()));
var partitions = partitions(topics);
var earliest = earliestOffset(partitions);
var latest = latestOffset(partitions);
return earliest.entrySet().stream()
Expand All @@ -132,6 +155,17 @@ public Map<TopicPartition, Offset> offset(Set<String> topics) {
Map.Entry::getKey, e -> new Offset(e.getValue(), latest.get(e.getKey()))));
}

private Set<TopicPartition> partitions(Set<String> topics) {
return Utils.handleException(
() ->
admin.describeTopics(topics).all().get().entrySet().stream()
.flatMap(
e ->
e.getValue().partitions().stream()
.map(p -> new TopicPartition(e.getKey(), p.partition())))
.collect(Collectors.toSet()));
}

@Override
public Map<TopicPartition, List<Replica>> replicas(Set<String> topics) {
var lags =
Expand Down Expand Up @@ -224,17 +258,58 @@ public Map<TopicPartition, List<Replica>> replicas(Set<String> topics) {
void reassign(String topicName, int partition, Set<Integer> brokers);

class Group {
public final String id;
public final long offset;
public final String groupId;
public final OptionalLong offset;
public final List<Member> members;

public Group(String id, long offset) {
this.id = id;
public Group(String groupId, OptionalLong offset, List<Member> members) {
this.groupId = groupId;
this.offset = offset;
this.members = members;
}

@Override
public String toString() {
return "Group{" + "id='" + id + '\'' + ", offset=" + offset + '}';
return "Group{"
+ "groupId='"
+ groupId
+ '\''
+ ", offset="
+ (offset.isEmpty() ? "none" : offset.getAsLong())
+ ", members="
+ members
+ '}';
}
}

class Member {
private final String memberId;
private final Optional<String> groupInstanceId;
private final String clientId;
private final String host;

public Member(String memberId, Optional<String> groupInstanceId, String clientId, String host) {
this.memberId = memberId;
this.groupInstanceId = groupInstanceId;
this.clientId = clientId;
this.host = host;
}

@Override
public String toString() {
return "Member{"
+ "memberId='"
+ memberId
+ '\''
+ ", groupInstanceId="
+ groupInstanceId
+ ", clientId='"
+ clientId
+ '\''
+ ", host='"
+ host
+ '\''
+ '}';
}
}

Expand Down
12 changes: 6 additions & 6 deletions app/src/test/java/org/astraea/offset/OffsetExplorerTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.astraea.offset;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import org.apache.kafka.common.TopicPartition;
import org.astraea.topic.TopicAdmin;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -38,7 +36,9 @@ public Map<TopicPartition, Offset> offset(Set<String> topics) {

@Override
public Map<TopicPartition, List<Group>> groups(Set<String> topics) {
return Map.of(topicPartition, List.of(new Group(groupId, groupOffset)));
return Map.of(
topicPartition,
List.of(new Group(groupId, OptionalLong.of(groupOffset), List.of())));
}

@Override
Expand Down Expand Up @@ -66,8 +66,8 @@ public void close() {}
Assertions.assertEquals(earliestOffset, result.get(0).earliestOffset);
Assertions.assertEquals(latestOffset, result.get(0).latestOffset);
Assertions.assertEquals(1, result.get(0).groups.size());
Assertions.assertEquals(groupId, result.get(0).groups.get(0).id);
Assertions.assertEquals(groupOffset, result.get(0).groups.get(0).offset);
Assertions.assertEquals(groupId, result.get(0).groups.get(0).groupId);
Assertions.assertEquals(groupOffset, result.get(0).groups.get(0).offset.getAsLong());
Assertions.assertEquals(1, result.get(0).replicas.size());
Assertions.assertEquals(brokerId, result.get(0).replicas.get(0).broker);
Assertions.assertEquals(lag, result.get(0).replicas.get(0).lag);
Expand Down