Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28186 Rebase CacheAwareBalance related commits into master branch #5500

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,6 +115,12 @@ class BalancerClusterState {
private float[][] rackLocalities;
// Maps localityType -> region -> [server|rack]Index with highest locality
private int[][] regionsToMostLocalEntities;
// Maps region -> serverIndex -> regionCacheRatio of a region on a server
private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
// Maps regionIndex -> serverIndex with best region cache ratio
private int[] regionServerIndexWithBestRegionCachedRatio;
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

static class DefaultRackManager extends RackManager {
@Override
Expand All @@ -125,13 +132,20 @@ public String getRack(ServerName server) {
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager);
this(null, clusterState, loads, regionFinder, rackManager, null);
}

protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
}

@SuppressWarnings("unchecked")
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
if (unassignedRegions == null) {
unassignedRegions = Collections.emptyList();
}
Expand All @@ -145,6 +159,8 @@ public String getRack(ServerName server) {
tables = new ArrayList<>();
this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;

numRegions = 0;

List<List<Integer>> serversPerHostList = new ArrayList<>();
Expand Down Expand Up @@ -541,6 +557,142 @@ private void computeCachedLocalities() {

}

/**
* Returns the size of hFiles from the most recent RegionLoad for region
*/
public int getTotalRegionHFileSizeMB(int region) {
Deque<BalancerRegionLoad> load = regionLoads[region];
if (load == null) {
// This means, that the region has no actual data on disk
return 0;
}
return regionLoads[region].getLast().getRegionSizeMB();
}

/**
* Returns the weighted cache ratio of a region on the given region server
*/
public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
}

/**
* Returns the amount by which a region is cached on a given region server. If the region is not
* currently hosted on the given region server, then find out if it was previously hosted there
* and return the old cache ratio.
*/
protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
float regionCacheRatio = 0.0f;

// Get the current region cache ratio if the region is hosted on the server regionServerIndex
for (int regionIndex : regionsPerServer[regionServerIndex]) {
if (region != regionIndex) {
continue;
}

Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];

// The region is currently hosted on this region server. Get the region cache ratio for this
// region on this server
regionCacheRatio =
regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();

return regionCacheRatio;
}

// Region is not currently hosted on this server. Check if the region was cached on this
// server earlier. This can happen when the server was shutdown and the cache was persisted.
// Search using the region name and server name and not the index id and server id as these ids
// may change when a server is marked as dead or a new server is added.
String regionEncodedName = regions[region].getEncodedName();
ServerName serverName = servers[regionServerIndex];
if (
regionCacheRatioOnOldServerMap != null
&& regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
) {
Pair<ServerName, Float> cacheRatioOfRegionOnServer =
regionCacheRatioOnOldServerMap.get(regionEncodedName);
if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
if (LOG.isDebugEnabled()) {
LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
serverName, regionCacheRatio);
}
}
}
return regionCacheRatio;
}

/**
* Populate the maps containing information about how much a region is cached on a region server.
*/
private void computeRegionServerRegionCacheRatio() {
regionIndexServerIndexRegionCachedRatio = new HashMap<>();
regionServerIndexWithBestRegionCachedRatio = new int[numRegions];

for (int region = 0; region < numRegions; region++) {
float bestRegionCacheRatio = 0.0f;
int serverWithBestRegionCacheRatio = 0;
for (int server = 0; server < numServers; server++) {
float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
// A region with cache ratio 0 on a server means nothing. Hence, just make a note of
// cache ratio only if the cache ratio is greater than 0.
Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
}
if (regionCacheRatio > bestRegionCacheRatio) {
serverWithBestRegionCacheRatio = server;
// If the server currently hosting the region has equal cache ratio to a historical
// server, consider the current server to keep hosting the region
bestRegionCacheRatio = regionCacheRatio;
} else if (
regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
) {
// If two servers have same region cache ratio, then the server currently hosting the
// region
// should retain the region
serverWithBestRegionCacheRatio = server;
}
}
regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
Pair<Integer, Integer> regionServerPair =
new Pair<>(region, regionIndexToServerIndex[region]);
float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
if (tempRegionCacheRatio > bestRegionCacheRatio) {
LOG.warn(
"INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
+ "best region cache ratio {} on server {}",
regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
}
}
}

protected float getOrComputeRegionCacheRatio(int region, int server) {
if (
regionServerIndexWithBestRegionCachedRatio == null
|| regionIndexServerIndexRegionCachedRatio.isEmpty()
) {
computeRegionServerRegionCacheRatio();
}

Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
: 0.0f;
}

public int[] getOrComputeServerWithBestRegionCachedRatio() {
if (
regionServerIndexWithBestRegionCachedRatio == null
|| regionIndexServerIndexRegionCachedRatio.isEmpty()
) {
computeRegionServerRegionCacheRatio();
}
return regionServerIndexWithBestRegionCachedRatio;
}

/**
* Maps region index to rack index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ class BalancerRegionLoad {
private final long writeRequestsCount;
private final int memStoreSizeMB;
private final int storefileSizeMB;
private final int regionSizeMB;
private final float currentRegionPrefetchRatio;

BalancerRegionLoad(RegionMetrics regionMetrics) {
readRequestsCount = regionMetrics.getReadRequestCount();
cpRequestsCount = regionMetrics.getCpRequestCount();
writeRequestsCount = regionMetrics.getWriteRequestCount();
memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE);
storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE);
currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio();
}

public long getReadRequestsCount() {
Expand All @@ -62,4 +66,12 @@ public int getMemStoreSizeMB() {
public int getStorefileSizeMB() {
return storefileSizeMB;
}

public int getRegionSizeMB() {
return regionSizeMB;
}

public float getCurrentRegionCacheRatio() {
return currentRegionPrefetchRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List<ServerName> servers,
clusterState.put(server, Collections.emptyList());
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager);
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
null);
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
Expand Down
Loading