Skip to content

Commit

Permalink
HBASE-25739 TableSkewCostFunction need to use aggregated deviation - …
Browse files Browse the repository at this point in the history
…backport

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: stack <stack@duboce.net>
Reviewed-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
clarax authored and saintstack committed Jul 13, 2021
1 parent fa8bc25 commit dfc9ac8
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class BalancerClusterState {
int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; // regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
int[] numRegionsPerTable; // tableIndex -> region count
double[] meanRegionsPerTable; // mean region count per table
double[] regionSkewByTable; // skew on RS per by table
double[] minRegionSkewByTable; // min skew on RS per by table
double[] maxRegionSkewByTable; // max skew on RS per by table
int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; // whether there is regions with replicas
Expand Down Expand Up @@ -290,7 +295,9 @@ public String getRack(ServerName server) {
}

numTables = tables.size();
LOG.debug("Number of tables={}", numTables);
numRegionsPerServerPerTable = new int[numServers][numTables];
numRegionsPerTable = new int[numTables];

for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
Expand All @@ -301,15 +308,26 @@ public String getRack(ServerName server) {
for (int i = 0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
numRegionsPerTable[regionIndexToTableIndex[i]]++;
}
}

numMaxRegionsPerTable = new int[numTables];
// Avoid repeated computation for planning
meanRegionsPerTable = new double[numTables];
regionSkewByTable = new double[numTables];
maxRegionSkewByTable = new double[numTables];
minRegionSkewByTable = new double[numTables];

for (int i = 0; i < numTables; i++) {
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
minRegionSkewByTable[i] += DoubleArrayCost.getMinSkew(numRegionsPerTable[i], numServers);
maxRegionSkewByTable[i] += DoubleArrayCost.getMaxSkew(numRegionsPerTable[i], numServers);
}

for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
for (int tableIdx = 0; tableIdx < aNumRegionsPerServerPerTable.length; tableIdx++) {
regionSkewByTable[tableIdx] +=
Math.abs(aNumRegionsPerServerPerTable[tableIdx] - meanRegionsPerTable[tableIdx]);
}
}

Expand Down Expand Up @@ -671,22 +689,13 @@ void regionMoved(int region, int oldServer, int newServer) {
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
numRegionsPerServerPerTable[oldServer][tableIndex]--;
// update regionSkewPerTable for the move from old server
regionSkewByTable[tableIndex] += getSkewChangeFor(oldServer, tableIndex, -1);
}
numRegionsPerServerPerTable[newServer][tableIndex]++;

// check whether this caused maxRegionsPerTable in the new Server to be updated
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex]
+ 1) == numMaxRegionsPerTable[tableIndex]) {
// recompute maxRegionsPerTable since the previous value was coming from the old server
numMaxRegionsPerTable[tableIndex] = 0;
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
// update regionSkewPerTable for the move to new server
regionSkewByTable[tableIndex] += getSkewChangeFor(newServer, tableIndex, 1);

// update for servers
int primary = regionIndexToPrimaryIndex[region];
Expand Down Expand Up @@ -856,10 +865,18 @@ public String toString() {
.append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
.append(Arrays.deepToString(regionsPerServer));

desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
desc.append(", regionSkewByTable=").append(Arrays.toString(regionSkewByTable))
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
.append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
.append('}');
return desc.toString();
}
}

private double getSkewChangeFor(int serverIndex, int tableIndex, double regionCountChange) {
double curSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
meanRegionsPerTable[tableIndex]);
double oldSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
regionCountChange - meanRegionsPerTable[tableIndex]);
return curSkew - oldSkew;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
@InterfaceAudience.Private
abstract class CostFunction {

public static final double COST_EPSILON = 0.0001;

private float multiplier = 0;

protected BalancerClusterState cluster;
Expand Down Expand Up @@ -89,13 +91,14 @@ protected void regionMoved(int region, int oldServer, int newServer) {
* @return The scaled value.
*/
protected static double scale(double min, double max, double value) {
if (max <= min || value <= min) {
if (max <= min || value <= min
|| Math.abs(max - min) <= COST_EPSILON || Math.abs(value - min) <= COST_EPSILON) {
return 0;
}
if ((max - min) == 0) {
if (max <= min || Math.abs(max - min) <= COST_EPSILON) {
return 0;
}

return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,14 @@ private static double computeCost(double[] stats) {
double count = stats.length;
double mean = total / count;

// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);

// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);

min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));

}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}

double scaled = CostFunction.scale(min, max, totalCost);
return scaled;
return CostFunction.scale(getMinSkew(total, count),
getMaxSkew(total, count), totalCost);
}

private static double getSum(double[] stats) {
Expand All @@ -106,4 +89,34 @@ private static double getSum(double[] stats) {
}
return total;
}

/**
* Return the min skew of distribution
* @param total is total number of regions
*/
public static double getMinSkew(double total, double numServers) {
double mean = total / numServers;
// It's possible that there aren't enough regions to go around
double min;
if (numServers > total) {
min = ((numServers - total) * mean + (1 - mean) * total) ;
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * numServers));
int numLow = (int) (numServers - numHigh);
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
}
return min;
}

/**
* Return the max deviation of distribution
* Compute max as if all region servers had 0 and one had the sum of all costs. This must be
* a zero sum cost for this to make sense.
* @param total is total number of regions
*/
public static double getMaxSkew(double total, double numServers) {
double mean = total / numServers;
return (total - mean) + (numServers - 1) * mean;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private int stepsPerRegion = 800;
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
private float minCostNeedBalance = 0.025f;
private boolean isBalancerDecisionRecording = false;
private boolean isBalancerRejectionRecording = false;

Expand Down Expand Up @@ -259,7 +259,8 @@ protected void loadConf(Configuration conf) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
}

LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps +
", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ class TableSkewCostFunction extends CostFunction {

@Override
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;

for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
double cost = 0;
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
cost += scale(cluster.minRegionSkewByTable[tableIdx],
cluster.maxRegionSkewByTable[tableIdx], cluster.regionSkewByTable[tableIdx]);
}

return scale(min, max, value);
return cost;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static void beforeAllTests() throws Exception {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
conf.setFloat("hbase.regions.slop", 0.0f);
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
conf.setLong(StochasticLoadBalancer.MAX_RUNNING_TIME_KEY, 3 * 60 * 1000L);
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
loadBalancer = new StochasticLoadBalancer();
MasterServices services = mock(MasterServices.class);
when(services.getConfiguration()).thenReturn(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception {

// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
// check that the numMaxRegionsPerTable for "table" has increased to 2
assertEquals(2, cluster.numMaxRegionsPerTable[0]);
// check that the regionSkewByTable for "table" has increased to 2
assertEquals(2, cluster.regionSkewByTable[0], 0.01);
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
*/
@Test
public void testBalanceCluster() throws Exception {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000); // 3 min
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.onConfigurationChange(conf);
for (int[] mockCluster : clusterStateMocks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public static void beforeAllTests() throws IOException {
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 0);
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.primaryRegionCountCost", 0);
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0);
BalancerTestBase.conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
BalancerTestBase.conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
HeterogeneousRegionCountCostFunction.class.getName());
// Need to ensure test dir has been created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public void testLargeCluster() {
int numRegionsPerServer = 80; // all servers except one
int numTables = 100;
int replication = 1;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
}

0 comments on commit dfc9ac8

Please sign in to comment.