From 364302a719da549fbb0c44820285e1d184599e44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=AB=E7=A5=A5=E9=88=9E?= Date: Thu, 6 Jan 2022 13:51:02 +0800 Subject: [PATCH 1/7] fix internal topic problen and add new arguments --- .../astraea/topic/cost/CalculateUtils.java | 28 +++-- .../astraea/topic/cost/GetPartitionInf.java | 29 +++-- .../astraea/topic/cost/PartitionScore.java | 117 ++++++++++++------ .../topic/cost/GetPartitionInfTest.java | 7 +- .../topic/cost/PartitionScoreTest.java | 14 --- 5 files changed, 116 insertions(+), 79 deletions(-) diff --git a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java index e7204bb92a..f105bc3b94 100644 --- a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java +++ b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java @@ -1,7 +1,9 @@ package org.astraea.topic.cost; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; import org.apache.kafka.common.TopicPartition; public class CalculateUtils { @@ -13,10 +15,12 @@ public static Map> getLoad( for (var broker : brokerPartitionSize.keySet()) { Map partitionLoad = new HashMap<>(); for (var partition : brokerPartitionSize.get(broker).keySet()) { - load = - (double) brokerPartitionSize.get(broker).get(partition) - / retentionMillis.get(partition.topic()); - partitionLoad.put(partition, load); + if (retentionMillis.containsKey(partition.topic())) { + load = + (double) brokerPartitionSize.get(broker).get(partition) + / retentionMillis.get(partition.topic()); + partitionLoad.put(partition, load); + } } brokerPartitionLoad.put(broker, partitionLoad); } @@ -58,16 +62,20 @@ public static Map> getScore( Math.pow( (LoadSQR - mean * mean * brokerLoad.keySet().size()) / brokerLoad.keySet().size(), 0.5); for (var broker : load.keySet()) { - Map partitionScore = new HashMap<>(); + Map partitionScore = + new TreeMap<>( + Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)); for (var topicPartition : load.get(broker).keySet()) { if (brokerLoad.get(broker) - mean > 0) { partitionScore.put( topicPartition, - ((brokerLoad.get(broker) - mean) / brokerSD) - * ((partitionLoad.get(topicPartition) - partitionMean.get(broker)) - / partitionSD.get(broker)) - * 60.0); - + Math.round( + (((brokerLoad.get(broker) - mean) / brokerSD) + * ((partitionLoad.get(topicPartition) - partitionMean.get(broker)) + / partitionSD.get(broker)) + * 60.0) + * 100.0) + / 100.0); } else { partitionScore.put(topicPartition, 0.0); } diff --git a/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java b/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java index 4216ea8b50..5485053845 100644 --- a/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java +++ b/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java @@ -8,21 +8,28 @@ public class GetPartitionInf { static Map> getSize(TopicAdmin client) { Map> brokerPartitionSize = new HashMap<>(); - for (var broker : client.brokerIds()) { - Map partitionSize = new HashMap<>(); - for (var partition : client.replicas(client.publicTopics().keySet()).keySet()) - if (client.replicas(client.publicTopics().keySet()).get(partition).get(0).broker() - == broker) - partitionSize.put( - partition, - (int) client.replicas(client.publicTopics().keySet()).get(partition).get(0).size()); - brokerPartitionSize.put(broker, partitionSize); - } + client + .brokerIds() + .forEach( + (broker) -> { + TreeMap partitionSize = + new TreeMap<>( + Comparator.comparing(TopicPartition::topic) + .thenComparing(TopicPartition::partition)); + client + .replicas(client.topicNames()) + .forEach( + (tp, assignment) -> { + if (assignment.get(0).broker() == broker) + partitionSize.put(tp, (int) assignment.get(0).size()); + brokerPartitionSize.put(broker, partitionSize); + }); + }); return brokerPartitionSize; } static Map getRetentionMillis(TopicAdmin client) { - return client.publicTopics().entrySet().stream() + return client.topics().entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, diff --git a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java index fcda459cdc..bcc7b555e9 100644 --- a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java +++ b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java @@ -1,5 +1,6 @@ package org.astraea.topic.cost; +import com.beust.jcommander.Parameter; import java.util.*; import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; @@ -8,54 +9,88 @@ import org.astraea.topic.TopicAdmin; public class PartitionScore { - TopicAdmin admin; - Map> brokerPartitionSize; - Map> score; - Map> load; - Map retentionMillis; + public static void printScore( + Map> score, Argument argument) { + List partitionGood = new ArrayList<>(); + Map brokerGood = new HashMap<>(); + score + .keySet() + .forEach( + broker -> { + brokerGood.put(broker, true); + score + .get(broker) + .keySet() + .forEach( + tp -> { + if (score.get(broker).get(tp) > 0) brokerGood.put(broker, false); + }); - public PartitionScore(String address) { - admin = TopicAdmin.of(address); - brokerPartitionSize = GetPartitionInf.getSize(admin); - retentionMillis = GetPartitionInf.getRetentionMillis(admin); - load = CalculateUtils.getLoad(brokerPartitionSize, retentionMillis); - score = CalculateUtils.getScore(load); - } + if (!brokerGood.get(broker)) { + System.out.println("\nbroker: " + broker); + score + .get(broker) + .keySet() + .forEach( + tp -> { + if (score.get(broker).get(tp) > 0) { + System.out.println(tp + ": " + score.get(broker).get(tp)); + } else { + partitionGood.add(tp); + } + }); + } + }); + if (!argument.hideBalanced) { + System.out.println( + "\nThe following brokers are balanced: " + + brokerGood.entrySet().stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .collect(Collectors.toSet())); - public void printScore(Map> score) { - Set partitionGood = new HashSet<>(); - Map BrokerGood = new HashMap<>(); - for (var broker : score.keySet()) { - BrokerGood.put(broker, true); - for (var tp : score.get(broker).keySet()) - if (score.get(broker).get(tp) > 0) BrokerGood.put(broker, false); - System.out.println(); - if (BrokerGood.get(broker)) { - System.out.println("broker: " + broker + " is balanced."); - } else { - System.out.println("broker: " + broker); - for (var tp : score.get(broker).keySet()) { - if (score.get(broker).get(tp) > 0) { - System.out.println(tp + ": " + score.get(broker).get(tp)); - } else { - partitionGood.add(tp); - } - } - } + partitionGood.sort( + Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)); + System.out.println( + "The following partitions are balanced: " + + partitionGood.stream() + .map(String::valueOf) + .collect(Collectors.joining(", ", "[", "]"))); } - - System.out.print( - "\nThe following partitions are balanced: " - + partitionGood.stream() - .map(String::valueOf) - .collect(Collectors.joining(", ", "[", "]"))); } public static void main(String[] args) { var argument = ArgumentUtil.parseArgument(new Argument(), args); - var partitionScore = new PartitionScore(argument.brokers); - partitionScore.printScore(partitionScore.score); + var admin = TopicAdmin.of(argument.brokers); + var internalTopic = + Set.of( + "__consumer_offsets", + "_confluent-command", + "_confluent-metrics", + "_confluent-telemetry-metrics"); + var brokerPartitionSize = GetPartitionInf.getSize(admin); + var retentionMillis = GetPartitionInf.getRetentionMillis(admin); + if (argument.excludeInternalTopic) internalTopic.forEach(retentionMillis::remove); + var load = CalculateUtils.getLoad(brokerPartitionSize, retentionMillis); + var score = CalculateUtils.getScore(load); + printScore(score, argument); } - static class Argument extends BasicArgument {} + static class Argument extends BasicArgument { + @Parameter( + names = {"--exclude.internal.topics"}, + description = + "True if you want to ignore internal topics like _consumer_offsets while counting score.", + validateWith = ArgumentUtil.NotEmptyString.class, + converter = ArgumentUtil.BooleanConverter.class) + boolean excludeInternalTopic = false; + + @Parameter( + names = {"--hide.balanced"}, + description = + "True if you want to hide topics and partitions thar already balanced.", + validateWith = ArgumentUtil.NotEmptyString.class, + converter = ArgumentUtil.BooleanConverter.class) + boolean hideBalanced = false; + } } diff --git a/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java b/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java index f06c348c36..48853ccdb9 100644 --- a/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java +++ b/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java @@ -15,6 +15,7 @@ public class GetPartitionInfTest extends RequireBrokerCluster { static TopicAdmin admin; + static PartitionScore.Argument argument; @BeforeAll static void setup() throws ExecutionException, InterruptedException { @@ -67,8 +68,8 @@ static void setup() throws ExecutionException, InterruptedException { } @Test - void getSize() { - + void testGetSize() { + argument.excludeInternalTopic = true; var brokerPartitionSize = GetPartitionInf.getSize(admin); assertEquals(3, brokerPartitionSize.size()); assertEquals( @@ -79,7 +80,7 @@ void getSize() { } @Test - void getRetentionMillis() { + void testGetRetentionMillis() { var brokerPartitionRetentionMillis = GetPartitionInf.getRetentionMillis(admin); assertEquals(3, brokerPartitionRetentionMillis.size()); } diff --git a/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java b/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java index 414018c777..411bceb432 100644 --- a/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java +++ b/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java @@ -1,7 +1,5 @@ package org.astraea.topic.cost; -import static org.junit.jupiter.api.Assertions.assertEquals; - import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -11,7 +9,6 @@ import org.astraea.service.RequireBrokerCluster; import org.astraea.topic.TopicAdmin; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; public class PartitionScoreTest extends RequireBrokerCluster { static TopicAdmin admin; @@ -65,15 +62,4 @@ static void setup() throws ExecutionException, InterruptedException { } producer.close(); } - - @Test - void testGetScore() throws ExecutionException, InterruptedException { - PartitionScore partitionScore = new PartitionScore(bootstrapServers()); - assertEquals(3, partitionScore.score.size()); - assertEquals( - 3 * 4, - partitionScore.score.get(0).size() - + partitionScore.score.get(1).size() - + partitionScore.score.get(2).size()); - } } From 1d4e99910524c14abab3293be11e2b203782cbab Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Thu, 6 Jan 2022 19:43:28 +0800 Subject: [PATCH 2/7] update --- README.md | 2 ++ .../astraea/topic/cost/PartitionScore.java | 16 +++++++----- .../topic/cost/CalculateUtilsTest.java | 26 +++++++++---------- .../topic/cost/GetPartitionInfTest.java | 2 -- .../topic/cost/PartitionScoreTest.java | 20 ++++++++++++-- 5 files changed, 43 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 9aff14869e..b11a22ce15 100644 --- a/README.md +++ b/README.md @@ -326,6 +326,8 @@ This tool will score the partition on brokers, the higher score the heavier load ### Partition Score Configurations 1. --bootstrap.servers: the server to connect to +2. ----exclude.internal.topics: True if you want to ignore internal topics like _consumer_offsets while counting score. +3. --hide.balanced: True if you want to hide topics and partitions thar already balanced.:q ## Kafka Replica Syncing Monitor diff --git a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java index bcc7b555e9..63678be8fb 100644 --- a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java +++ b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java @@ -59,9 +59,8 @@ public static void printScore( } } - public static void main(String[] args) { - var argument = ArgumentUtil.parseArgument(new Argument(), args); - var admin = TopicAdmin.of(argument.brokers); + public static Map> execute( + Argument argument, TopicAdmin admin) { var internalTopic = Set.of( "__consumer_offsets", @@ -72,7 +71,13 @@ public static void main(String[] args) { var retentionMillis = GetPartitionInf.getRetentionMillis(admin); if (argument.excludeInternalTopic) internalTopic.forEach(retentionMillis::remove); var load = CalculateUtils.getLoad(brokerPartitionSize, retentionMillis); - var score = CalculateUtils.getScore(load); + return CalculateUtils.getScore(load); + } + + public static void main(String[] args) { + var argument = ArgumentUtil.parseArgument(new Argument(), args); + var admin = TopicAdmin.of(argument.brokers); + var score = execute(argument, admin); printScore(score, argument); } @@ -87,8 +92,7 @@ static class Argument extends BasicArgument { @Parameter( names = {"--hide.balanced"}, - description = - "True if you want to hide topics and partitions thar already balanced.", + description = "True if you want to hide topics and partitions thar already balanced.", validateWith = ArgumentUtil.NotEmptyString.class, converter = ArgumentUtil.BooleanConverter.class) boolean hideBalanced = false; diff --git a/app/src/test/java/org/astraea/topic/cost/CalculateUtilsTest.java b/app/src/test/java/org/astraea/topic/cost/CalculateUtilsTest.java index 80dd074867..0a3f647801 100644 --- a/app/src/test/java/org/astraea/topic/cost/CalculateUtilsTest.java +++ b/app/src/test/java/org/astraea/topic/cost/CalculateUtilsTest.java @@ -54,14 +54,14 @@ void testGetLoad() { assertEquals(2, Load.size()); assertEquals(4, Load.get(0).size()); assertEquals(4, Load.get(1).size()); - assertEquals(0.165, round(Load.get(0).get(new TopicPartition("test0", 0)))); - assertEquals(0.331, round(Load.get(0).get(new TopicPartition("test0", 1)))); - assertEquals(0.496, round(Load.get(0).get(new TopicPartition("test0", 2)))); - assertEquals(0.661, round(Load.get(0).get(new TopicPartition("test0", 3)))); - assertEquals(0.827, round(Load.get(1).get(new TopicPartition("test1", 0)))); - assertEquals(0.992, round(Load.get(1).get(new TopicPartition("test1", 1)))); - assertEquals(1.157, round(Load.get(1).get(new TopicPartition("test1", 2)))); - assertEquals(1.323, round(Load.get(1).get(new TopicPartition("test1", 3)))); + assertEquals(0.17, round(Load.get(0).get(new TopicPartition("test0", 0)))); + assertEquals(0.33, round(Load.get(0).get(new TopicPartition("test0", 1)))); + assertEquals(0.50, round(Load.get(0).get(new TopicPartition("test0", 2)))); + assertEquals(0.66, round(Load.get(0).get(new TopicPartition("test0", 3)))); + assertEquals(0.83, round(Load.get(1).get(new TopicPartition("test1", 0)))); + assertEquals(0.99, round(Load.get(1).get(new TopicPartition("test1", 1)))); + assertEquals(1.16, round(Load.get(1).get(new TopicPartition("test1", 2)))); + assertEquals(1.32, round(Load.get(1).get(new TopicPartition("test1", 3)))); } @Test @@ -74,13 +74,13 @@ void testGetScore() { assertEquals(0.0, round(Score.get(0).get(new TopicPartition("test0", 1)))); assertEquals(0.0, round(Score.get(0).get(new TopicPartition("test0", 2)))); assertEquals(0.0, round(Score.get(0).get(new TopicPartition("test0", 3)))); - assertEquals(-80.498, round(Score.get(1).get(new TopicPartition("test1", 0)))); - assertEquals(-26.833, round(Score.get(1).get(new TopicPartition("test1", 1)))); - assertEquals(26.833, round(Score.get(1).get(new TopicPartition("test1", 2)))); - assertEquals(80.498, round(Score.get(1).get(new TopicPartition("test1", 3)))); + assertEquals(-80.50, round(Score.get(1).get(new TopicPartition("test1", 0)))); + assertEquals(-26.83, round(Score.get(1).get(new TopicPartition("test1", 1)))); + assertEquals(26.83, round(Score.get(1).get(new TopicPartition("test1", 2)))); + assertEquals(80.50, round(Score.get(1).get(new TopicPartition("test1", 3)))); } double round(double score) { - return Math.round(1000 * score) / 1000.0; + return Math.round(100 * score) / 100.0; } } diff --git a/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java b/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java index 48853ccdb9..5195377983 100644 --- a/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java +++ b/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java @@ -15,7 +15,6 @@ public class GetPartitionInfTest extends RequireBrokerCluster { static TopicAdmin admin; - static PartitionScore.Argument argument; @BeforeAll static void setup() throws ExecutionException, InterruptedException { @@ -69,7 +68,6 @@ static void setup() throws ExecutionException, InterruptedException { @Test void testGetSize() { - argument.excludeInternalTopic = true; var brokerPartitionSize = GetPartitionInf.getSize(admin); assertEquals(3, brokerPartitionSize.size()); assertEquals( diff --git a/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java b/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java index 411bceb432..bca1a74455 100644 --- a/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java +++ b/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java @@ -1,5 +1,7 @@ package org.astraea.topic.cost; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -9,6 +11,7 @@ import org.astraea.service.RequireBrokerCluster; import org.astraea.topic.TopicAdmin; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; public class PartitionScoreTest extends RequireBrokerCluster { static TopicAdmin admin; @@ -19,7 +22,7 @@ static void setup() throws ExecutionException, InterruptedException { Map topicName = new HashMap<>(); topicName.put(0, "testPartitionScore0"); topicName.put(1, "testPartitionScore1"); - topicName.put(2, "testPartitionScore2"); + topicName.put(2, "__consumer_offsets"); try (var admin = TopicAdmin.of(bootstrapServers())) { admin .creator() @@ -47,7 +50,7 @@ static void setup() throws ExecutionException, InterruptedException { var producer = Producer.builder().brokers(bootstrapServers()).keySerializer(Serializer.STRING).build(); int size = 10000; - for (int t = 0; t <= 2; t++) { + for (int t = 0; t <= 1; t++) { for (int p = 0; p <= 3; p++) { producer .sender() @@ -62,4 +65,17 @@ static void setup() throws ExecutionException, InterruptedException { } producer.close(); } + + @Test + void testGetScore() { + PartitionScore.Argument argument = new PartitionScore.Argument(); + argument.excludeInternalTopic = false; + var score = PartitionScore.execute(argument, admin); + assertEquals(3, score.size()); + assertEquals(3 * 4, score.get(0).size() + score.get(1).size() + score.get(2).size()); + argument.excludeInternalTopic = true; + score = PartitionScore.execute(argument, admin); + assertEquals(3, score.size()); + assertEquals(2 * 4, score.get(0).size() + score.get(1).size() + score.get(2).size()); + } } From 722c4e42b750b93c7b81d1b6ebc5de323ab699ce Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Mon, 10 Jan 2022 21:46:28 +0800 Subject: [PATCH 3/7] update --- .../astraea/topic/cost/CalculateUtils.java | 150 +++++++++++------- .../astraea/topic/cost/PartitionScore.java | 3 +- 2 files changed, 91 insertions(+), 62 deletions(-) diff --git a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java index f105bc3b94..a0a0b6f0e3 100644 --- a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java +++ b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java @@ -1,9 +1,8 @@ package org.astraea.topic.cost; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; public class CalculateUtils { @@ -11,22 +10,31 @@ public static Map> getLoad( Map> brokerPartitionSize, Map retentionMillis) { Map> brokerPartitionLoad = new HashMap<>(); - double load; - for (var broker : brokerPartitionSize.keySet()) { - Map partitionLoad = new HashMap<>(); - for (var partition : brokerPartitionSize.get(broker).keySet()) { - if (retentionMillis.containsKey(partition.topic())) { - load = - (double) brokerPartitionSize.get(broker).get(partition) - / retentionMillis.get(partition.topic()); - partitionLoad.put(partition, load); - } - } - brokerPartitionLoad.put(broker, partitionLoad); - } + + brokerPartitionSize + .keySet() + .forEach( + (broker) -> { + Map partitionLoad = + brokerPartitionSize.get(broker).keySet().stream() + .filter(partition -> retentionMillis.containsKey(partition.topic())) + .collect( + Collectors.toMap( + Function.identity(), + partition -> + (double) brokerPartitionSize.get(broker).get(partition) + / retentionMillis.get(partition.topic()))); + brokerPartitionLoad.put(broker, partitionLoad); + }); return brokerPartitionLoad; } + public static Double countSum(Set in) { + double mean = 0.0; + for (var element : in) mean += element; + return mean; + } + public static Map> getScore( Map> load) { Map brokerLoad = new HashMap<>(); @@ -34,54 +42,74 @@ public static Map> getScore( Map partitionSD = new HashMap<>(); Map partitionMean = new HashMap<>(); Map> brokerPartitionScore = new HashMap<>(); + double brokerSD; + Set loadSet = new HashSet<>(); + Set LoadSQR = new HashSet<>(); - double mean = 0f, LoadSQR = 0f, SD, brokerSD; - int partitionNum; - for (var broker : load.keySet()) { - for (var topicPartition : load.get(broker).keySet()) { - mean += load.get(broker).get(topicPartition); - partitionLoad.put(topicPartition, load.get(broker).get(topicPartition)); - LoadSQR += Math.pow(load.get(broker).get(topicPartition), 2); - } - partitionNum = load.get(broker).keySet().size(); - brokerLoad.put(broker, mean); - mean /= partitionNum; - partitionMean.put(broker, mean); - SD = Math.pow((LoadSQR - mean * mean * partitionNum) / partitionNum, 0.5); - partitionSD.put(broker, SD); - mean = 0f; - LoadSQR = 0f; - } + load.keySet() + .forEach( + broker -> { + load.get(broker) + .keySet() + .forEach( + tp -> { + loadSet.add(load.get(broker).get(tp)); + partitionLoad.put(tp, load.get(broker).get(tp)); + LoadSQR.add(Math.pow(load.get(broker).get(tp), 2)); + }); + var partitionNum = load.get(broker).keySet().size(); + brokerLoad.put(broker, countSum(loadSet)); + var mean = countSum(loadSet) / loadSet.size(); + partitionMean.put(broker, countSum(loadSet) / loadSet.size()); + var SD = + Math.pow((countSum(LoadSQR) - mean * mean * partitionNum) / partitionNum, 0.5); + partitionSD.put(broker, SD); + loadSet.clear(); + LoadSQR.clear(); + }); - for (var i : brokerLoad.keySet()) { - mean += brokerLoad.get(i); - LoadSQR += Math.pow(brokerLoad.get(i), 2); - } - mean /= brokerLoad.keySet().size(); + brokerLoad + .keySet() + .forEach( + broker -> { + loadSet.add(brokerLoad.get(broker)); + LoadSQR.add(Math.pow(brokerLoad.get(broker), 2)); + }); + var brokerLoadMean = countSum(loadSet) / brokerLoad.keySet().size(); + var brokerLoadSQR = countSum(LoadSQR); brokerSD = Math.pow( - (LoadSQR - mean * mean * brokerLoad.keySet().size()) / brokerLoad.keySet().size(), 0.5); - for (var broker : load.keySet()) { - Map partitionScore = - new TreeMap<>( - Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)); - for (var topicPartition : load.get(broker).keySet()) { - if (brokerLoad.get(broker) - mean > 0) { - partitionScore.put( - topicPartition, - Math.round( - (((brokerLoad.get(broker) - mean) / brokerSD) - * ((partitionLoad.get(topicPartition) - partitionMean.get(broker)) - / partitionSD.get(broker)) - * 60.0) - * 100.0) - / 100.0); - } else { - partitionScore.put(topicPartition, 0.0); - } - brokerPartitionScore.put(broker, partitionScore); - } - } + (brokerLoadSQR - brokerLoadMean * brokerLoadMean * brokerLoad.keySet().size()) + / brokerLoad.keySet().size(), + 0.5); + load.keySet() + .forEach( + broker -> { + Map partitionScore = + new TreeMap<>( + Comparator.comparing(TopicPartition::topic) + .thenComparing(TopicPartition::partition)); + load.get(broker) + .keySet() + .forEach( + topicPartition -> { + if (brokerLoad.get(broker) - brokerLoadMean > 0) { + partitionScore.put( + topicPartition, + Math.round( + (((brokerLoad.get(broker) - brokerLoadMean) / brokerSD) + * ((partitionLoad.get(topicPartition) + - partitionMean.get(broker)) + / partitionSD.get(broker)) + * 60.0) + * 100.0) + / 100.0); + } else { + partitionScore.put(topicPartition, 0.0); + } + brokerPartitionScore.put(broker, partitionScore); + }); + }); return brokerPartitionScore; } } diff --git a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java index 63678be8fb..575bf25a38 100644 --- a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java +++ b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java @@ -66,7 +66,8 @@ public static Map> execute( "__consumer_offsets", "_confluent-command", "_confluent-metrics", - "_confluent-telemetry-metrics"); + "_confluent-telemetry-metrics", + "__transaction_state"); var brokerPartitionSize = GetPartitionInf.getSize(admin); var retentionMillis = GetPartitionInf.getRetentionMillis(admin); if (argument.excludeInternalTopic) internalTopic.forEach(retentionMillis::remove); From 1769491d382a72cdd64fa483f767d447c6f62613 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Sun, 23 Jan 2022 16:37:32 +0800 Subject: [PATCH 4/7] update --- .../java/org/astraea/topic/cost/CalculateUtils.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java index a0a0b6f0e3..d7bb92b304 100644 --- a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java +++ b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java @@ -30,9 +30,7 @@ public static Map> getLoad( } public static Double countSum(Set in) { - double mean = 0.0; - for (var element : in) mean += element; - return mean; + return in.stream().mapToDouble(i -> i).sum(); } public static Map> getScore( @@ -57,10 +55,11 @@ public static Map> getScore( partitionLoad.put(tp, load.get(broker).get(tp)); LoadSQR.add(Math.pow(load.get(broker).get(tp), 2)); }); + var loadSum = countSum(loadSet); var partitionNum = load.get(broker).keySet().size(); - brokerLoad.put(broker, countSum(loadSet)); - var mean = countSum(loadSet) / loadSet.size(); - partitionMean.put(broker, countSum(loadSet) / loadSet.size()); + brokerLoad.put(broker, loadSum); + var mean = loadSum / loadSet.size(); + partitionMean.put(broker, loadSum / loadSet.size()); var SD = Math.pow((countSum(LoadSQR) - mean * mean * partitionNum) / partitionNum, 0.5); partitionSD.put(broker, SD); From 56c92d6f92203d223ac301af951ade511b6140a6 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Sun, 13 Feb 2022 01:28:14 +0800 Subject: [PATCH 5/7] fix some issue --- app/src/main/java/org/astraea/topic/cost/CalculateUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java index d7bb92b304..8d6cae25aa 100644 --- a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java +++ b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java @@ -58,8 +58,8 @@ public static Map> getScore( var loadSum = countSum(loadSet); var partitionNum = load.get(broker).keySet().size(); brokerLoad.put(broker, loadSum); - var mean = loadSum / loadSet.size(); - partitionMean.put(broker, loadSum / loadSet.size()); + var mean = loadSum / load.get(broker).size(); + partitionMean.put(broker, loadSum / load.get(broker).size()); var SD = Math.pow((countSum(LoadSQR) - mean * mean * partitionNum) / partitionNum, 0.5); partitionSD.put(broker, SD); From a078c2acedbf026779ffccf806bb2c24ab87dba3 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Fri, 18 Feb 2022 04:07:24 +0800 Subject: [PATCH 6/7] fix some issues --- README.md | 2 +- .../main/java/org/astraea/topic/cost/CalculateUtils.java | 2 +- .../main/java/org/astraea/topic/cost/GetPartitionInf.java | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c49101ce68..328de4d678 100644 --- a/README.md +++ b/README.md @@ -350,7 +350,7 @@ This tool will score the partition on brokers, the higher score the heavier load ### Partition Score Configurations 1. --bootstrap.servers: the server to connect to -2. ----exclude.internal.topics: True if you want to ignore internal topics like _consumer_offsets while counting score. +2. --exclude.internal.topics: True if you want to ignore internal topics like _consumer_offsets while counting score. 3. --hide.balanced: True if you want to hide topics and partitions thar already balanced.:q ## Kafka Replica Syncing Monitor diff --git a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java index 8d6cae25aa..b6f84ac8af 100644 --- a/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java +++ b/app/src/main/java/org/astraea/topic/cost/CalculateUtils.java @@ -29,7 +29,7 @@ public static Map> getLoad( return brokerPartitionLoad; } - public static Double countSum(Set in) { + private static Double countSum(Set in) { return in.stream().mapToDouble(i -> i).sum(); } diff --git a/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java b/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java index 5485053845..613694b18a 100644 --- a/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java +++ b/app/src/main/java/org/astraea/topic/cost/GetPartitionInf.java @@ -20,8 +20,11 @@ static Map> getSize(TopicAdmin client) { .replicas(client.topicNames()) .forEach( (tp, assignment) -> { - if (assignment.get(0).broker() == broker) - partitionSize.put(tp, (int) assignment.get(0).size()); + assignment.forEach( + partition -> { + if (partition.broker() == broker) + partitionSize.put(tp, (int) partition.size()); + }); brokerPartitionSize.put(broker, partitionSize); }); }); From 9acef1e8137187086fa2121f6183e20eedb5cd39 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Fri, 18 Feb 2022 04:16:29 +0800 Subject: [PATCH 7/7] rebuild code --- .../org/astraea/topic/cost/PartitionScore.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java index 575bf25a38..40def4f638 100644 --- a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java +++ b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java @@ -1,11 +1,14 @@ package org.astraea.topic.cost; +import static org.astraea.argument.ArgumentUtil.parseArgument; + import com.beust.jcommander.Parameter; +import com.beust.jcommander.converters.BooleanConverter; import java.util.*; import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; -import org.astraea.argument.ArgumentUtil; import org.astraea.argument.BasicArgument; +import org.astraea.argument.validator.NotEmptyString; import org.astraea.topic.TopicAdmin; public class PartitionScore { @@ -76,7 +79,7 @@ public static Map> execute( } public static void main(String[] args) { - var argument = ArgumentUtil.parseArgument(new Argument(), args); + var argument = parseArgument(new Argument(), args); var admin = TopicAdmin.of(argument.brokers); var score = execute(argument, admin); printScore(score, argument); @@ -87,15 +90,15 @@ static class Argument extends BasicArgument { names = {"--exclude.internal.topics"}, description = "True if you want to ignore internal topics like _consumer_offsets while counting score.", - validateWith = ArgumentUtil.NotEmptyString.class, - converter = ArgumentUtil.BooleanConverter.class) + validateWith = NotEmptyString.class, + converter = BooleanConverter.class) boolean excludeInternalTopic = false; @Parameter( names = {"--hide.balanced"}, description = "True if you want to hide topics and partitions thar already balanced.", - validateWith = ArgumentUtil.NotEmptyString.class, - converter = ArgumentUtil.BooleanConverter.class) + validateWith = NotEmptyString.class, + converter = BooleanConverter.class) boolean hideBalanced = false; } }