Skip to content

Commit

Permalink
[COMMON] display all nodes in pref log
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Oct 26, 2024
1 parent 2e35711 commit a599f2f
Showing 1 changed file with 21 additions and 16 deletions.
37 changes: 21 additions & 16 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.astraea.app.argument.DataRateField;
import org.astraea.app.argument.DataSizeField;
import org.astraea.app.argument.DistributionTypeField;
Expand All @@ -56,6 +56,7 @@
import org.astraea.common.DistributionType;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.Partition;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartition;
Expand Down Expand Up @@ -106,18 +107,24 @@ public static List<String> execute(final Argument param) {
() -> consumerThreads.stream().allMatch(AbstractThread::closed));

try (var admin = Admin.of(param.bootstrapServers())) {
Supplier<LongStream> sizes =
() ->
var matchedBrokers =
admin.brokers().toCompletableFuture().join().stream()
.filter(
b ->
b.topicPartitionPaths().stream()
.anyMatch(p -> param.topics.contains(p.topic())))
.map(Broker::host)
.collect(Collectors.toSet());
Function<String, Long> size =
host ->
admin.brokers().toCompletableFuture().join().stream()
.filter(
b ->
b.topicPartitionPaths().stream()
.anyMatch(p -> param.topics.contains(p.topic())))
.filter(b -> b.host().equals(host))
.mapToLong(
b ->
b.topicPartitionPaths().stream()
.mapToLong(TopicPartitionPath::size)
.sum());
.sum())
.sum();

var fileWriterTask =
CompletableFuture.completedFuture(
Expand All @@ -129,14 +136,12 @@ public static List<String> execute(final Argument param) {
() -> consumerThreads.stream().allMatch(AbstractThread::closed),
() -> producerThreads.stream().allMatch(AbstractThread::closed),
param.logInterval,
List.of(
ReportFormat.CSVContentElement.create(
"size", () -> String.valueOf(sizes.get().sum())),
ReportFormat.CSVContentElement.create(
"max size", () -> String.valueOf(sizes.get().max().getAsLong())),
ReportFormat.CSVContentElement.create(
"min size",
() -> String.valueOf(sizes.get().min().getAsLong())))))
matchedBrokers.stream()
.map(
host ->
ReportFormat.CSVContentElement.create(
host, () -> String.valueOf(size.apply(host))))
.toList()))
.thenAcceptAsync(Runnable::run);

var monkeys = MonkeyThread.play(consumerThreads, param);
Expand Down

0 comments on commit a599f2f

Please sign in to comment.