Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ClusterCost in ReplicaDiskInCost #563

Merged
merged 6 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions app/src/main/java/org/astraea/app/cost/Dispersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.cost;

import java.util.Collection;

/**
 * Aggregates a sequence of scores into a single number that describes how spread out the scores
 * are.
 */
@FunctionalInterface
public interface Dispersion {
  /**
   * Creates a {@link Dispersion} that divides the population standard deviation of the scores by
   * their mean (i.e. the coefficient of variation). The method name is kept as-is for
   * compatibility with existing callers.
   *
   * <p>An empty collection, or a collection whose standard deviation is exactly zero (for example
   * all zeros), yields {@code 0.0} instead of {@code NaN}.
   *
   * @return a Dispersion computing standard deviation divided by mean
   */
  static Dispersion correlationCoefficient() {
    return scores -> {
      // no data means no imbalance; this also avoids 0 / 0 below
      if (scores.isEmpty()) {
        return 0.0;
      }
      var mean = scores.stream().mapToDouble(x -> x).sum() / scores.size();
      var standardDeviation =
          Math.sqrt(
              scores.stream().mapToDouble(score -> Math.pow((score - mean), 2)).sum()
                  / scores.size());
      // identical scores are perfectly balanced, even when the mean is zero (0 / 0 otherwise)
      if (standardDeviation == 0.0) {
        return 0.0;
      }
      return standardDeviation / mean;
    };
  }

  /**
   * Aggregates the values into a number.
   *
   * @param scores origin data
   * @return aggregated data
   */
  double calculate(Collection<Double> scores);
}
78 changes: 37 additions & 41 deletions app/src/main/java/org/astraea/app/cost/ReplicaDiskInCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@

import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.admin.TopicPartitionReplica;
import org.astraea.app.metrics.HasBeanObject;
Expand All @@ -37,48 +35,38 @@
* responds to the replica log size of brokers. The calculation method of the score is the rate of
* increase of log size per unit time divided by the upper limit of broker bandwidth.
*/
public class ReplicaDiskInCost implements HasBrokerCost, HasPartitionCost {
public class ReplicaDiskInCost implements HasClusterCost, HasBrokerCost, HasPartitionCost {
private final Duration duration;
private final Dispersion dispersion = Dispersion.correlationCoefficient();
static final double OVERFLOW_SCORE = 9999.0;

public ReplicaDiskInCost(Configuration configuration) {
duration =
Duration.ofSeconds(Integer.parseInt(configuration.string("metrics.duration").orElse("30")));
}

/**
 * Scores the whole cluster by aggregating the per-broker costs with the configured dispersion.
 *
 * @param clusterInfo cluster layout forwarded to {@code brokerCost}
 * @param clusterBean metrics forwarded to {@code brokerCost}
 * @return a cost yielding {@code OVERFLOW_SCORE} when any broker cost equals -1, otherwise the
 *     dispersion of the broker costs
 */
@Override
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
  final var costPerBroker = brokerCost(clusterInfo, clusterBean).value();
  // A broker cost of -1 marks that retention occurred; answer with a big score so that the plan
  // is rejected.
  final boolean retentionDetected = costPerBroker.containsValue(-1.0);
  if (retentionDetected) {
    return () -> OVERFLOW_SCORE;
  }
  return () -> dispersion.calculate(costPerBroker.values());
}

@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
final Map<Integer, List<TopicPartitionReplica>> topicPartitionOfEachBroker =
clusterInfo.topics().stream()
.flatMap(topic -> clusterInfo.replicas(topic).stream())
.map(
replica ->
TopicPartitionReplica.of(
replica.topic(), replica.partition(), replica.nodeInfo().id()))
.collect(Collectors.groupingBy(TopicPartitionReplica::brokerId));
final var actual =
var partitionCost = partitionCost(clusterInfo, clusterBean);
var brokerLoad =
clusterInfo.nodes().stream()
.collect(
Collectors.toUnmodifiableMap(
NodeInfo::id,
node -> topicPartitionOfEachBroker.getOrDefault(node.id(), List.of())));

final var topicPartitionDataRate = topicPartitionDataRate(clusterBean, duration);

final var brokerLoad =
actual.entrySet().stream()
.map(
entry ->
node ->
Map.entry(
entry.getKey(),
entry.getValue().stream()
.mapToDouble(
x ->
topicPartitionDataRate.get(
TopicPartition.of(x.topic(), x.partition())))
node.id(),
partitionCost.value(node.id()).values().stream()
.mapToDouble(rate -> rate)
.sum()))
.map(entry -> Map.entry(entry.getKey(), entry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return () -> brokerLoad;
}

Expand Down Expand Up @@ -140,7 +128,24 @@ public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterB
Collectors.toUnmodifiableMap(
Map.Entry::getKey, Map.Entry::getValue))))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
// When retention occurs, set every partition score to -1.
if (replicaIn.containsValue(-1.0)) {
return new PartitionCost() {
@Override
public Map<TopicPartition, Double> value(String topic) {
return scoreForTopic.get(topic).keySet().stream()
.map(x -> Map.entry(x, -1.0))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public Map<TopicPartition, Double> value(int brokerId) {
return scoreForBroker.get(brokerId).keySet().stream()
.map(x -> Map.entry(x, -1.0))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
};
}
return new PartitionCost() {
@Override
public Map<TopicPartition, Double> value(String topic) {
Expand Down Expand Up @@ -168,17 +173,6 @@ public Optional<Fetcher> fetcher() {
* doesn't have the sufficient old metric then an exception will likely be thrown.
* @return a map contain the maximum increase rate of each topic/partition log
*/
public static Map<TopicPartition, Double> topicPartitionDataRate(
ClusterBean clusterBean, Duration sampleWindow) {
return replicaDataRate(clusterBean, sampleWindow).entrySet().stream()
.map(
x -> {
var tpr = x.getKey();
return Map.entry(TopicPartition.of(tpr.topic(), tpr.partition()), x.getValue());
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x1, x2) -> x1));
}

public static Map<TopicPartitionReplica, Double> replicaDataRate(
ClusterBean clusterBean, Duration sampleWindow) {
return clusterBean.mapByReplica().entrySet().parallelStream()
Expand Down Expand Up @@ -207,6 +201,8 @@ public static Map<TopicPartitionReplica, Double> replicaDataRate(
/ 1024.0
/ ((double) (latestSize.createdTimestamp() - windowSize.createdTimestamp())
/ 1000);
// When retention occurs the computed rate is negative; set this data rate to -1.
if (dataRate < 0) dataRate = -1.0;
return Map.entry(metrics.getKey(), dataRate);
})
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
Expand Down
16 changes: 2 additions & 14 deletions app/src/main/java/org/astraea/app/cost/ReplicaLeaderCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,7 @@

/** more replica leaders -> higher cost */
public class ReplicaLeaderCost implements HasBrokerCost, HasClusterCost {
static double coefficientVariation(Map<Integer, Double> brokerScore) {
var dataRateMean =
brokerScore.values().stream().mapToDouble(x -> (double) x).sum() / brokerScore.size();
var dataRateSD =
Math.sqrt(
brokerScore.values().stream()
.mapToDouble(score -> Math.pow((score - dataRateMean), 2))
.sum()
/ brokerScore.size());
var cv = dataRateSD / dataRateMean;
if (cv > 1) return 1.0;
return cv;
}
private final Dispersion dispersion = Dispersion.correlationCoefficient();

@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
Expand All @@ -54,7 +42,7 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
@Override
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var brokerScore = brokerCost(clusterInfo, clusterBean).value();
return () -> coefficientVariation(brokerScore);
return () -> dispersion.calculate(brokerScore.values());
}

Map<Integer, Integer> leaderCount(ClusterInfo ignored, ClusterBean clusterBean) {
Expand Down
31 changes: 31 additions & 0 deletions app/src/test/java/org/astraea/app/cost/DispersionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.cost;

import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class DispersionTest {

  /** Pins the aggregated value produced for a small, fixed sequence of scores. */
  @Test
  void testCorrelationCoefficient() {
    var scores = List.of(0.2, 0.4, 0.7);
    var aggregated = Dispersion.correlationCoefficient().calculate(scores);
    Assertions.assertEquals(0.47418569253607507, aggregated);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ void testBrokerCost() {
Assertions.assertEquals(33.37843418121338, brokerLoad.get(3));
}

/** Pins the cluster cost computed from the mocked cluster with a 3 second metrics window. */
@Test
void testClusterCost() {
  var costFunction = new ReplicaDiskInCost(Configuration.of(Map.of("metrics.duration", "3")));
  var clusterCost = costFunction.clusterCost(clusterInfo(), clusterBean()).value();
  Assertions.assertEquals(0.20721255412897746, clusterCost);
}

private ClusterInfo clusterInfo() {
ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
Mockito.when(clusterInfo.nodes())
Expand Down