Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28663 Graceful shutdown of CanaryTool timeouts (branch-2) #5992

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ public interface Sink {
long getWriteSuccessCount();

long incWriteSuccessCount();

void stop();

boolean isStopped();
}

/**
Expand All @@ -208,6 +212,7 @@ public static class StdOutSink implements Sink {
readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0);
private Map<String, String> readFailures = new ConcurrentHashMap<>();
private Map<String, String> writeFailures = new ConcurrentHashMap<>();
private volatile boolean stopped = false;

@Override
public long getReadFailureCount() {
Expand Down Expand Up @@ -268,6 +273,15 @@ public long getWriteSuccessCount() {
public long incWriteSuccessCount() {
return writeSuccessCount.incrementAndGet();
}

public void stop() {
stopped = true;
}

@Override
public boolean isStopped() {
return stopped;
}
}

/**
Expand Down Expand Up @@ -444,6 +458,9 @@ public ZookeeperTask(Connection connection, String host, String znode, int timeo

@Override
public Void call() throws Exception {
if (this.sink.isStopped()) {
return null;
}
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
Expand Down Expand Up @@ -498,6 +515,9 @@ public enum TaskType {

@Override
public Void call() {
if (this.sink.isStopped()) {
return null;
}
switch (taskType) {
case READ:
return read();
Expand Down Expand Up @@ -685,6 +705,9 @@ static class RegionServerTask implements Callable<Void> {

@Override
public Void call() {
if (this.sink.isStopped()) {
return null;
}
TableName tableName = null;
Table table = null;
Get get = null;
Expand Down Expand Up @@ -1075,6 +1098,7 @@ private int runMonitor(String[] monitorTargets) throws Exception {
if (currentTimeLength > timeout) {
LOG.error("The monitor is running too long (" + currentTimeLength
+ ") after timeout limit:" + timeout + " will be killed itself !!");
monitorThread.interrupt();
if (monitor.initialized) {
return TIMEOUT_ERROR_EXIT_CODE;
} else {
Expand Down Expand Up @@ -1113,6 +1137,15 @@ public Map<String, String> getWriteFailures() {
return sink.getWriteFailures();
}

/**
* Return a CanaryTool.Sink object containing the detailed results of the canary run. The Sink may
* not have been created if a Monitor thread is not yet running.
* @return the active Sink if one exists, null otherwise.
*/
public Sink getActiveSink() {
return sink;
}

private void printUsageAndExit() {
System.err.println(
"Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
Expand Down Expand Up @@ -1159,10 +1192,11 @@ private void printUsageAndExit() {

Sink getSink(Configuration configuration, Class clazz) {
// In test context, this.sink might be set. Use it if non-null. For testing.
return this.sink != null
? this.sink
: (Sink) ReflectionUtils
if (this.sink == null) {
this.sink = (Sink) ReflectionUtils
.newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
}
return this.sink;
}

/**
Expand Down Expand Up @@ -1366,6 +1400,7 @@ public boolean finalCheckForErrors() {

@Override
public void close() throws IOException {
this.sink.stop();
if (this.admin != null) {
this.admin.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.tool;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand All @@ -38,17 +39,20 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
Expand Down Expand Up @@ -130,6 +134,51 @@ public void testBasicCanaryWorks() throws Exception {
isA(ColumnFamilyDescriptor.class), anyLong());
}

/**
* When CanaryTool times out, it should stop scanning and shutdown quickly and gracefully. This
* test helps to confirm that threadpools do not continue executing work after the canary
* finishes. It also verifies sink behavior and measures correct failure counts in the sink.
* @throws Exception if it can't create a table, communicate with minicluster, or run the canary.
*/
@Test
public void testCanaryStopsScanningAfterTimeout() throws Exception {
// Prepare a table with multiple regions, and close those regions on the regionserver.
// Do not notify HMaster or META. CanaryTool will scan and receive NotServingRegionExceptions.
final TableName tableName = TableName.valueOf(name.getMethodName());
// Close the unused Table reference returned by createMultiRegionTable.
testingUtility.createMultiRegionTable(tableName, new byte[][] { FAMILY }).close();
List<RegionInfo> regions = testingUtility.getAdmin().getRegions(tableName);
assertTrue("verify table has multiple regions", regions.size() > 1);
HRegionServer regionserver = testingUtility.getMiniHBaseCluster().getRegionServer(0);
for (RegionInfo region : regions) {
closeRegion(testingUtility, regionserver, new HRegionInfo(region));
}

// Run CanaryTool with 1 thread. This thread will attempt to scan the first region.
// It will use default rpc retries and receive NotServingRegionExceptions for many seconds
// according to HConstants.RETRY_BACKOFF. The CanaryTool timeout is set to 4 seconds, so it
// will time out before the first region scan is complete.
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
CanaryTool canary = new CanaryTool(executor);
String[] args = { "-t", "4000", tableName.getNameAsString() };
int retCode = ToolRunner.run(testingUtility.getConfiguration(), canary, args);
executor.shutdown();
try {
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}

CanaryTool.Sink sink = canary.getActiveSink();
assertEquals("verify canary timed out with TIMEOUT_ERROR_EXIT_CODE", 3, retCode);
assertEquals("verify only the first region failed", 1, sink.getReadFailureCount());
assertEquals("verify no successful reads", 0, sink.getReadSuccessCount());
assertEquals("verify we were attempting to scan all regions", regions.size(),
((CanaryTool.RegionStdOutSink) sink).getTotalExpectedRegions());
}

@Test
public void testCanaryRegionTaskReadAllCF() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
Expand Down