Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26172 Deprecated MasterRegistry #3566

Merged
merged 1 commit into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
* it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
* <p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client.
* @deprecated Since 2.5.0, will be removed in 4.0.0. Use {@link RpcConnectionRegistry} instead.
*/
@Deprecated
@InterfaceAudience.Private
public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ class RegistryEndpointsRefresher {

private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);

public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.rpc_registry.refresh_interval_secs";
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;

public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.rpc_registry.min_secs_between_refreshes";
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

private final Thread thread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {

/** Configuration key that controls the fan out of requests **/
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout";
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hbase.client.rpc_registry.hedged.fanout is a new config recently added. It is committed to a branch only currently? So it is fine changing the name? Does the config get doc'd in release notes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, no actual release yet, this is a missing part in the previous commit, @bharathv suggest that we do not use 'rpc_registry', just use 'bootstrap', but I forgot to change this one.
No release note yet. We could mention it in the release note for this issue. Or better describe it in the ref guide? Usually you do not need to change this value.


public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.bootstrap.refresh_interval_secs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ public class TestRegistryEndpointsRefresher {

private Configuration conf;
private RegistryEndpointsRefresher refresher;
private AtomicInteger getMastersCallCounter;
private AtomicInteger refreshCallCounter;
private CopyOnWriteArrayList<Long> callTimestamps;

@Before
public void setUp() {
conf = HBaseConfiguration.create();
getMastersCallCounter = new AtomicInteger(0);
refreshCallCounter = new AtomicInteger(0);
callTimestamps = new CopyOnWriteArrayList<>();
}

Expand All @@ -70,7 +70,7 @@ public void tearDown() {
}

private void refresh() {
getMastersCallCounter.incrementAndGet();
refreshCallCounter.incrementAndGet();
callTimestamps.add(EnvironmentEdgeManager.currentTime());
}

Expand All @@ -86,8 +86,8 @@ private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
public void testPeriodicMasterEndPointRefresh() throws IOException {
// Refresh every 1 second.
createAndStartRefresher(1, 0);
// Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
Waiter.waitFor(conf, 5000, () -> getMastersCallCounter.get() > 3);
// Wait for > 3 seconds to see that at least 3 refresh have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
}

@Test
Expand All @@ -101,10 +101,10 @@ public void testDurationBetweenRefreshes() throws IOException {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
// Overall wait time is 10000 ms, so the number of requests should be <=10
// Actual calls to getMasters() should be much lower than the refresh count.
assertTrue(String.valueOf(getMastersCallCounter.get()), getMastersCallCounter.get() <= 20);
// Actual calls to refresh should be much lower than the refresh count.
assertTrue(String.valueOf(refreshCallCounter.get()), refreshCallCounter.get() <= 20);
assertTrue(callTimestamps.size() > 0);
// Verify that the delta between subsequent RPCs is at least 1sec as configured.
// Verify that the delta between subsequent refresh is at least 1sec as configured.
for (int i = 1; i < callTimestamps.size() - 1; i++) {
long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
// Few ms cushion to account for any env jitter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2700,10 +2700,21 @@ public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOExcept
return status;
}

List<ServerName> getBackupMasters() {
@Override
public Optional<ServerName> getActiveMaster() {
return activeMasterManager.getActiveMasterServerName();
}

@Override
public List<ServerName> getBackupMasters() {
return activeMasterManager.getBackupMasters();
}

@Override
public List<ServerName> getRegionServers() {
return serverManager.getOnlineServersList();
}

/**
* The set of loaded coprocessors is stored in a static set. Since it's
* statically allocated, it does not require that HMaster's cpHost be
Expand Down Expand Up @@ -3848,10 +3859,6 @@ public String getClusterId() {
return cachedClusterId.getFromCacheOrFetch();
}

public Optional<ServerName> getActiveMaster() {
return activeMasterManager.getActiveMasterServerName();
}

@Override
public void runReplicationBarrierCleaner() {
ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -372,15 +371,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
Expand Down Expand Up @@ -408,8 +398,7 @@
@SuppressWarnings("deprecation")
public class MasterRpcServices extends RSRpcServices implements
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
LockService.BlockingInterface, HbckService.BlockingInterface,
ClientMetaService.BlockingInterface {
LockService.BlockingInterface, HbckService.BlockingInterface {

private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
private static final Logger AUDITLOG =
Expand Down Expand Up @@ -3003,58 +2992,6 @@ private boolean shouldSubmitSCP(ServerName serverName) {
return true;
}

// Override this method since for backup master we will not set the clusterId field, which means
// we need to find another way to get cluster id for backup masters.
@Override
public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
throws ServiceException {
GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
String clusterId = master.getClusterId();
if (clusterId != null) {
resp.setClusterId(clusterId);
}
return resp.build();
}

// Override this method since we use ActiveMasterManager to get active master on HMaster while in
// HRegionServer we use MasterAddressTracker
@Override
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
return resp.build();
}

// Override this method since we use ActiveMasterManager to get backup masters on HMaster while in
// HRegionServer we use MasterAddressTracker
@Override
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
throws ServiceException {
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
// Active master
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
// Backup masters
for (ServerName backupMaster : master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
return resp.build();
}

@Override
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
GetBootstrapNodesRequest request) throws ServiceException {
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
for (ServerName sn : master.getServerManager().getOnlineServers().keySet()) {
builder.addServerName(ProtobufUtil.toServerName(sn));
}
return builder.build();
}

@Override
public GetRSGroupInfoResponse getRSGroupInfo(RpcController controller,
GetRSGroupInfoRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4007,11 +4007,19 @@ public long getRetryPauseTime() {
return this.retryPauseTime;
}

public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
public Optional<ServerName> getActiveMaster() {
return Optional.ofNullable(masterAddressTracker.getMasterAddress());
}

public List<ServerName> getBackupMasters() {
return masterAddressTracker.getBackupMasters();
}

RegionServerAddressTracker getRegionServerAddressTracker() {
return regionServerAddressTracker;
public List<ServerName> getRegionServers() {
return regionServerAddressTracker.getRegionServers();
}

public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/*
/**
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
"hbase.rpc.rows.size.threshold.reject";

/*
/**
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
*/
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
Expand Down Expand Up @@ -4092,31 +4092,22 @@ public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdR
public GetActiveMasterResponse getActiveMaster(RpcController controller,
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
if (activeMaster != null) {
builder.setServerName(ProtobufUtil.toServerName(activeMaster));
}
regionServer.getActiveMaster()
.ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
return builder.build();
}

@Override
public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
throws ServiceException {
try {
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
if (activeMaster != null) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true));
}
for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) {
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
regionServer.getActiveMaster()
.ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
regionServer.getBackupMasters()
.forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
return builder.build();
}

@Override
Expand All @@ -4131,11 +4122,11 @@ public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController contr
}

@Override
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
GetBootstrapNodesRequest request) throws ServiceException {
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
regionServer.getRegionServerAddressTracker().getRegionServers().stream()
.map(ProtobufUtil::toServerName).forEach(builder::addServerName);
regionServer.getRegionServers().stream().map(ProtobufUtil::toServerName)
.forEach(builder::addServerName);
return builder.build();
}
}
Loading