Skip to content

Commit

Permalink
HBASE-22618 added the possibility to load custom cost functions
Browse files Browse the repository at this point in the history
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
PierreZ authored and wchevreuil committed Aug 17, 2019
1 parent 8cb531f commit 836f269
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
Expand All @@ -47,6 +49,7 @@
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,6 +85,13 @@
* <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
* </ul>
*
* <p>You can also add custom Cost function by setting the the following configuration value:</p>
* <ul>
* <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
* </ul>
*
* <p>All custom Cost Functions needs to extends {@link StochasticLoadBalancer.CostFunction}</p>
*
* <p>In addition to the above configurations, the balancer can be tuned by the following
* configuration values:</p>
* <ul>
Expand Down Expand Up @@ -117,6 +127,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private static final String TABLE_FUNCTION_SEP = "_";
protected static final String MIN_COST_NEED_BALANCE_KEY =
"hbase.master.balancer.stochastic.minCostNeedBalance";
protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
"hbase.master.balancer.stochastic.additionalCostFunctions";

protected static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
Expand All @@ -133,7 +145,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {

private List<CandidateGenerator> candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions;
private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

// to save and report costs to JMX
private Double curOverallCost = 0d;
Expand Down Expand Up @@ -196,25 +208,57 @@ public synchronized void setConf(Configuration conf) {
};
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
costFunctions = new CostFunction[]{
new RegionCountSkewCostFunction(conf),
new PrimaryRegionCountSkewCostFunction(conf),
new MoveCostFunction(conf),
localityCost,
rackLocalityCost,
new TableSkewCostFunction(conf),
regionReplicaHostCostFunction,
regionReplicaRackCostFunction,
regionLoadFunctions[0],
regionLoadFunctions[1],
regionLoadFunctions[2],
regionLoadFunctions[3],
regionLoadFunctions[4]
};
curFunctionCosts= new Double[costFunctions.length];
tempFunctionCosts= new Double[costFunctions.length];

costFunctions = new ArrayList<>();
costFunctions.add(new RegionCountSkewCostFunction(conf));
costFunctions.add(new PrimaryRegionCountSkewCostFunction(conf));
costFunctions.add(new MoveCostFunction(conf));
costFunctions.add(localityCost);
costFunctions.add(rackLocalityCost);
costFunctions.add(new TableSkewCostFunction(conf));
costFunctions.add(regionReplicaHostCostFunction);
costFunctions.add(regionReplicaRackCostFunction);
costFunctions.add(regionLoadFunctions[0]);
costFunctions.add(regionLoadFunctions[1]);
costFunctions.add(regionLoadFunctions[2]);
costFunctions.add(regionLoadFunctions[3]);
costFunctions.add(regionLoadFunctions[4]);
loadCustomCostFunctions(conf);

curFunctionCosts= new Double[costFunctions.size()];
tempFunctionCosts= new Double[costFunctions.size()];
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
}

private void loadCustomCostFunctions(Configuration conf) {
String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);

if (null == functionsNames) {
return;
}

costFunctions.addAll(Arrays.stream(functionsNames)
.map(c -> {
Class<? extends CostFunction> klass = null;
try {
klass = (Class<? extends CostFunction>) Class.forName(c);
} catch (ClassNotFoundException e) {
LOG.warn("Cannot load class " + c + "': " + e.getMessage());
}
if (null == klass) {
return null;
}

CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
LOG.info("Successfully loaded custom CostFunction '" +
reflected.getClass().getSimpleName() + "'");

return reflected;
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}

protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
Expand Down Expand Up @@ -468,8 +512,8 @@ private void updateStochasticCosts(TableName tableName, Double overall, Double[]
"Overall", "Overall cost", overall);

// each cost function
for (int i = 0; i < costFunctions.length; i++) {
CostFunction costFunction = costFunctions[i];
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction costFunction = costFunctions.get(i);
String costFunctionName = costFunction.getClass().getSimpleName();
Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
// TODO: cost function may need a specific description
Expand Down Expand Up @@ -567,9 +611,9 @@ protected void updateCostsWithAction(Cluster cluster, Action action) {
*/
public String[] getCostFunctionNames() {
if (costFunctions == null) return null;
String[] ret = new String[costFunctions.length];
for (int i = 0; i < costFunctions.length; i++) {
CostFunction c = costFunctions[i];
String[] ret = new String[costFunctions.size()];
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction c = costFunctions.get(i);
ret[i] = c.getClass().getSimpleName();
}

Expand All @@ -588,8 +632,8 @@ public String[] getCostFunctionNames() {
protected double computeCost(Cluster cluster, double previousCost) {
double total = 0;

for (int i = 0; i < costFunctions.length; i++) {
CostFunction c = costFunctions[i];
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction c = costFunctions.get(i);
this.tempFunctionCosts[i] = 0.0;

if (c.getMultiplier() <= 0) {
Expand Down Expand Up @@ -972,13 +1016,13 @@ Cluster.Action generate(Cluster cluster) {
/**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
abstract static class CostFunction {
public abstract static class CostFunction {

private float multiplier = 0;

protected Cluster cluster;

CostFunction(Configuration c) {
public CostFunction(Configuration c) {
}

boolean isNeeded() {
Expand Down Expand Up @@ -1027,7 +1071,7 @@ void postAction(Action action) {
protected void regionMoved(int region, int oldServer, int newServer) {
}

abstract double cost();
protected abstract double cost();

@SuppressWarnings("checkstyle:linelength")
/**
Expand Down Expand Up @@ -1124,7 +1168,7 @@ static class MoveCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
// Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
DEFAULT_MAX_MOVES);
Expand Down Expand Up @@ -1159,7 +1203,7 @@ static class RegionCountSkewCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
Expand Down Expand Up @@ -1191,7 +1235,7 @@ static class PrimaryRegionCountSkewCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
if (!cluster.hasRegionReplicas) {
return 0;
}
Expand Down Expand Up @@ -1228,7 +1272,7 @@ static class TableSkewCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;
Expand Down Expand Up @@ -1311,7 +1355,7 @@ protected void regionMoved(int region, int oldServer, int newServer) {
}

@Override
double cost() {
protected double cost() {
return 1 - locality;
}

Expand Down Expand Up @@ -1389,7 +1433,7 @@ void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
}

@Override
double cost() {
protected double cost() {
if (clusterStatus == null || loads == null) {
return 0;
}
Expand Down Expand Up @@ -1581,7 +1625,7 @@ boolean isNeeded() {
}

@Override
double cost() {
protected double cost() {
if (maxCost <= 0) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.conf.Configuration;

public class DummyCostFunction extends StochasticLoadBalancer.CostFunction {
public DummyCostFunction(Configuration c) {
super(c);
}

@Override
protected double cost() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -121,7 +122,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
};

private ServerMetrics mockServerMetricsWithCpRequests(ServerName server,
List<RegionInfo> regionsOnServer, long cpRequestCount) {
List<RegionInfo> regionsOnServer,
long cpRequestCount) {
ServerMetrics serverMetrics = mock(ServerMetrics.class);
Map<byte[], RegionMetrics> regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for(RegionInfo info : regionsOnServer){
Expand Down Expand Up @@ -457,6 +459,17 @@ public void testLosingRs() throws Exception {
assertNull(plans);
}

@Test
public void testAdditionalCostFunction() {
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
DummyCostFunction.class.getName());

loadBalancer.setConf(conf);
assertTrue(Arrays.
asList(loadBalancer.getCostFunctionNames()).
contains(DummyCostFunction.class.getSimpleName()));
}

// This mock allows us to test the LocalityCostFunction
private class MockCluster extends BaseLoadBalancer.Cluster {

Expand Down

0 comments on commit 836f269

Please sign in to comment.