Skip to content

Commit

Permalink
HBASE-28663 Graceful shutdown of CanaryTool timeouts (#5991)
Browse files Browse the repository at this point in the history
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Mihir Monani <mmonani@salesforce.com>
  • Loading branch information
d-c-manning authored Jun 19, 2024
1 parent bd9053c commit 67cc820
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ public interface Sink {
long getWriteSuccessCount();

long incWriteSuccessCount();

void stop();

boolean isStopped();
}

/**
Expand All @@ -208,6 +212,7 @@ public static class StdOutSink implements Sink {
readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0);
private Map<String, String> readFailures = new ConcurrentHashMap<>();
private Map<String, String> writeFailures = new ConcurrentHashMap<>();
private volatile boolean stopped = false;

@Override
public long getReadFailureCount() {
Expand Down Expand Up @@ -268,6 +273,15 @@ public long getWriteSuccessCount() {
public long incWriteSuccessCount() {
return writeSuccessCount.incrementAndGet();
}

public void stop() {
stopped = true;
}

@Override
public boolean isStopped() {
return stopped;
}
}

/**
Expand Down Expand Up @@ -444,6 +458,9 @@ public ZookeeperTask(Connection connection, String host, String znode, int timeo

@Override
public Void call() throws Exception {
if (this.sink.isStopped()) {
return null;
}
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
Expand Down Expand Up @@ -498,6 +515,9 @@ public enum TaskType {

@Override
public Void call() {
if (this.sink.isStopped()) {
return null;
}
switch (taskType) {
case READ:
return read();
Expand Down Expand Up @@ -685,6 +705,9 @@ static class RegionServerTask implements Callable<Void> {

@Override
public Void call() {
if (this.sink.isStopped()) {
return null;
}
TableName tableName = null;
Table table = null;
Get get = null;
Expand Down Expand Up @@ -1075,6 +1098,7 @@ private int runMonitor(String[] monitorTargets) throws Exception {
if (currentTimeLength > timeout) {
LOG.error("The monitor is running too long (" + currentTimeLength
+ ") after timeout limit:" + timeout + " will be killed itself !!");
monitorThread.interrupt();
if (monitor.initialized) {
return TIMEOUT_ERROR_EXIT_CODE;
} else {
Expand Down Expand Up @@ -1113,6 +1137,15 @@ public Map<String, String> getWriteFailures() {
return sink.getWriteFailures();
}

/**
* Return a CanaryTool.Sink object containing the detailed results of the canary run. The Sink may
* not have been created if a Monitor thread is not yet running.
* @return the active Sink if one exists, null otherwise.
*/
public Sink getActiveSink() {
return sink;
}

private void printUsageAndExit() {
System.err.println(
"Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
Expand Down Expand Up @@ -1159,10 +1192,11 @@ private void printUsageAndExit() {

Sink getSink(Configuration configuration, Class clazz) {
// In test context, this.sink might be set. Use it if non-null. For testing.
return this.sink != null
? this.sink
: (Sink) ReflectionUtils
if (this.sink == null) {
this.sink = (Sink) ReflectionUtils
.newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
}
return this.sink;
}

/**
Expand Down Expand Up @@ -1366,6 +1400,7 @@ public boolean finalCheckForErrors() {

@Override
public void close() throws IOException {
this.sink.stop();
if (this.admin != null) {
this.admin.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.tool;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand All @@ -38,6 +39,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
Expand All @@ -49,6 +51,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
Expand Down Expand Up @@ -130,6 +133,51 @@ public void testBasicCanaryWorks() throws Exception {
isA(ColumnFamilyDescriptor.class), anyLong());
}

/**
* When CanaryTool times out, it should stop scanning and shutdown quickly and gracefully. This
* test helps to confirm that threadpools do not continue executing work after the canary
* finishes. It also verifies sink behavior and measures correct failure counts in the sink.
* @throws Exception if it can't create a table, communicate with minicluster, or run the canary.
*/
@Test
public void testCanaryStopsScanningAfterTimeout() throws Exception {
// Prepare a table with multiple regions, and close those regions on the regionserver.
// Do not notify HMaster or META. CanaryTool will scan and receive NotServingRegionExceptions.
final TableName tableName = TableName.valueOf(name.getMethodName());
// Close the unused Table reference returned by createMultiRegionTable.
testingUtility.createMultiRegionTable(tableName, new byte[][] { FAMILY }).close();
List<RegionInfo> regions = testingUtility.getAdmin().getRegions(tableName);
assertTrue("verify table has multiple regions", regions.size() > 1);
HRegionServer regionserver = testingUtility.getMiniHBaseCluster().getRegionServer(0);
for (RegionInfo region : regions) {
closeRegion(testingUtility, regionserver, region);
}

// Run CanaryTool with 1 thread. This thread will attempt to scan the first region.
// It will use default rpc retries and receive NotServingRegionExceptions for many seconds
// according to HConstants.RETRY_BACKOFF. The CanaryTool timeout is set to 4 seconds, so it
// will time out before the first region scan is complete.
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
CanaryTool canary = new CanaryTool(executor);
String[] args = { "-t", "4000", tableName.getNameAsString() };
int retCode = ToolRunner.run(testingUtility.getConfiguration(), canary, args);
executor.shutdown();
try {
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}

CanaryTool.Sink sink = canary.getActiveSink();
assertEquals("verify canary timed out with TIMEOUT_ERROR_EXIT_CODE", 3, retCode);
assertEquals("verify only the first region failed", 1, sink.getReadFailureCount());
assertEquals("verify no successful reads", 0, sink.getReadSuccessCount());
assertEquals("verify we were attempting to scan all regions", regions.size(),
((CanaryTool.RegionStdOutSink) sink).getTotalExpectedRegions());
}

@Test
public void testCanaryRegionTaskReadAllCF() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
Expand Down

0 comments on commit 67cc820

Please sign in to comment.