Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix partitionScore #182

Merged
merged 13 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ This tool will score the partition on brokers, the higher score the heavier load
### Partition Score Configurations

1. --bootstrap.servers: the server to connect to
2. --exclude.internal.topics: True if you want to ignore internal topics like _consumer_offsets while counting score.
3. --hide.balanced: True if you want to hide topics and partitions thar already balanced.:q

## Kafka Replica Syncing Monitor

Expand Down
141 changes: 88 additions & 53 deletions app/src/main/java/org/astraea/topic/cost/CalculateUtils.java
Original file line number Diff line number Diff line change
@@ -1,79 +1,114 @@
package org.astraea.topic.cost;

import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

public class CalculateUtils {
public static Map<Integer, Map<TopicPartition, Double>> getLoad(
Map<Integer, Map<TopicPartition, Integer>> brokerPartitionSize,
Map<String, Integer> retentionMillis) {
Map<Integer, Map<TopicPartition, Double>> brokerPartitionLoad = new HashMap<>();
double load;
for (var broker : brokerPartitionSize.keySet()) {
Map<TopicPartition, Double> partitionLoad = new HashMap<>();
for (var partition : brokerPartitionSize.get(broker).keySet()) {
load =
(double) brokerPartitionSize.get(broker).get(partition)
/ retentionMillis.get(partition.topic());
partitionLoad.put(partition, load);
}
brokerPartitionLoad.put(broker, partitionLoad);
}

brokerPartitionSize
.keySet()
.forEach(
(broker) -> {
Map<TopicPartition, Double> partitionLoad =
brokerPartitionSize.get(broker).keySet().stream()
.filter(partition -> retentionMillis.containsKey(partition.topic()))
.collect(
Collectors.toMap(
Function.identity(),
partition ->
(double) brokerPartitionSize.get(broker).get(partition)
/ retentionMillis.get(partition.topic())));
brokerPartitionLoad.put(broker, partitionLoad);
});
return brokerPartitionLoad;
}

private static Double countSum(Set<Double> in) {
return in.stream().mapToDouble(i -> i).sum();
}

public static Map<Integer, Map<TopicPartition, Double>> getScore(
Map<Integer, Map<TopicPartition, Double>> load) {
Map<Integer, Double> brokerLoad = new HashMap<>();
Map<TopicPartition, Double> partitionLoad = new HashMap<>();
Map<Integer, Double> partitionSD = new HashMap<>();
Map<Integer, Double> partitionMean = new HashMap<>();
Map<Integer, Map<TopicPartition, Double>> brokerPartitionScore = new HashMap<>();
double brokerSD;
Set<Double> loadSet = new HashSet<>();
Set<Double> LoadSQR = new HashSet<>();

double mean = 0f, LoadSQR = 0f, SD, brokerSD;
int partitionNum;
for (var broker : load.keySet()) {
for (var topicPartition : load.get(broker).keySet()) {
mean += load.get(broker).get(topicPartition);
partitionLoad.put(topicPartition, load.get(broker).get(topicPartition));
LoadSQR += Math.pow(load.get(broker).get(topicPartition), 2);
}
partitionNum = load.get(broker).keySet().size();
brokerLoad.put(broker, mean);
mean /= partitionNum;
partitionMean.put(broker, mean);
SD = Math.pow((LoadSQR - mean * mean * partitionNum) / partitionNum, 0.5);
partitionSD.put(broker, SD);
mean = 0f;
LoadSQR = 0f;
}
load.keySet()
.forEach(
broker -> {
load.get(broker)
.keySet()
.forEach(
tp -> {
loadSet.add(load.get(broker).get(tp));
partitionLoad.put(tp, load.get(broker).get(tp));
LoadSQR.add(Math.pow(load.get(broker).get(tp), 2));
});
var loadSum = countSum(loadSet);
var partitionNum = load.get(broker).keySet().size();
brokerLoad.put(broker, loadSum);
var mean = loadSum / load.get(broker).size();
partitionMean.put(broker, loadSum / load.get(broker).size());
var SD =
Math.pow((countSum(LoadSQR) - mean * mean * partitionNum) / partitionNum, 0.5);
partitionSD.put(broker, SD);
loadSet.clear();
LoadSQR.clear();
});

for (var i : brokerLoad.keySet()) {
mean += brokerLoad.get(i);
LoadSQR += Math.pow(brokerLoad.get(i), 2);
}
mean /= brokerLoad.keySet().size();
brokerLoad
.keySet()
.forEach(
broker -> {
loadSet.add(brokerLoad.get(broker));
LoadSQR.add(Math.pow(brokerLoad.get(broker), 2));
});
var brokerLoadMean = countSum(loadSet) / brokerLoad.keySet().size();
var brokerLoadSQR = countSum(LoadSQR);
brokerSD =
Math.pow(
(LoadSQR - mean * mean * brokerLoad.keySet().size()) / brokerLoad.keySet().size(), 0.5);
for (var broker : load.keySet()) {
Map<TopicPartition, Double> partitionScore = new HashMap<>();
for (var topicPartition : load.get(broker).keySet()) {
if (brokerLoad.get(broker) - mean > 0) {
partitionScore.put(
topicPartition,
((brokerLoad.get(broker) - mean) / brokerSD)
* ((partitionLoad.get(topicPartition) - partitionMean.get(broker))
/ partitionSD.get(broker))
* 60.0);

} else {
partitionScore.put(topicPartition, 0.0);
}
brokerPartitionScore.put(broker, partitionScore);
}
}
(brokerLoadSQR - brokerLoadMean * brokerLoadMean * brokerLoad.keySet().size())
/ brokerLoad.keySet().size(),
0.5);
load.keySet()
.forEach(
broker -> {
Map<TopicPartition, Double> partitionScore =
new TreeMap<>(
Comparator.comparing(TopicPartition::topic)
.thenComparing(TopicPartition::partition));
load.get(broker)
.keySet()
.forEach(
topicPartition -> {
if (brokerLoad.get(broker) - brokerLoadMean > 0) {
partitionScore.put(
topicPartition,
Math.round(
(((brokerLoad.get(broker) - brokerLoadMean) / brokerSD)
* ((partitionLoad.get(topicPartition)
- partitionMean.get(broker))
/ partitionSD.get(broker))
* 60.0)
* 100.0)
/ 100.0);
} else {
partitionScore.put(topicPartition, 0.0);
}
brokerPartitionScore.put(broker, partitionScore);
});
});
return brokerPartitionScore;
}
}
32 changes: 21 additions & 11 deletions app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,31 @@
public class GetPartitionInf {
static Map<Integer, Map<TopicPartition, Integer>> getSize(TopicAdmin client) {
Map<Integer, Map<TopicPartition, Integer>> brokerPartitionSize = new HashMap<>();
for (var broker : client.brokerIds()) {
Map<TopicPartition, Integer> partitionSize = new HashMap<>();
for (var partition : client.replicas(client.publicTopics().keySet()).keySet())
if (client.replicas(client.publicTopics().keySet()).get(partition).get(0).broker()
== broker)
partitionSize.put(
partition,
(int) client.replicas(client.publicTopics().keySet()).get(partition).get(0).size());
brokerPartitionSize.put(broker, partitionSize);
}
client
.brokerIds()
.forEach(
(broker) -> {
TreeMap<TopicPartition, Integer> partitionSize =
new TreeMap<>(
Comparator.comparing(TopicPartition::topic)
.thenComparing(TopicPartition::partition));
client
.replicas(client.topicNames())
.forEach(
(tp, assignment) -> {
assignment.forEach(
partition -> {
if (partition.broker() == broker)
partitionSize.put(tp, (int) partition.size());
});
brokerPartitionSize.put(broker, partitionSize);
});
});
return brokerPartitionSize;
}

static Map<String, Integer> getRetentionMillis(TopicAdmin client) {
return client.publicTopics().entrySet().stream()
return client.topics().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
Expand Down
129 changes: 86 additions & 43 deletions app/src/main/java/org/astraea/topic/cost/PartitionScore.java
Original file line number Diff line number Diff line change
@@ -1,61 +1,104 @@
package org.astraea.topic.cost;

import static org.astraea.argument.ArgumentUtil.parseArgument;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.BooleanConverter;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.astraea.argument.ArgumentUtil;
import org.astraea.argument.BasicArgument;
import org.astraea.argument.validator.NotEmptyString;
import org.astraea.topic.TopicAdmin;

public class PartitionScore {
TopicAdmin admin;
Map<Integer, Map<TopicPartition, Integer>> brokerPartitionSize;
Map<Integer, Map<TopicPartition, Double>> score;
Map<Integer, Map<TopicPartition, Double>> load;
Map<String, Integer> retentionMillis;

public PartitionScore(String address) {
admin = TopicAdmin.of(address);
brokerPartitionSize = GetPartitionInf.getSize(admin);
retentionMillis = GetPartitionInf.getRetentionMillis(admin);
load = CalculateUtils.getLoad(brokerPartitionSize, retentionMillis);
score = CalculateUtils.getScore(load);
}
public static void printScore(
Map<Integer, Map<TopicPartition, Double>> score, Argument argument) {
List<TopicPartition> partitionGood = new ArrayList<>();
Map<Integer, Boolean> brokerGood = new HashMap<>();
score
.keySet()
.forEach(
broker -> {
brokerGood.put(broker, true);
score
.get(broker)
.keySet()
.forEach(
tp -> {
if (score.get(broker).get(tp) > 0) brokerGood.put(broker, false);
});

if (!brokerGood.get(broker)) {
System.out.println("\nbroker: " + broker);
score
.get(broker)
.keySet()
.forEach(
tp -> {
if (score.get(broker).get(tp) > 0) {
System.out.println(tp + ": " + score.get(broker).get(tp));
} else {
partitionGood.add(tp);
}
});
}
});
if (!argument.hideBalanced) {
System.out.println(
"\nThe following brokers are balanced: "
+ brokerGood.entrySet().stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
.collect(Collectors.toSet()));

public void printScore(Map<Integer, Map<TopicPartition, Double>> score) {
Set<TopicPartition> partitionGood = new HashSet<>();
Map<Integer, Boolean> BrokerGood = new HashMap<>();
for (var broker : score.keySet()) {
BrokerGood.put(broker, true);
for (var tp : score.get(broker).keySet())
if (score.get(broker).get(tp) > 0) BrokerGood.put(broker, false);
System.out.println();
if (BrokerGood.get(broker)) {
System.out.println("broker: " + broker + " is balanced.");
} else {
System.out.println("broker: " + broker);
for (var tp : score.get(broker).keySet()) {
if (score.get(broker).get(tp) > 0) {
System.out.println(tp + ": " + score.get(broker).get(tp));
} else {
partitionGood.add(tp);
}
}
}
partitionGood.sort(
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
System.out.println(
"The following partitions are balanced: "
+ partitionGood.stream()
.map(String::valueOf)
.collect(Collectors.joining(", ", "[", "]")));
}
}

System.out.print(
"\nThe following partitions are balanced: "
+ partitionGood.stream()
.map(String::valueOf)
.collect(Collectors.joining(", ", "[", "]")));
public static Map<Integer, Map<TopicPartition, Double>> execute(
Argument argument, TopicAdmin admin) {
var internalTopic =
Set.of(
"__consumer_offsets",
"_confluent-command",
"_confluent-metrics",
"_confluent-telemetry-metrics",
"__transaction_state");
var brokerPartitionSize = GetPartitionInf.getSize(admin);
var retentionMillis = GetPartitionInf.getRetentionMillis(admin);
if (argument.excludeInternalTopic) internalTopic.forEach(retentionMillis::remove);
var load = CalculateUtils.getLoad(brokerPartitionSize, retentionMillis);
return CalculateUtils.getScore(load);
}

public static void main(String[] args) {
var argument = ArgumentUtil.parseArgument(new Argument(), args);
var partitionScore = new PartitionScore(argument.brokers);
partitionScore.printScore(partitionScore.score);
var argument = parseArgument(new Argument(), args);
var admin = TopicAdmin.of(argument.brokers);
var score = execute(argument, admin);
printScore(score, argument);
}

static class Argument extends BasicArgument {}
static class Argument extends BasicArgument {
@Parameter(
names = {"--exclude.internal.topics"},
description =
"True if you want to ignore internal topics like _consumer_offsets while counting score.",
validateWith = NotEmptyString.class,
converter = BooleanConverter.class)
boolean excludeInternalTopic = false;

@Parameter(
names = {"--hide.balanced"},
description = "True if you want to hide topics and partitions thar already balanced.",
validateWith = NotEmptyString.class,
converter = BooleanConverter.class)
boolean hideBalanced = false;
}
}
Loading