Skip to content

Commit

Permalink
HBASE-25926 Cleanup MetaTableAccessor references in FavoredNodeBalanc…
Browse files Browse the repository at this point in the history
…er related code (#3313)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
  • Loading branch information
Apache9 authored May 27, 2021
1 parent a22e418 commit 63141bf
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
Expand All @@ -55,6 +54,8 @@

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FavoredNodes;
Expand All @@ -74,7 +75,6 @@ public class FavoredNodeAssignmentHelper {
// This map serves as a cache for rack to sn lookups. The num of
// region server entries might not match with that is in servers.
private Map<String, String> regionServerToRackMap;
private Random random;
private List<ServerName> servers;
public static final byte [] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn");
public final static short FAVORED_NODES_NUM = 3;
Expand All @@ -91,7 +91,6 @@ public FavoredNodeAssignmentHelper(final List<ServerName> servers,
this.rackToRegionServerMap = new HashMap<>();
this.regionServerToRackMap = new HashMap<>();
this.uniqueRackList = new ArrayList<>();
this.random = new Random();
}

// Always initialize() when FavoredNodeAssignmentHelper is constructed.
Expand Down Expand Up @@ -120,80 +119,58 @@ public void initialize() {
* Update meta table with favored nodes info
* @param regionToFavoredNodes map of RegionInfo's to their favored nodes
* @param connection connection to be used
* @throws IOException
*/
public static void updateMetaWithFavoredNodesInfo(
Map<RegionInfo, List<ServerName>> regionToFavoredNodes,
Connection connection) throws IOException {
Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Connection connection)
throws IOException {
List<Put> puts = new ArrayList<>();
for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue());
Put put = makePut(entry.getKey(), entry.getValue());
if (put != null) {
puts.add(put);
}
}
MetaTableAccessor.putsToMetaTable(connection, puts);
LOG.info("Added " + puts.size() + " regions in META");
try (Table table = connection.getTable(TableName.META_TABLE_NAME)) {
table.put(puts);
}
LOG.info("Added " + puts.size() + " region favored nodes in META");
}

/**
* Update meta table with favored nodes info
* @param regionToFavoredNodes
* @param conf
* @throws IOException
*/
public static void updateMetaWithFavoredNodesInfo(
Map<RegionInfo, List<ServerName>> regionToFavoredNodes,
Configuration conf) throws IOException {
List<Put> puts = new ArrayList<>();
for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue());
if (put != null) {
puts.add(put);
}
}
Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Configuration conf) throws IOException {
// Write the region assignments to the meta table.
// TODO: See above overrides take a Connection rather than a Configuration only the
// Connection is a short circuit connection. That is not going to good in all cases, when
// master and meta are not colocated. Fix when this favored nodes feature is actually used
// someday.
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
metaTable.put(puts);
}
try (Connection conn = ConnectionFactory.createConnection(conf)) {
updateMetaWithFavoredNodesInfo(regionToFavoredNodes, conn);
}
LOG.info("Added " + puts.size() + " regions in META");
}

/**
* Generates and returns a Put containing the region info for the catalog table and the servers
* @return Put object
*/
private static Put makePutFromRegionInfo(RegionInfo regionInfo, List<ServerName> favoredNodeList)
throws IOException {
Put put = null;
if (favoredNodeList != null) {
long time = EnvironmentEdgeManager.currentTime();
put = MetaTableAccessor.makePutFromRegionInfo(regionInfo, time);
byte[] favoredNodes = getFavoredNodes(favoredNodeList);
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.CATALOG_FAMILY)
.setQualifier(FAVOREDNODES_QUALIFIER)
.setTimestamp(time)
.setType(Type.Put)
.setValue(favoredNodes)
.build());
LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
favoredNodeList);
private static Put makePut(RegionInfo regionInfo, List<ServerName> favoredNodeList)
throws IOException {
if (CollectionUtils.isEmpty(favoredNodeList)) {
return null;
}
long time = EnvironmentEdgeManager.currentTime();
Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
byte[] favoredNodes = getFavoredNodes(favoredNodeList);
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
.setFamily(HConstants.CATALOG_FAMILY).setQualifier(FAVOREDNODES_QUALIFIER).setTimestamp(time)
.setType(Cell.Type.Put).setValue(favoredNodes).build());
LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
favoredNodeList);
return put;
}

/**
* Convert PB bytes to ServerName.
* @param favoredNodes The PB'ed bytes of favored nodes
* @return the array of {@link ServerName} for the byte array of favored nodes.
* @throws IOException
*/
public static ServerName[] getFavoredNodesList(byte[] favoredNodes) throws IOException {
FavoredNodes f = FavoredNodes.parseFrom(favoredNodes);
Expand Down Expand Up @@ -235,7 +212,7 @@ public void placePrimaryRSAsRoundRobin(Map<ServerName, List<RegionInfo>> assignm
Map<RegionInfo, ServerName> primaryRSMap, List<RegionInfo> regions) {
List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
rackList.addAll(rackToRegionServerMap.keySet());
int rackIndex = random.nextInt(rackList.size());
int rackIndex = ThreadLocalRandom.current().nextInt(rackList.size());
int maxRackSize = 0;
for (Map.Entry<String,List<ServerName>> r : rackToRegionServerMap.entrySet()) {
if (r.getValue().size() > maxRackSize) {
Expand All @@ -244,7 +221,7 @@ public void placePrimaryRSAsRoundRobin(Map<ServerName, List<RegionInfo>> assignm
}
int numIterations = 0;
// Initialize the current processing host index.
int serverIndex = random.nextInt(maxRackSize);
int serverIndex = ThreadLocalRandom.current().nextInt(maxRackSize);
for (RegionInfo regionInfo : regions) {
List<ServerName> currentServerList;
String rackName;
Expand Down Expand Up @@ -589,7 +566,7 @@ protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerS
}

ServerName randomServer = null;
int randomIndex = random.nextInt(serversToChooseFrom.size());
int randomIndex = ThreadLocalRandom.current().nextInt(serversToChooseFrom.size());
int j = 0;
for (StartcodeAgnosticServerName sn : serversToChooseFrom) {
if (j == randomIndex) {
Expand All @@ -610,14 +587,14 @@ private ServerName getOneRandomServer(String rack) throws IOException {
return this.getOneRandomServer(rack, null);
}

protected String getOneRandomRack(Set<String> skipRackSet) throws IOException {
String getOneRandomRack(Set<String> skipRackSet) throws IOException {
if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) {
throw new IOException("Cannot randomly pick another random server");
}

String randomRack;
do {
int randomIndex = random.nextInt(this.uniqueRackList.size());
int randomIndex = ThreadLocalRandom.current().nextInt(this.uniqueRackList.size());
randomRack = this.uniqueRackList.get(randomIndex);
} while (skipRackSet.contains(randomRack));

Expand Down Expand Up @@ -771,7 +748,7 @@ private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredN
public List<ServerName> generateFavoredNodes(RegionInfo hri) throws IOException {

List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
ServerName primary = servers.get(random.nextInt(servers.size()));
ServerName primary = servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
favoredNodesForRegion.add(ServerName.valueOf(primary.getAddress(), ServerName.NON_STARTCODE));

Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -98,78 +98,97 @@ public SnapshotOfRegionAssignmentFromMeta(Connection connection, Set<TableName>
this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
}

private void processMetaRecord(Result result) throws IOException {
if (result == null || result.isEmpty()) {
return;
}
RegionLocations rl = CatalogFamilyFormat.getRegionLocations(result);
if (rl == null) {
return;
}
RegionInfo hri = rl.getRegionLocation(0).getRegion();
if (hri == null) {
return;
}
if (hri.getTable() == null) {
return;
}
if (disabledTables.contains(hri.getTable())) {
return;
}
// Are we to include split parents in the list?
if (excludeOfflinedSplitParents && hri.isSplit()) {
return;
}
HRegionLocation[] hrls = rl.getRegionLocations();

// Add the current assignment to the snapshot for all replicas
for (int i = 0; i < hrls.length; i++) {
if (hrls[i] == null) {
continue;
}
hri = hrls[i].getRegion();
if (hri == null) {
continue;
}
addAssignment(hri, hrls[i].getServerName());
addRegion(hri);
}

hri = rl.getRegionLocation(0).getRegion();
// the code below is to handle favored nodes
byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
if (favoredNodes == null) {
return;
}
// Add the favored nodes into assignment plan
ServerName[] favoredServerList = FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
// Add the favored nodes into assignment plan
existingAssignmentPlan.updateFavoredNodesMap(hri, Arrays.asList(favoredServerList));

/*
* Typically there should be FAVORED_NODES_NUM favored nodes for a region in meta. If there is
* less than FAVORED_NODES_NUM, lets use as much as we can but log a warning.
*/
if (favoredServerList.length != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
LOG.warn("Insufficient favored nodes for region " + hri + " fn: " +
Arrays.toString(favoredServerList));
}
for (int i = 0; i < favoredServerList.length; i++) {
if (i == PRIMARY.ordinal()) {
addPrimaryAssignment(hri, favoredServerList[i]);
}
if (i == SECONDARY.ordinal()) {
addSecondaryAssignment(hri, favoredServerList[i]);
}
if (i == TERTIARY.ordinal()) {
addTeritiaryAssignment(hri, favoredServerList[i]);
}
}
}
/**
* Initialize the region assignment snapshot by scanning the hbase:meta table
* @throws IOException
*/
public void initialize() throws IOException {
LOG.info("Start to scan the hbase:meta for the current region assignment " +
"snappshot");
// TODO: at some point this code could live in the MetaTableAccessor
ClientMetaTableAccessor.Visitor v = new ClientMetaTableAccessor.Visitor() {
@Override
public boolean visit(Result result) throws IOException {
LOG.info("Start to scan the hbase:meta for the current region assignment " + "snappshot");
// Scan hbase:meta to pick up user regions
try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = metaTable.getScanner(HConstants.CATALOG_FAMILY)) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
try {
if (result == null || result.isEmpty()) return true;
RegionLocations rl = CatalogFamilyFormat.getRegionLocations(result);
if (rl == null) return true;
RegionInfo hri = rl.getRegionLocation(0).getRegion();
if (hri == null) return true;
if (hri.getTable() == null) return true;
if (disabledTables.contains(hri.getTable())) {
return true;
}
// Are we to include split parents in the list?
if (excludeOfflinedSplitParents && hri.isSplit()) return true;
HRegionLocation[] hrls = rl.getRegionLocations();

// Add the current assignment to the snapshot for all replicas
for (int i = 0; i < hrls.length; i++) {
if (hrls[i] == null) continue;
hri = hrls[i].getRegion();
if (hri == null) continue;
addAssignment(hri, hrls[i].getServerName());
addRegion(hri);
}

hri = rl.getRegionLocation(0).getRegion();
// the code below is to handle favored nodes
byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
if (favoredNodes == null) return true;
// Add the favored nodes into assignment plan
ServerName[] favoredServerList =
FavoredNodeAssignmentHelper.getFavoredNodesList(favoredNodes);
// Add the favored nodes into assignment plan
existingAssignmentPlan.updateFavoredNodesMap(hri,
Arrays.asList(favoredServerList));

/*
* Typically there should be FAVORED_NODES_NUM favored nodes for a region in meta. If
* there is less than FAVORED_NODES_NUM, lets use as much as we can but log a warning.
*/
if (favoredServerList.length != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
LOG.warn("Insufficient favored nodes for region " + hri + " fn: " + Arrays
.toString(favoredServerList));
}
for (int i = 0; i < favoredServerList.length; i++) {
if (i == PRIMARY.ordinal()) addPrimaryAssignment(hri, favoredServerList[i]);
if (i == SECONDARY.ordinal()) addSecondaryAssignment(hri, favoredServerList[i]);
if (i == TERTIARY.ordinal()) addTeritiaryAssignment(hri, favoredServerList[i]);
}
return true;
processMetaRecord(result);
} catch (RuntimeException e) {
LOG.error("Catche remote exception " + e.getMessage() +
" when processing" + result);
LOG.error("Catch remote exception " + e.getMessage() + " when processing" + result);
throw e;
}
}
};
// Scan hbase:meta to pick up user regions
MetaTableAccessor.fullScanRegions(connection, v);
//regionToRegionServerMap = regions;
LOG.info("Finished to scan the hbase:meta for the current region assignment" +
"snapshot");
}
LOG.info("Finished to scan the hbase:meta for the current region assignment" + "snapshot");
}

private void addRegion(RegionInfo regionInfo) {
Expand Down
Loading

0 comments on commit 63141bf

Please sign in to comment.