Skip to content

Commit

Permalink
HBASE-26172 Deprecated MasterRegistry (#3566)
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
  • Loading branch information
Apache9 authored Aug 19, 2021
1 parent f2e2140 commit c8d9d4d
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
* it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
* <p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client.
* @deprecated Since 2.5.0, will be removed in 4.0.0. Use {@link RpcConnectionRegistry} instead.
*/
@Deprecated
@InterfaceAudience.Private
public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ class RegistryEndpointsRefresher {

private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);

public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.rpc_registry.refresh_interval_secs";
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;

public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.rpc_registry.min_secs_between_refreshes";
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

private final Thread thread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {

/** Configuration key that controls the fan out of requests **/
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout";
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";

public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.bootstrap.refresh_interval_secs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ public class TestRegistryEndpointsRefresher {

private Configuration conf;
private RegistryEndpointsRefresher refresher;
private AtomicInteger getMastersCallCounter;
private AtomicInteger refreshCallCounter;
private CopyOnWriteArrayList<Long> callTimestamps;

@Before
public void setUp() {
conf = HBaseConfiguration.create();
getMastersCallCounter = new AtomicInteger(0);
refreshCallCounter = new AtomicInteger(0);
callTimestamps = new CopyOnWriteArrayList<>();
}

Expand All @@ -70,7 +70,7 @@ public void tearDown() {
}

private void refresh() {
getMastersCallCounter.incrementAndGet();
refreshCallCounter.incrementAndGet();
callTimestamps.add(EnvironmentEdgeManager.currentTime());
}

Expand All @@ -86,8 +86,8 @@ private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
public void testPeriodicMasterEndPointRefresh() throws IOException {
// Refresh every 1 second.
createAndStartRefresher(1, 0);
// Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
Waiter.waitFor(conf, 5000, () -> getMastersCallCounter.get() > 3);
// Wait for > 3 seconds to see that at least 3 refresh have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
}

@Test
Expand All @@ -101,10 +101,10 @@ public void testDurationBetweenRefreshes() throws IOException {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
// Overall wait time is 10000 ms, so the number of requests should be <=10
// Actual calls to getMasters() should be much lower than the refresh count.
assertTrue(String.valueOf(getMastersCallCounter.get()), getMastersCallCounter.get() <= 20);
// Actual calls to refresh should be much lower than the refresh count.
assertTrue(String.valueOf(refreshCallCounter.get()), refreshCallCounter.get() <= 20);
assertTrue(callTimestamps.size() > 0);
// Verify that the delta between subsequent RPCs is at least 1sec as configured.
// Verify that the delta between subsequent refresh is at least 1sec as configured.
for (int i = 1; i < callTimestamps.size() - 1; i++) {
long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
// Few ms cushion to account for any env jitter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2700,10 +2700,21 @@ public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOExcept
return status;
}

List<ServerName> getBackupMasters() {
@Override
public Optional<ServerName> getActiveMaster() {
return activeMasterManager.getActiveMasterServerName();
}

@Override
public List<ServerName> getBackupMasters() {
return activeMasterManager.getBackupMasters();
}

@Override
public List<ServerName> getRegionServers() {
return serverManager.getOnlineServersList();
}

/**
* The set of loaded coprocessors is stored in a static set. Since it's
* statically allocated, it does not require that HMaster's cpHost be
Expand Down Expand Up @@ -3848,10 +3859,6 @@ public String getClusterId() {
return cachedClusterId.getFromCacheOrFetch();
}

public Optional<ServerName> getActiveMaster() {
return activeMasterManager.getActiveMasterServerName();
}

@Override
public void runReplicationBarrierCleaner() {
ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -372,15 +371,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
Expand Down Expand Up @@ -408,8 +398,7 @@
@SuppressWarnings("deprecation")
public class MasterRpcServices extends RSRpcServices implements
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
LockService.BlockingInterface, HbckService.BlockingInterface,
ClientMetaService.BlockingInterface {
LockService.BlockingInterface, HbckService.BlockingInterface {

private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
private static final Logger AUDITLOG =
Expand Down Expand Up @@ -3003,58 +2992,6 @@ private boolean shouldSubmitSCP(ServerName serverName) {
return true;
}

// Override this method since for backup master we will not set the clusterId field, which means
// we need to find another way to get cluster id for backup masters.
@Override
public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
throws ServiceException {
GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
String clusterId = master.getClusterId();
if (clusterId != null) {
resp.setClusterId(clusterId);
}
return resp.build();
}

// Override this method since we use ActiveMasterManager to get active master on HMaster while in
// HRegionServer we use MasterAddressTracker
@Override
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
return resp.build();
}

// Override this method since we use ActiveMasterManager to get backup masters on HMaster while in
// HRegionServer we use MasterAddressTracker
@Override
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
throws ServiceException {
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
// Active master
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
// Backup masters
for (ServerName backupMaster : master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
return resp.build();
}

@Override
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
GetBootstrapNodesRequest request) throws ServiceException {
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
for (ServerName sn : master.getServerManager().getOnlineServers().keySet()) {
builder.addServerName(ProtobufUtil.toServerName(sn));
}
return builder.build();
}

@Override
public GetRSGroupInfoResponse getRSGroupInfo(RpcController controller,
GetRSGroupInfoRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4007,11 +4007,19 @@ public long getRetryPauseTime() {
return this.retryPauseTime;
}

public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
public Optional<ServerName> getActiveMaster() {
return Optional.ofNullable(masterAddressTracker.getMasterAddress());
}

public List<ServerName> getBackupMasters() {
return masterAddressTracker.getBackupMasters();
}

RegionServerAddressTracker getRegionServerAddressTracker() {
return regionServerAddressTracker;
public List<ServerName> getRegionServers() {
return regionServerAddressTracker.getRegionServers();
}

public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/*
/**
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
"hbase.rpc.rows.size.threshold.reject";

/*
/**
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
*/
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
Expand Down Expand Up @@ -4092,31 +4092,22 @@ public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdR
public GetActiveMasterResponse getActiveMaster(RpcController controller,
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
if (activeMaster != null) {
builder.setServerName(ProtobufUtil.toServerName(activeMaster));
}
regionServer.getActiveMaster()
.ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
return builder.build();
}

@Override
public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
throws ServiceException {
try {
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
if (activeMaster != null) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true));
}
for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
regionServer.getActiveMaster()
.ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
regionServer.getBackupMasters()
.forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
return builder.build();
}

@Override
Expand All @@ -4131,11 +4122,11 @@ public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController contr
}

@Override
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
GetBootstrapNodesRequest request) throws ServiceException {
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
regionServer.getRegionServerAddressTracker().getRegionServers().stream()
.map(ProtobufUtil::toServerName).forEach(builder::addServerName);
regionServer.getRegionServers().stream().map(ProtobufUtil::toServerName)
.forEach(builder::addServerName);
return builder.build();
}
}
Loading

0 comments on commit c8d9d4d

Please sign in to comment.