Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26182 Allow disabling refresh of connection registry endpoint #3605

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ protected AbstractRpcBasedConnectionRegistry(Configuration conf,
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
populateStubs(getBootstrapNodes(conf));
registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName,
minRefreshIntervalSecsConfigName, this::refreshStubs);
registryEndpointRefresher.start();
// could return null here is refresh interval is less than zero
registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
}

protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* {@code minIntervalSecsConfigName} seconds apart.
*/
@InterfaceAudience.Private
class RegistryEndpointsRefresher {
final class RegistryEndpointsRefresher {

private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);

Expand All @@ -51,11 +51,7 @@ class RegistryEndpointsRefresher {
private boolean refreshNow = false;
private boolean stopped = false;

public void start() {
thread.start();
}

public synchronized void stop() {
synchronized void stop() {
stopped = true;
notifyAll();
}
Expand Down Expand Up @@ -108,18 +104,15 @@ public interface Refresher {
void refresh() throws IOException;
}

RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName,
String minIntervalSecsConfigName, Refresher refresher) {
periodicRefreshMs = TimeUnit.SECONDS
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
minTimeBetweenRefreshesMs = TimeUnit.SECONDS
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(periodicRefreshMs > 0);
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
Refresher refresher) {
this.periodicRefreshMs = periodicRefreshMs;
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
this.refresher = refresher;
thread = new Thread(this::mainLoop);
thread.setName("Registry-endpoints-refresh-end-points");
thread.setDaemon(true);
this.refresher = refresher;
thread.start();
}

/**
Expand All @@ -130,4 +123,22 @@ synchronized void refreshNow() {
refreshNow = true;
notifyAll();
}

/**
* Create a {@link RegistryEndpointsRefresher}. If the interval secs configured via
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
* refreshing of endpoints.
*/
static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
String minIntervalSecsConfigName, Refresher refresher) {
long periodicRefreshMs = TimeUnit.SECONDS
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
if (periodicRefreshMs <= 0) {
return null;
}
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -74,18 +75,24 @@ private void refresh() {
callTimestamps.add(EnvironmentEdgeManager.currentTime());
}

private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
private void createRefresher(long intervalSecs, long minIntervalSecs) {
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME,
refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
refresher.start();
}

@Test
public void testPeriodicMasterEndPointRefresh() throws IOException {
public void testDisableRefresh() {
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
}

@Test
public void testPeriodicEndpointRefresh() throws IOException {
// Refresh every 1 second.
createAndStartRefresher(1, 0);
createRefresher(1, 0);
// Wait for > 3 seconds to see that at least 3 refresh have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
}
Expand All @@ -94,7 +101,7 @@ public void testPeriodicMasterEndPointRefresh() throws IOException {
public void testDurationBetweenRefreshes() throws IOException {
// Disable periodic refresh
// A minimum duration of 1s between refreshes
createAndStartRefresher(Integer.MAX_VALUE, 1);
createRefresher(Integer.MAX_VALUE, 1);
// Issue a ton of manual refreshes.
for (int i = 0; i < 10000; i++) {
refresher.refreshNow();
Expand Down