Skip to content

Commit

Permalink
HBASE-26649 Support meta replica LoadBalance mode for RegionLocator#g…
Browse files Browse the repository at this point in the history
…etAllRegionLocations() (#4442) (#4485)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
huaxiangsun authored Jun 3, 2022
1 parent 2e08c69 commit 976dbe8
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;

import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
Expand All @@ -29,6 +30,7 @@
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -295,7 +297,37 @@ private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultCon
}

CompletableFuture<Void> future = new CompletableFuture<Void>();
metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
// Get the region locator's meta replica mode.
CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration()
.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));

if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
addListener(metaTable.getDescriptor(), (desc, error) -> {
if (error != null) {
LOG.error("Failed to get meta table descriptor, error: ", error);
future.completeExceptionally(error);
return;
}

int numOfReplicas = desc.getRegionReplication();
if (numOfReplicas > 1) {
int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);

// When the replicaId is 0, do not set to Consistency.TIMELINE
if (replicaId > 0) {
scan.setReplicaId(replicaId);
scan.setConsistency(Consistency.TIMELINE);
}
}
metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
});
} else {
if (metaReplicaMode == CatalogReplicaMode.HEDGED_READ) {
scan.setConsistency(Consistency.TIMELINE);
}
metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
}

return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
package org.apache.hadoop.hbase;

import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -33,7 +33,7 @@
* </ol>
*/
@InterfaceAudience.Private
enum CatalogReplicaMode {
public enum CatalogReplicaMode {
NONE {
@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -697,15 +698,21 @@ public static void fullScanMetaAndPrint(Connection connection) throws IOExceptio
scanMeta(connection, null, null, QueryType.ALL, v);
}

public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
TableName tableName, CatalogReplicaMode metaReplicaMode) throws IOException {
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor, metaReplicaMode);
}

public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
TableName tableName) throws IOException {
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
scanMetaForTableRegions(connection, visitor, tableName, CatalogReplicaMode.NONE);
}

private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
final Visitor visitor) throws IOException {
final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException {
scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
type, maxRows, visitor);
type, null, maxRows, visitor, metaReplicaMode);

}

private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
Expand Down Expand Up @@ -749,12 +756,12 @@ public static void scanMeta(Connection connection, final Visitor visitor,
static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
throws IOException {
scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor, CatalogReplicaMode.NONE);
}

private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
final Visitor visitor) throws IOException {
final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit);

Expand All @@ -779,6 +786,25 @@ private static void scanMeta(Connection connection, @Nullable final byte[] start

int currentRow = 0;
try (Table metaTable = getMetaHTable(connection)) {
switch (metaReplicaMode) {
case LOAD_BALANCE:
int numOfReplicas = metaTable.getDescriptor().getRegionReplication();
if (numOfReplicas > 1) {
int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);

// When the replicaId is 0, do not set to Consistency.TIMELINE
if (replicaId > 0) {
scan.setReplicaId(replicaId);
scan.setConsistency(Consistency.TIMELINE);
}
}
break;
case HEDGED_READ:
scan.setConsistency(Consistency.TIMELINE);
break;
default:
// Do nothing
}
try (ResultScanner scanner = metaTable.getScanner(scan)) {
Result data;
while ((data = scanner.next()) != null) {
Expand Down Expand Up @@ -2056,7 +2082,7 @@ public static List<String> getTableEncodedRegionNamesForSerialReplication(Connec
new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
list.add(RegionInfo.encodeRegionName(r.getRow()));
return true;
});
}, CatalogReplicaMode.NONE);
return list;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
Expand Down Expand Up @@ -110,7 +111,9 @@ public boolean visitInternal(Result result) throws IOException {
return true;
}
};
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName);
CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(connection.getConfiguration()
.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName, metaReplicaMode);
return regions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,8 @@ private void primaryMayIncreaseReplicaNoChange(final long[] before, final long[]
}

private void primaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
// There are read requests increase for primary meta replica.
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);

// There are read requests incrase for meta replica regions.
for (int i = 1; i < after.length; i++) {
// There are read requests increase for all meta replica regions,
for (int i = 0; i < after.length; i++) {
assertTrue(after[i] > before[i]);
}
}
Expand All @@ -541,6 +538,7 @@ public void testHBaseMetaReplicaGets() throws Exception {
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
Expand Down Expand Up @@ -588,6 +586,16 @@ public void testHBaseMetaReplicaGets() throws Exception {
// There are more reads against all meta replica regions, including the primary region.
primaryIncreaseReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);

RegionLocator locator = tableForGet.getRegionLocator();

for (int j = 0; j < numOfMetaReplica * 3; j++) {
locator.getAllRegionLocations();
}

getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
primaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
readReqsForMetaReplicasAfterGetAllLocations);

// move one of regions so it meta cache may be invalid.
HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());

Expand All @@ -597,7 +605,7 @@ public void testHBaseMetaReplicaGets() throws Exception {

// There are read requests increase for primary meta replica.
// For rest of meta replicas, there is no change as regionMove will tell the new location
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
readReqsForMetaReplicasAfterMove);
// Move region again.
HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
Expand Down

0 comments on commit 976dbe8

Please sign in to comment.