Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COST] Provide descriptive Object#toString for ClusterCosts #1473

Merged
merged 5 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.astraea.common.admin.ClusterBean;
Expand Down Expand Up @@ -51,7 +52,12 @@ public Optional<Fetcher> fetcher() {
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var brokerCost = brokerCost(clusterInfo, clusterBean).value();
var value = dispersion.calculate(brokerCost.values());
return () -> value;
return ClusterCost.of(
value,
() ->
brokerCost.values().stream()
.map(Objects::toString)
.collect(Collectors.joining(", ", "{", "}")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ public Optional<Fetcher> fetcher() {
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var brokerCost = brokerCost(clusterInfo, clusterBean).value();
var value = dispersion.calculate(brokerCost.values());
return () -> value;
return ClusterCost.of(
value,
() ->
brokerCost.values().stream()
.map(Object::toString)
.collect(Collectors.joining(", ", "{", "}")));
}

@Override
Expand Down
26 changes: 25 additions & 1 deletion common/src/main/java/org/astraea/common/cost/ClusterCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,33 @@
*/
package org.astraea.common.cost;

/** Return type of cost function, `HasMoveCost`. It returns the score of brokers. */
import java.util.function.Supplier;

public interface ClusterCost {

/**
* Build a {@link ClusterCost} instance. The provided cost value must be within the range of [0,
* 1]. See the javadoc of {@link ClusterCost#value()} for further detail.
*
* @param costValue The cost value of a Kafka cluster. The provided cost value should be within
* the range of [0, 1]. See the javadoc of {@link ClusterCost#value()} for further detail.
* @param description a descriptive text about the background story of this cost value. This value
* might be displayed on a user interface.
*/
static ClusterCost of(double costValue, Supplier<String> description) {
return new ClusterCost() {
@Override
public double value() {
return costValue;
}

@Override
public String toString() {
return description.get();
}
};
}

/**
* The cost score of a Kafka cluster. This value represents the idealness of a Kafka cluster in
* terms of a specific performance aspect.
Expand Down
57 changes: 28 additions & 29 deletions common/src/main/java/org/astraea/common/cost/HasClusterCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.astraea.common.cost;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -50,39 +49,39 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean)
costAndWeight.entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
e -> e.getKey().clusterCost(clusterInfo, clusterBean).value()));
Map.Entry::getKey, e -> e.getKey().clusterCost(clusterInfo, clusterBean)));
var totalWeight = costAndWeight.values().stream().mapToDouble(x -> x).sum();
var compositeScore =
costAndWeight.keySet().stream()
.mapToDouble(cost -> scores.get(cost) * costAndWeight.get(cost) / totalWeight)
.mapToDouble(
cost -> scores.get(cost).value() * costAndWeight.get(cost) / totalWeight)
.sum();

return new ClusterCost() {
@Override
public double value() {
return compositeScore;
}

@Override
public String toString() {
Bi3Function<HasClusterCost, Double, Double, String> descriptiveName =
(function, cost, weight) ->
"{\"" + function.toString() + "\" cost " + cost + " weight " + weight + "}";
return "WeightCompositeClusterCost["
+ costAndWeight.entrySet().stream()
.sorted(
Comparator.<Map.Entry<HasClusterCost, Double>>comparingDouble(
Map.Entry::getValue)
.reversed())
.map(
e ->
descriptiveName.apply(e.getKey(), scores.get(e.getKey()), e.getValue()))
.collect(Collectors.joining(", "))
+ "] = "
+ compositeScore;
}
};
return ClusterCost.of(
compositeScore,
() -> {
Bi3Function<HasClusterCost, ClusterCost, Double, String> descriptiveName =
(function, cost, weight) ->
"{\""
+ function.toString()
+ "\" cost "
+ cost.value()
+ " weight "
+ weight
+ " description "
+ cost
+ " }";
return "WeightCompositeClusterCost["
+ costAndWeight.entrySet().stream()
.sorted(Map.Entry.<HasClusterCost, Double>comparingByValue().reversed())
.map(
e ->
descriptiveName.apply(
e.getKey(), scores.get(e.getKey()), e.getValue()))
.collect(Collectors.joining(", "))
+ "] = "
+ compositeScore;
});
}

@Override
Expand Down
13 changes: 11 additions & 2 deletions common/src/main/java/org/astraea/common/cost/NetworkCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.common.DataRate;
import org.astraea.common.EnumInfo;
import org.astraea.common.admin.BrokerTopic;
import org.astraea.common.admin.ClusterBean;
Expand Down Expand Up @@ -144,9 +145,17 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean)
if (summary.getMin() < 0)
throw new IllegalStateException(
"Corrupted min rate: " + summary.getMin() + ", brokers: " + brokerRate);
if (summary.getMax() == 0) return () -> 0; // edge case to avoid divided by zero error
if (summary.getMax() == 0)
return ClusterCost.of(
0, () -> "network load zero"); // edge case to avoid divided by zero error
double score = (summary.getMax() - summary.getMin()) / (summary.getMax());
return () -> score;
return ClusterCost.of(
score,
() ->
brokerRate.values().stream()
.map(x -> DataRate.Byte.of(x.longValue()).perSecond())
.map(DataRate::toString)
.collect(Collectors.joining(", ", "{", "}")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var brokerScore = brokerCost(clusterInfo, clusterBean).value();
var value = dispersion.calculate(brokerScore.values());
return () -> value;
return ClusterCost.of(
value,
() ->
brokerScore.values().stream()
.map(Object::toString)
.collect(Collectors.joining(", ", "{", "}")));
}

private static Map<Integer, Integer> leaderCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var brokerCost = brokerCost(clusterInfo, clusterBean).value();
var value = dispersion.calculate(brokerCost.values());
return () -> value;
return ClusterCost.of(
value,
() ->
brokerCost.values().stream()
.map(Object::toString)
.collect(Collectors.joining(", ", "{", "}")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean)
var totalReplicas = clusterInfo.replicas().size();

// no need to rebalance
if (totalReplicas == 0) return () -> 0;
if (totalReplicas == 0) return ClusterCost.of(0, () -> "no replica");

var replicaPerBroker =
clusterInfo
Expand All @@ -85,15 +85,15 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean)
var max = summary.getMax();
var min = anyBrokerEmpty ? 0 : summary.getMin();
// complete balance
if (max - min == 0) return () -> 0;
if (max - min == 0) return ClusterCost.of(0, () -> "complete balance " + max);
// complete balance in terms of integer
// The following case will trigger if the number of replicas is not integer times of brokers.
// For example: allocate 4 replicas to 3 brokers. The ideal placement state will be (2,1,1),
// (1,2,1) or (1,1,2). All these cases should be considered as optimal solution since the number
// of replica must be integer. And this case will be trigger if the (max - min) equals 1. If
// such case is detected, return 0 as the optimal state of this cost function was found.
if (max - min == 1) return () -> 0;
return () -> (double) (max - min) / (totalReplicas);
if (max - min == 1) return ClusterCost.of(0, () -> "integer balance " + max);
return ClusterCost.of((double) (max - min) / (totalReplicas), summary::toString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ static void closeService() {

@Test
void testMerge() {
HasClusterCost cost0 = (c, b) -> () -> 0.2;
HasClusterCost cost1 = (c, b) -> () -> 0.5;
HasClusterCost cost2 = (c, b) -> () -> 0.8;
HasClusterCost cost0 = (c, b) -> ClusterCost.of(0.2, () -> "description0");
HasClusterCost cost1 = (c, b) -> ClusterCost.of(0.5, () -> "description1");
HasClusterCost cost2 = (c, b) -> ClusterCost.of(0.8, () -> "description2");
var merged = HasClusterCost.of(Map.of(cost0, 1D, cost1, 2D, cost2, 2D));
var result = merged.clusterCost(null, null).value();
var clusterCost = merged.clusterCost(null, null);
var result = clusterCost.value();
Assertions.assertEquals(0.56, Math.round(result * 100.0) / 100.0);
Assertions.assertTrue(clusterCost.toString().contains("description0"));
Assertions.assertTrue(clusterCost.toString().contains("description1"));
Assertions.assertTrue(clusterCost.toString().contains("description2"));
}

@Test
Expand Down