Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add earliest/latest/maxTimestamp to AsyncAdmin #960

Merged
merged 1 commit into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/src/main/java/org/astraea/common/admin/AsyncAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ static AsyncAdmin of(Map<String, String> configs) {
*/
CompletionStage<Set<TopicPartitionReplica>> topicPartitionReplicas(Set<Integer> brokers);

/**
 * Fetch the earliest offset of each given partition. Partitions that cannot be fetched safely
 * (for example their topic has an offline partition — see implementation) may be omitted or
 * reported as -1.
 *
 * @param topicPartitions the partitions to query
 * @return a map from topic/partition to its earliest offset
 */
CompletionStage<Map<TopicPartition, Long>> earliestOffsets(Set<TopicPartition> topicPartitions);

/**
 * Fetch the latest offset of each given partition. Partitions that cannot be fetched safely may
 * be omitted or reported as -1.
 *
 * @param topicPartitions the partitions to query
 * @return a map from topic/partition to its latest offset
 */
CompletionStage<Map<TopicPartition, Long>> latestOffsets(Set<TopicPartition> topicPartitions);

/**
 * Fetch the max timestamp of existing records of each given partition. NOTE(review): older
 * brokers do not support this request; in that case the value is -1 (see implementation).
 *
 * @param topicPartitions the partitions to query
 * @return a map from topic/partition to the max timestamp of its records
 */
CompletionStage<Map<TopicPartition, Long>> maxTimestamps(Set<TopicPartition> topicPartitions);

CompletionStage<List<Partition>> partitions(Set<String> topics);

/** @return online node information */
Expand Down
251 changes: 175 additions & 76 deletions common/src/main/java/org/astraea/common/admin/AsyncAdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,124 @@ public CompletionStage<Set<TopicPartitionReplica>> topicPartitionReplicas(Set<In
.collect(Collectors.toCollection(TreeSet::new)));
}

/**
 * Kafka refreshes metadata per topic, so a metadata request may block when any partition of a
 * topic is hosted by an offline broker. To stay non-blocking, this helper keeps only the
 * partitions of topics whose leaders are all alive.
 */
private CompletionStage<Set<TopicPartition>> updatableTopicPartitions(Set<String> topics) {
  if (topics.isEmpty()) return CompletableFuture.completedFuture(Set.of());
  return to(kafkaAdmin.describeTopics(topics).all())
      .thenApply(
          descriptions ->
              descriptions.entrySet().stream()
                  // a single offline partition blocks metadata updates for the whole topic,
                  // so the topic is dropped entirely when any leader is missing
                  .filter(
                      entry ->
                          entry.getValue().partitions().stream()
                              .allMatch(
                                  info -> info.leader() != null && !info.leader().isEmpty()))
                  .flatMap(
                      entry ->
                          entry.getValue().partitions().stream()
                              .map(info -> TopicPartition.of(entry.getKey(), info.partition())))
                  .collect(Collectors.toSet()));
}

/**
 * Fetch the earliest offset of each fetchable partition. Topics with an offline partition are
 * skipped (see {@link #updatableTopicPartitions(Set)}); partitions missing from the broker
 * response map to -1.
 *
 * @param topicPartitions the partitions to query
 * @return a sorted map from topic/partition to its earliest offset
 */
@Override
public CompletionStage<Map<TopicPartition, Long>> earliestOffsets(
    Set<TopicPartition> topicPartitions) {
  if (topicPartitions.isEmpty()) return CompletableFuture.completedFuture(Map.of());
  var topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
  return updatableTopicPartitions(topics)
      .thenCompose(
          partitions ->
              to(kafkaAdmin
                      .listOffsets(
                          partitions.stream()
                              .collect(
                                  Collectors.toMap(
                                      TopicPartition::to,
                                      ignored -> new OffsetSpec.EarliestSpec())))
                      .all())
                  .thenApply(
                      offsets ->
                          partitions.stream()
                              .collect(
                                  Utils.toSortedMap(
                                      Function.identity(),
                                      tp ->
                                          Optional.ofNullable(offsets.get(TopicPartition.to(tp)))
                                              .map(
                                                  ListOffsetsResult.ListOffsetsResultInfo
                                                      ::offset)
                                              .orElse(-1L)))));
}

/**
 * Fetch the latest offset of each fetchable partition. Topics with an offline partition are
 * skipped (see {@link #updatableTopicPartitions(Set)}); partitions missing from the broker
 * response map to -1.
 *
 * @param topicPartitions the partitions to query
 * @return a sorted map from topic/partition to its latest offset
 */
@Override
public CompletionStage<Map<TopicPartition, Long>> latestOffsets(
    Set<TopicPartition> topicPartitions) {
  if (topicPartitions.isEmpty()) return CompletableFuture.completedFuture(Map.of());
  var topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
  return updatableTopicPartitions(topics)
      .thenCompose(
          partitions ->
              to(kafkaAdmin
                      .listOffsets(
                          partitions.stream()
                              .collect(
                                  Collectors.toMap(
                                      TopicPartition::to,
                                      ignored -> new OffsetSpec.LatestSpec())))
                      .all())
                  .thenApply(
                      offsets ->
                          partitions.stream()
                              .collect(
                                  Utils.toSortedMap(
                                      Function.identity(),
                                      tp ->
                                          Optional.ofNullable(offsets.get(TopicPartition.to(tp)))
                                              .map(
                                                  ListOffsetsResult.ListOffsetsResultInfo
                                                      ::offset)
                                              .orElse(-1L)))));
}

/**
 * Fetch the max timestamp of existing records of each fetchable partition. Topics with an
 * offline partition are skipped (see {@link #updatableTopicPartitions(Set)}). Brokers that do
 * not support the max-timestamp spec (old kafka) cause the whole result to fall back to -1 for
 * every partition, as do partitions missing from the broker response.
 *
 * @param topicPartitions the partitions to query
 * @return a sorted map from topic/partition to the max timestamp of its records
 */
@Override
public CompletionStage<Map<TopicPartition, Long>> maxTimestamps(
    Set<TopicPartition> topicPartitions) {
  if (topicPartitions.isEmpty()) return CompletableFuture.completedFuture(Map.of());
  return updatableTopicPartitions(
          topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet()))
      .thenCompose(
          ps ->
              to(kafkaAdmin
                      .listOffsets(
                          ps.stream()
                              .collect(
                                  Collectors.toMap(
                                      TopicPartition::to,
                                      ignored -> new OffsetSpec.MaxTimestampSpec())))
                      .all())
                  // the old kafka does not support to fetch max timestamp, so the response is
                  // replaced by an empty map and every partition falls back to -1 below
                  .exceptionally(e -> Map.of())
                  .thenApply(
                      result ->
                          ps.stream()
                              .collect(
                                  Utils.toSortedMap(
                                      tp -> tp,
                                      tp ->
                                          Optional.ofNullable(result.get(TopicPartition.to(tp)))
                                              .map(
                                                  ListOffsetsResult.ListOffsetsResultInfo
                                                      ::timestamp)
                                              .orElse(-1L)))));
}

@Override
public CompletionStage<List<Partition>> partitions(Set<String> topics) {
if (topics.isEmpty()) return CompletableFuture.completedFuture(List.of());
Expand All @@ -262,84 +380,65 @@ public CompletionStage<List<Partition>> partitions(Set<String> topics) {
.map(
tp ->
Map.entry(
new org.apache.kafka.common.TopicPartition(
e.getKey(), tp.partition()),
tp)))
TopicPartition.of(e.getKey(), tp.partition()), tp)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

return allPartitions.thenCompose(
partitionInfos -> {
// kafka admin update metadata based on topic, so we skip topics hosted by offline node
var availablePartitions =
partitionInfos.entrySet().stream()
.collect(Collectors.groupingBy(e -> e.getKey().topic()))
.entrySet()
.stream()
.filter(
e ->
e.getValue().stream()
.allMatch(
e2 ->
e2.getValue().leader() != null
&& !e2.getValue().leader().isEmpty()))
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return to(kafkaAdmin
.listOffsets(
availablePartitions.keySet().stream()
.collect(
Collectors.toMap(
Function.identity(), e -> new OffsetSpec.EarliestSpec())))
.all())
.thenCompose(
earliest ->
to(kafkaAdmin
.listOffsets(
availablePartitions.keySet().stream()
.collect(
Collectors.toMap(
Function.identity(),
e -> new OffsetSpec.LatestSpec())))
.all())
.thenCompose(
latest ->
to(kafkaAdmin
.listOffsets(
availablePartitions.keySet().stream()
.collect(
Collectors.toMap(
Function.identity(),
e -> new OffsetSpec.MaxTimestampSpec())))
.all())
// the old kafka does not support to fetch max timestamp
.handle(
(r, e) ->
e == null
? r
: Map
.<String,
ListOffsetsResult.ListOffsetsResultInfo>
of())
.thenApply(
maxTimestamp ->
partitionInfos.entrySet().stream()
.map(
entry ->
Partition.of(
entry.getKey().topic(),
entry.getValue(),
Optional.ofNullable(
earliest.get(entry.getKey())),
Optional.ofNullable(
latest.get(entry.getKey())),
Optional.ofNullable(
maxTimestamp.get(
entry.getKey()))))
.sorted(
Comparator.comparing(Partition::topic)
.thenComparing(Partition::partition))
.collect(Collectors.toList()))));
});
return updatableTopicPartitions(topics)
.thenCompose(
tps ->
earliestOffsets(tps)
.thenCompose(
earliestOffsets ->
latestOffsets(tps)
.thenCompose(
latestOffsets ->
maxTimestamps(tps)
.thenCombine(
allPartitions,
(maxTimestamps, tpInfos) ->
tpInfos.keySet().stream()
.map(
tp -> {
var earliest =
earliestOffsets.getOrDefault(
tp, -1L);
var latest =
latestOffsets.getOrDefault(
tp, -1L);
var maxTimestamp =
maxTimestamps.getOrDefault(
tp, -1L);
var tpInfo = tpInfos.get(tp);
var leader =
tpInfo.leader() == null
|| tpInfo
.leader()
.isEmpty()
? null
: NodeInfo.of(
tpInfo.leader());
var replicas =
tpInfo.replicas().stream()
.map(NodeInfo::of)
.collect(Collectors.toList());
var isr =
tpInfo.isr().stream()
.map(NodeInfo::of)
.collect(Collectors.toList());
return Partition.of(
tp.topic(),
tp.partition(),
leader,
replicas,
isr,
earliest,
latest,
maxTimestamp);
})
.sorted(
Comparator.comparing(
Partition::topicPartition))
.collect(Collectors.toList())))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Utils;
import org.astraea.it.RequireBrokerCluster;
import org.junit.jupiter.api.Assertions;
Expand All @@ -31,8 +32,15 @@ public class AsyncAdminWithOfflineBrokerTest extends RequireBrokerCluster {

private static final String TOPIC_NAME = Utils.randomString();
private static final int PARTITIONS = 10;

// all partitions of the single test topic: (TOPIC_NAME, 0) .. (TOPIC_NAME, PARTITIONS - 1)
private static final Set<TopicPartition> TOPIC_PARTITIONS =
    IntStream.range(0, PARTITIONS)
        .mapToObj(id -> TopicPartition.of(TOPIC_NAME, id))
        .collect(Collectors.toSet());
// the broker shut down by closeOneBroker() before the tests run
private static final int CLOSED_BROKER_ID = brokerIds().iterator().next();

// number of partitions whose leader survives the shutdown; -1 until closeOneBroker() sets it.
// NOTE(review): intentionally mutable (assigned in @BeforeAll), hence not final
private static int NUMBER_OF_ONLINE_PARTITIONS = -1;

@BeforeAll
static void closeOneBroker() throws ExecutionException, InterruptedException {
try (var admin = AsyncAdmin.of(bootstrapServers())) {
Expand All @@ -44,6 +52,9 @@ static void closeOneBroker() throws ExecutionException, InterruptedException {
.toCompletableFuture()
.get();
var allPs = admin.partitions(Set.of(TOPIC_NAME)).toCompletableFuture().get();
NUMBER_OF_ONLINE_PARTITIONS =
PARTITIONS
- (int) allPs.stream().filter(p -> p.leader().get().id() == CLOSED_BROKER_ID).count();
Assertions.assertEquals(PARTITIONS, allPs.size());
Utils.sleep(Duration.ofSeconds(2));
}
Expand Down Expand Up @@ -92,8 +103,11 @@ void testPartitions() throws ExecutionException, InterruptedException {
// there is no more data, so all replicas are in-sync
Assertions.assertEquals(1, p.isr().size());
p.isr().forEach(n -> Assertions.assertTrue(n.offline()));
Assertions.assertEquals(-1, p.maxTimestamp());
Assertions.assertEquals(-1, p.earliestOffset());
Assertions.assertEquals(-1, p.latestOffset());
});
Assertions.assertNotEquals(0, offlinePartitions.size());
Assertions.assertEquals(PARTITIONS - NUMBER_OF_ONLINE_PARTITIONS, offlinePartitions.size());
}
}

Expand Down Expand Up @@ -149,4 +163,31 @@ void testTopics() throws ExecutionException, InterruptedException {
1, admin.topics(Set.of(TOPIC_NAME)).toCompletableFuture().get().size());
}
}

@Timeout(10)
@Test
void testEarliest() throws ExecutionException, InterruptedException {
  try (var admin = AsyncAdmin.of(bootstrapServers())) {
    // the topic overlaps the closed broker, so every partition is filtered out as unfetchable
    var offsets = admin.earliestOffsets(TOPIC_PARTITIONS).toCompletableFuture().get();
    Assertions.assertEquals(0, offsets.size());
  }
}

@Timeout(10)
@Test
void testLatest() throws ExecutionException, InterruptedException {
  try (var admin = AsyncAdmin.of(bootstrapServers())) {
    // the topic overlaps the closed broker, so every partition is filtered out as unfetchable
    var offsets = admin.latestOffsets(TOPIC_PARTITIONS).toCompletableFuture().get();
    Assertions.assertEquals(0, offsets.size());
  }
}

@Timeout(10)
@Test
void testMaxTimestamp() throws ExecutionException, InterruptedException {
  try (var admin = AsyncAdmin.of(bootstrapServers())) {
    // the topic overlaps the closed broker, so every partition is filtered out as unfetchable
    var timestamps = admin.maxTimestamps(TOPIC_PARTITIONS).toCompletableFuture().get();
    Assertions.assertEquals(0, timestamps.size());
  }
}
}