Skip to content

Commit

Permalink
HBASE-26172 Deprecated MasterRegistry and allow getBootstrapNodes to …
Browse files Browse the repository at this point in the history
…return master address instead of region server
  • Loading branch information
Apache9 committed Aug 7, 2021
1 parent a79a9cc commit 9b19440
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
* it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
* <p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client.
* @deprecated Since 2.5.0, will be removed in 4.0.0. Use {@link RpcConnectionRegistry} instead.
*/
@Deprecated
@InterfaceAudience.Private
public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ class RegistryEndpointsRefresher {

private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);

public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.rpc_registry.refresh_interval_secs";
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;

public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.rpc_registry.min_secs_between_refreshes";
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

private final Thread thread;
Expand Down Expand Up @@ -79,6 +75,8 @@ private void mainLoop() {
long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
(EnvironmentEdgeManager.currentTime() - lastRefreshTime);
if (waitTime <= 0) {
// we are going to refresh, reset this flag
refreshNow = false;
break;
}
try {
Expand All @@ -89,8 +87,6 @@ private void mainLoop() {
continue;
}
}
// we are going to refresh, reset this flag
refreshNow = false;
}
LOG.debug("Attempting to refresh registry end points");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {

/** Configuration key that controls the fan out of requests **/
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout";
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";

public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.bootstrap.refresh_interval_secs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ public class TestRegistryEndpointsRefresher {

private Configuration conf;
private RegistryEndpointsRefresher refresher;
private AtomicInteger getMastersCallCounter;
private AtomicInteger refreshCallCounter;
private CopyOnWriteArrayList<Long> callTimestamps;

@Before
public void setUp() {
conf = HBaseConfiguration.create();
getMastersCallCounter = new AtomicInteger(0);
refreshCallCounter = new AtomicInteger(0);
callTimestamps = new CopyOnWriteArrayList<>();
}

Expand All @@ -70,7 +70,7 @@ public void tearDown() {
}

private void refresh() {
getMastersCallCounter.incrementAndGet();
refreshCallCounter.incrementAndGet();
callTimestamps.add(EnvironmentEdgeManager.currentTime());
}

Expand All @@ -86,8 +86,8 @@ private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
public void testPeriodicMasterEndPointRefresh() throws IOException {
// Refresh every 1 second.
createAndStartRefresher(1, 0);
// Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
Waiter.waitFor(conf, 5000, () -> getMastersCallCounter.get() > 3);
// Wait for > 3 seconds to see that at least 3 refresh have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
}

@Test
Expand All @@ -101,10 +101,10 @@ public void testDurationBetweenRefreshes() throws IOException {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
// Overall wait time is 10000 ms, so the number of requests should be <=10
// Actual calls to getMasters() should be much lower than the refresh count.
assertTrue(String.valueOf(getMastersCallCounter.get()), getMastersCallCounter.get() <= 20);
// Actual calls to refresh should be much lower than the refresh count.
assertTrue(String.valueOf(refreshCallCounter.get()), refreshCallCounter.get() <= 20);
assertTrue(callTimestamps.size() > 0);
// Verify that the delta between subsequent RPCs is at least 1sec as configured.
// Verify that the delta between subsequent refresh is at least 1sec as configured.
for (int i = 1; i < callTimestamps.size() - 1; i++) {
long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
// Few ms cushion to account for any env jitter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
Expand Down Expand Up @@ -3046,13 +3044,16 @@ public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequ
}

@Override
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
GetBootstrapNodesRequest request) throws ServiceException {
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
for (ServerName sn : master.getServerManager().getOnlineServers().keySet()) {
builder.addServerName(ProtobufUtil.toServerName(sn));
}
return builder.build();
protected List<ServerName> getRegionServers() {
return master.getServerManager().getOnlineServersList();
}

@Override
protected List<ServerName> getMasters() {
List<ServerName> list = new ArrayList<>();
master.getActiveMaster().ifPresent(list::add);
list.addAll(master.getBackupMasters());
return list;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -153,6 +154,7 @@
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -308,18 +310,35 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/*
/**
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
"hbase.rpc.rows.size.threshold.reject";

/*
/**
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
*/
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

/**
* Determine the bootstrap nodes we want to return to the client connection registry.
* <ul>
* <li>{@link #MASTER}: return masters as bootstrap nodes.</li>
* <li>{@link #REGIONSERVER}: return region servers as bootstrap nodes.</li>
* <li>{@link #ALL}: return both masters and region servers as bootstrap nodes.</li>
* </ul>
*/
public enum ConnectionRegistryBootstrapNodeType {
MASTER, REGIONSERVER, ALL
}

public static final String CLIENT_BOOTSTRAP_NODE_TYPE = "hbase.client.bootstrap.node_type";

public static final ConnectionRegistryBootstrapNodeType DEFAULT_CLIENT_BOOTSTRAP_NODE_TYPE =
ConnectionRegistryBootstrapNodeType.REGIONSERVER;

// Request counter. (Includes requests that are not serviced by regions.)
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
final LongAdder requestCount = new LongAdder();
Expand Down Expand Up @@ -4102,21 +4121,17 @@ public GetActiveMasterResponse getActiveMaster(RpcController controller,
@Override
public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
throws ServiceException {
try {
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
if (activeMaster != null) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true));
}
for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
if (activeMaster != null) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true));
}
for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false));
}
return builder.build();
}

@Override
Expand All @@ -4130,12 +4145,43 @@ public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController contr
return builder.build();
}

protected List<ServerName> getRegionServers() {
return regionServer.getRegionServerAddressTracker().getRegionServers();
}

protected List<ServerName> getMasters() {
List<ServerName> list = new ArrayList<>();
MasterAddressTracker tracker = regionServer.getMasterAddressTracker();
ServerName activeMaster = tracker.getMasterAddress();
if (activeMaster != null) {
list.add(activeMaster);
}
list.addAll(tracker.getBackupMasters());
return list;
}

@Override
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
GetBootstrapNodesRequest request) throws ServiceException {
ConnectionRegistryBootstrapNodeType type =
ConnectionRegistryBootstrapNodeType.valueOf(regionServer.getConfiguration()
.get(CLIENT_BOOTSTRAP_NODE_TYPE, DEFAULT_CLIENT_BOOTSTRAP_NODE_TYPE.name()));
Stream<ServerName> bootstrapNodes;
switch (type) {
case MASTER:
bootstrapNodes = getMasters().stream();
break;
case REGIONSERVER:
bootstrapNodes = getRegionServers().stream();
break;
case ALL:
bootstrapNodes = Stream.concat(getMasters().stream(), getRegionServers().stream());
break;
default:
throw new IllegalArgumentException("Unknown bootstrap node type:" + type);
}
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
regionServer.getRegionServerAddressTracker().getRegionServers().stream()
.map(ProtobufUtil::toServerName).forEach(builder::addServerName);
bootstrapNodes.map(ProtobufUtil::toServerName).forEach(builder::addServerName);
return builder.build();
}
}
Loading

0 comments on commit 9b19440

Please sign in to comment.