diff --git a/common/src/main/java/org/astraea/common/cost/BrokerInputCost.java b/common/src/main/java/org/astraea/common/cost/BrokerInputCost.java index c80d93b2fc..2c4c69b993 100644 --- a/common/src/main/java/org/astraea/common/cost/BrokerInputCost.java +++ b/common/src/main/java/org/astraea/common/cost/BrokerInputCost.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import org.astraea.common.admin.ClusterBean; @@ -51,7 +52,12 @@ public Optional fetcher() { public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var brokerCost = brokerCost(clusterInfo, clusterBean).value(); var value = dispersion.calculate(brokerCost.values()); - return () -> value; + return ClusterCost.of( + value, + () -> + brokerCost.values().stream() + .map(Objects::toString) + .collect(Collectors.joining(", ", "{", "}"))); } @Override diff --git a/common/src/main/java/org/astraea/common/cost/BrokerOutputCost.java b/common/src/main/java/org/astraea/common/cost/BrokerOutputCost.java index 66723a8604..b24f1bfad9 100644 --- a/common/src/main/java/org/astraea/common/cost/BrokerOutputCost.java +++ b/common/src/main/java/org/astraea/common/cost/BrokerOutputCost.java @@ -52,7 +52,12 @@ public Optional fetcher() { public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var brokerCost = brokerCost(clusterInfo, clusterBean).value(); var value = dispersion.calculate(brokerCost.values()); - return () -> value; + return ClusterCost.of( + value, + () -> + brokerCost.values().stream() + .map(Object::toString) + .collect(Collectors.joining(", ", "{", "}"))); } @Override diff --git a/common/src/main/java/org/astraea/common/cost/ClusterCost.java b/common/src/main/java/org/astraea/common/cost/ClusterCost.java index 8f741469be..afd92d828d 100644 --- a/common/src/main/java/org/astraea/common/cost/ClusterCost.java +++ b/common/src/main/java/org/astraea/common/cost/ClusterCost.java @@ -16,9 +16,33 @@ */ package org.astraea.common.cost; -/** Return type of cost function, `HasMoveCost`. It returns the score of brokers. */ +import java.util.function.Supplier; + public interface ClusterCost { + /** + * Build a {@link ClusterCost} instance. The provided cost value must be within the range of [0, + * 1]. See the javadoc of {@link ClusterCost#value()} for further detail. + * + * @param costValue The cost value of a Kafka cluster. The provided cost value should be within + * the range of [0, 1]. See the javadoc of {@link ClusterCost#value()} for further detail. + * @param description a descriptive text about the background story of this cost value. This value + * might be displayed on a user interface. + */ + static ClusterCost of(double costValue, Supplier description) { + return new ClusterCost() { + @Override + public double value() { + return costValue; + } + + @Override + public String toString() { + return description.get(); + } + }; + } + /** * The cost score of a Kafka cluster. This value represents the idealness of a Kafka cluster in * terms of a specific performance aspect. diff --git a/common/src/main/java/org/astraea/common/cost/HasClusterCost.java b/common/src/main/java/org/astraea/common/cost/HasClusterCost.java index 9067e14e41..362c24be34 100644 --- a/common/src/main/java/org/astraea/common/cost/HasClusterCost.java +++ b/common/src/main/java/org/astraea/common/cost/HasClusterCost.java @@ -17,7 +17,6 @@ package org.astraea.common.cost; import java.util.Collection; -import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -50,39 +49,39 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) costAndWeight.entrySet().stream() .collect( Collectors.toUnmodifiableMap( - Map.Entry::getKey, - e -> e.getKey().clusterCost(clusterInfo, clusterBean).value())); + Map.Entry::getKey, e -> e.getKey().clusterCost(clusterInfo, clusterBean))); var totalWeight = costAndWeight.values().stream().mapToDouble(x -> x).sum(); var compositeScore = costAndWeight.keySet().stream() - .mapToDouble(cost -> scores.get(cost) * costAndWeight.get(cost) / totalWeight) + .mapToDouble( + cost -> scores.get(cost).value() * costAndWeight.get(cost) / totalWeight) .sum(); - return new ClusterCost() { - @Override - public double value() { - return compositeScore; - } - - @Override - public String toString() { - Bi3Function descriptiveName = - (function, cost, weight) -> - "{\"" + function.toString() + "\" cost " + cost + " weight " + weight + "}"; - return "WeightCompositeClusterCost[" - + costAndWeight.entrySet().stream() - .sorted( - Comparator.>comparingDouble( - Map.Entry::getValue) - .reversed()) - .map( - e -> - descriptiveName.apply(e.getKey(), scores.get(e.getKey()), e.getValue())) - .collect(Collectors.joining(", ")) - + "] = " - + compositeScore; - } - }; + return ClusterCost.of( + compositeScore, + () -> { + Bi3Function descriptiveName = + (function, cost, weight) -> + "{\"" + + function.toString() + + "\" cost " + + cost.value() + + " weight " + + weight + + " description " + + cost + + " }"; + return "WeightCompositeClusterCost[" + + costAndWeight.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .map( + e -> + descriptiveName.apply( + e.getKey(), scores.get(e.getKey()), e.getValue())) + .collect(Collectors.joining(", ")) + + "] = " + + compositeScore; + }); } @Override diff --git a/common/src/main/java/org/astraea/common/cost/NetworkCost.java b/common/src/main/java/org/astraea/common/cost/NetworkCost.java index a056e1360f..78d654b5fc 100644 --- a/common/src/main/java/org/astraea/common/cost/NetworkCost.java +++ b/common/src/main/java/org/astraea/common/cost/NetworkCost.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import org.astraea.common.DataRate; import org.astraea.common.EnumInfo; import org.astraea.common.admin.BrokerTopic; import org.astraea.common.admin.ClusterBean; @@ -144,9 +145,17 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) if (summary.getMin() < 0) throw new IllegalStateException( "Corrupted min rate: " + summary.getMin() + ", brokers: " + brokerRate); - if (summary.getMax() == 0) return () -> 0; // edge case to avoid divided by zero error + if (summary.getMax() == 0) + return ClusterCost.of( + 0, () -> "network load zero"); // edge case to avoid divided by zero error double score = (summary.getMax() - summary.getMin()) / (summary.getMax()); - return () -> score; + return ClusterCost.of( + score, + () -> + brokerRate.values().stream() + .map(x -> DataRate.Byte.of(x.longValue()).perSecond()) + .map(DataRate::toString) + .collect(Collectors.joining(", ", "{", "}"))); } @Override diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index 98c43dd6b1..1c2d16585c 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -49,7 +49,12 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var brokerScore = brokerCost(clusterInfo, clusterBean).value(); var value = dispersion.calculate(brokerScore.values()); - return () -> value; + return ClusterCost.of( + value, + () -> + brokerScore.values().stream() + .map(Object::toString) + .collect(Collectors.joining(", ", "{", "}"))); } private static Map leaderCount( diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java index 067c27fb16..40bd0fb5c7 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java @@ -85,7 +85,12 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var brokerCost = brokerCost(clusterInfo, clusterBean).value(); var value = dispersion.calculate(brokerCost.values()); - return () -> value; + return ClusterCost.of( + value, + () -> + brokerCost.values().stream() + .map(Object::toString) + .collect(Collectors.joining(", ", "{", "}"))); } @Override diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java index 02ed2508fe..7f2c58b187 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java @@ -70,7 +70,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) var totalReplicas = clusterInfo.replicas().size(); // no need to rebalance - if (totalReplicas == 0) return () -> 0; + if (totalReplicas == 0) return ClusterCost.of(0, () -> "no replica"); var replicaPerBroker = clusterInfo @@ -85,15 +85,15 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) var max = summary.getMax(); var min = anyBrokerEmpty ? 0 : summary.getMin(); // complete balance - if (max - min == 0) return () -> 0; + if (max - min == 0) return ClusterCost.of(0, () -> "complete balance " + max); // complete balance in terms of integer // The following case will trigger if the number of replicas is not integer times of brokers. // For example: allocate 4 replicas to 3 brokers. The ideal placement state will be (2,1,1), // (1,2,1) or (1,1,2). All these cases should be considered as optimal solution since the number // of replica must be integer. And this case will be trigger if the (max - min) equals 1. If // such case is detected, return 0 as the optimal state of this cost function was found. - if (max - min == 1) return () -> 0; - return () -> (double) (max - min) / (totalReplicas); + if (max - min == 1) return ClusterCost.of(0, () -> "integer balance " + max); + return ClusterCost.of((double) (max - min) / (totalReplicas), summary::toString); } @Override diff --git a/common/src/test/java/org/astraea/common/cost/ClusterCostTest.java b/common/src/test/java/org/astraea/common/cost/ClusterCostTest.java index eb6df1f517..6c1ebbc6f3 100644 --- a/common/src/test/java/org/astraea/common/cost/ClusterCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ClusterCostTest.java @@ -40,12 +40,16 @@ static void closeService() { @Test void testMerge() { - HasClusterCost cost0 = (c, b) -> () -> 0.2; - HasClusterCost cost1 = (c, b) -> () -> 0.5; - HasClusterCost cost2 = (c, b) -> () -> 0.8; + HasClusterCost cost0 = (c, b) -> ClusterCost.of(0.2, () -> "description0"); + HasClusterCost cost1 = (c, b) -> ClusterCost.of(0.5, () -> "description1"); + HasClusterCost cost2 = (c, b) -> ClusterCost.of(0.8, () -> "description2"); var merged = HasClusterCost.of(Map.of(cost0, 1D, cost1, 2D, cost2, 2D)); - var result = merged.clusterCost(null, null).value(); + var clusterCost = merged.clusterCost(null, null); + var result = clusterCost.value(); Assertions.assertEquals(0.56, Math.round(result * 100.0) / 100.0); + Assertions.assertTrue(clusterCost.toString().contains("description0")); + Assertions.assertTrue(clusterCost.toString().contains("description1")); + Assertions.assertTrue(clusterCost.toString().contains("description2")); } @Test