Skip to content

Commit

Permalink
Merge branch 'HBASE-27389' into HBASE-27389-master-rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
wchevreuil authored Nov 2, 2023
2 parents fa4c896 + 69d980a commit 917bfce
Show file tree
Hide file tree
Showing 27 changed files with 1,731 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,6 +115,12 @@ class BalancerClusterState {
private float[][] rackLocalities;
// Maps localityType -> region -> [server|rack]Index with highest locality
private int[][] regionsToMostLocalEntities;
// Maps region -> serverIndex -> regionCacheRatio of a region on a server
private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
// Maps regionIndex -> serverIndex with best region cache ratio
private int[] regionServerIndexWithBestRegionCachedRatio;
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

static class DefaultRackManager extends RackManager {
@Override
Expand All @@ -125,13 +132,20 @@ public String getRack(ServerName server) {
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager);
this(null, clusterState, loads, regionFinder, rackManager, null);
}

protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
}

@SuppressWarnings("unchecked")
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
if (unassignedRegions == null) {
unassignedRegions = Collections.emptyList();
}
Expand All @@ -145,6 +159,8 @@ public String getRack(ServerName server) {
tables = new ArrayList<>();
this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;

numRegions = 0;

List<List<Integer>> serversPerHostList = new ArrayList<>();
Expand Down Expand Up @@ -541,6 +557,142 @@ private void computeCachedLocalities() {

}

/**
* Returns the size of hFiles from the most recent RegionLoad for region
*/
public int getTotalRegionHFileSizeMB(int region) {
Deque<BalancerRegionLoad> load = regionLoads[region];
if (load == null) {
// This means, that the region has no actual data on disk
return 0;
}
return regionLoads[region].getLast().getRegionSizeMB();
}

/**
* Returns the weighted cache ratio of a region on the given region server
*/
public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
}

/**
* Returns the amount by which a region is cached on a given region server. If the region is not
* currently hosted on the given region server, then find out if it was previously hosted there
* and return the old cache ratio.
*/
protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
float regionCacheRatio = 0.0f;

// Get the current region cache ratio if the region is hosted on the server regionServerIndex
for (int regionIndex : regionsPerServer[regionServerIndex]) {
if (region != regionIndex) {
continue;
}

Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];

// The region is currently hosted on this region server. Get the region cache ratio for this
// region on this server
regionCacheRatio =
regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();

return regionCacheRatio;
}

// Region is not currently hosted on this server. Check if the region was cached on this
// server earlier. This can happen when the server was shutdown and the cache was persisted.
// Search using the region name and server name and not the index id and server id as these ids
// may change when a server is marked as dead or a new server is added.
String regionEncodedName = regions[region].getEncodedName();
ServerName serverName = servers[regionServerIndex];
if (
regionCacheRatioOnOldServerMap != null
&& regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
) {
Pair<ServerName, Float> cacheRatioOfRegionOnServer =
regionCacheRatioOnOldServerMap.get(regionEncodedName);
if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
if (LOG.isDebugEnabled()) {
LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
serverName, regionCacheRatio);
}
}
}
return regionCacheRatio;
}

/**
* Populate the maps containing information about how much a region is cached on a region server.
*/
private void computeRegionServerRegionCacheRatio() {
regionIndexServerIndexRegionCachedRatio = new HashMap<>();
regionServerIndexWithBestRegionCachedRatio = new int[numRegions];

for (int region = 0; region < numRegions; region++) {
float bestRegionCacheRatio = 0.0f;
int serverWithBestRegionCacheRatio = 0;
for (int server = 0; server < numServers; server++) {
float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
// A region with cache ratio 0 on a server means nothing. Hence, just make a note of
// cache ratio only if the cache ratio is greater than 0.
Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
}
if (regionCacheRatio > bestRegionCacheRatio) {
serverWithBestRegionCacheRatio = server;
// If the server currently hosting the region has equal cache ratio to a historical
// server, consider the current server to keep hosting the region
bestRegionCacheRatio = regionCacheRatio;
} else if (
regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
) {
// If two servers have same region cache ratio, then the server currently hosting the
// region
// should retain the region
serverWithBestRegionCacheRatio = server;
}
}
regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
Pair<Integer, Integer> regionServerPair =
new Pair<>(region, regionIndexToServerIndex[region]);
float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
if (tempRegionCacheRatio > bestRegionCacheRatio) {
LOG.warn(
"INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
+ "best region cache ratio {} on server {}",
regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
}
}
}

protected float getOrComputeRegionCacheRatio(int region, int server) {
if (
regionServerIndexWithBestRegionCachedRatio == null
|| regionIndexServerIndexRegionCachedRatio.isEmpty()
) {
computeRegionServerRegionCacheRatio();
}

Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
: 0.0f;
}

public int[] getOrComputeServerWithBestRegionCachedRatio() {
if (
regionServerIndexWithBestRegionCachedRatio == null
|| regionIndexServerIndexRegionCachedRatio.isEmpty()
) {
computeRegionServerRegionCacheRatio();
}
return regionServerIndexWithBestRegionCachedRatio;
}

/**
* Maps region index to rack index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ class BalancerRegionLoad {
private final long writeRequestsCount;
private final int memStoreSizeMB;
private final int storefileSizeMB;
private final int regionSizeMB;
private final float currentRegionPrefetchRatio;

BalancerRegionLoad(RegionMetrics regionMetrics) {
readRequestsCount = regionMetrics.getReadRequestCount();
cpRequestsCount = regionMetrics.getCpRequestCount();
writeRequestsCount = regionMetrics.getWriteRequestCount();
memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE);
storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE);
currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio();
}

public long getReadRequestsCount() {
Expand All @@ -62,4 +66,12 @@ public int getMemStoreSizeMB() {
public int getStorefileSizeMB() {
return storefileSizeMB;
}

public int getRegionSizeMB() {
return regionSizeMB;
}

public float getCurrentRegionCacheRatio() {
return currentRegionPrefetchRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List<ServerName> servers,
clusterState.put(server, Collections.emptyList());
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager);
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
null);
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
Expand Down
Loading

0 comments on commit 917bfce

Please sign in to comment.