From e5fdef549b77c57bf8e0f71b03c4bdd92ea99764 Mon Sep 17 00:00:00 2001 From: caroliney14 Date: Thu, 12 Sep 2019 12:12:07 -0700 Subject: [PATCH] Provide an API to get list of successful regions and total expected regions in Canary (#612) Signed-off-by: Xu Cang --- .../org/apache/hadoop/hbase/tool/Canary.java | 138 +++++++++++++++++- .../hadoop/hbase/tool/TestCanaryTool.java | 80 ++++++++-- 2 files changed, 206 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 073df6858320..afec6b640810 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -120,13 +120,19 @@ public interface Sink { public long incWriteFailureCount(); public Map getWriteFailures(); public void updateWriteFailures(String regionName, String serverName); + public long getReadSuccessCount(); + public long incReadSuccessCount(); + public long getWriteSuccessCount(); + public long incWriteSuccessCount(); } // Simple implementation of canary sink that allows to plot on // file or standard output timings or failures. public static class StdOutSink implements Sink { private AtomicLong readFailureCount = new AtomicLong(0), - writeFailureCount = new AtomicLong(0); + writeFailureCount = new AtomicLong(0), + readSuccessCount = new AtomicLong(0), + writeSuccessCount = new AtomicLong(0); private Map readFailures = new ConcurrentHashMap(); private Map writeFailures = new ConcurrentHashMap(); @@ -170,6 +176,26 @@ public Map getWriteFailures() { public void updateWriteFailures(String regionName, String serverName) { writeFailures.put(regionName, serverName); } + + @Override + public long getReadSuccessCount() { + return readSuccessCount.get(); + } + + @Override + public long incReadSuccessCount() { + return readSuccessCount.incrementAndGet(); + } + + @Override + public long getWriteSuccessCount() { + return writeSuccessCount.get(); + } + + @Override + public long incWriteSuccessCount() { + return writeSuccessCount.incrementAndGet(); + } } public static class RegionServerStdOutSink extends StdOutSink { @@ -202,6 +228,7 @@ public static class RegionStdOutSink extends StdOutSink { private Map perTableReadLatency = new HashMap<>(); private AtomicLong writeLatency = new AtomicLong(); + private Map regionMap = new ConcurrentHashMap<>(); public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) { incReadFailureCount(); @@ -215,6 +242,10 @@ public void publishReadFailure(ServerName serverName, HRegionInfo region, HColum } public void publishReadTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) { + incReadSuccessCount(); + RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString()); + res.setReadSuccess(); + res.setReadLatency(msTime); LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms", region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime)); } @@ -231,6 +262,10 @@ public void publishWriteFailure(ServerName serverName, HRegionInfo region, HColu } public void publishWriteTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) { + incWriteSuccessCount(); + RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString()); + res.setWriteSuccess(); + res.setWriteLatency(msTime); LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms", region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime)); } @@ -252,6 +287,14 @@ public void initializeWriteLatency() { public AtomicLong getWriteLatency() { return this.writeLatency; } + + public Map getRegionMap() { + return this.regionMap; + } + + public int getTotalExpectedRegions() { + return this.regionMap.size(); + } } static class ZookeeperTask implements Callable { @@ -883,6 +926,96 @@ private void printUsageAndExit() { System.exit(USAGE_EXIT_CODE); } + /** + * Canary region mode-specific data structure which stores information about each region + * to be scanned + */ + public static class RegionTaskResult { + private HRegionInfo region; + private TableName tableName; + private ServerName serverName; + private AtomicLong readLatency = null; + private AtomicLong writeLatency = null; + private boolean readSuccess = false; + private boolean writeSuccess = false; + + public RegionTaskResult(HRegionInfo region, TableName tableName, ServerName serverName) { + this.region = region; + this.tableName = tableName; + this.serverName = serverName; + } + + public HRegionInfo getRegionInfo() { + return this.region; + } + + public String getRegionNameAsString() { + return this.region.getRegionNameAsString(); + } + + public TableName getTableName() { + return this.tableName; + } + + public String getTableNameAsString() { + return this.tableName.getNameAsString(); + } + + public ServerName getServerName() { + return this.serverName; + } + + public String getServerNameAsString() { + return this.serverName.getServerName(); + } + + public long getReadLatency() { + if (this.readLatency == null) { + return -1; + } + return this.readLatency.get(); + } + + public void setReadLatency(long readLatency) { + if (this.readLatency != null) { + this.readLatency.set(readLatency); + } else { + this.readLatency = new AtomicLong(readLatency); + } + } + + public long getWriteLatency() { + if (this.writeLatency == null) { + return -1; + } + return this.writeLatency.get(); + } + + public void setWriteLatency(long writeLatency) { + if (this.writeLatency != null) { + this.writeLatency.set(writeLatency); + } else { + this.writeLatency = new AtomicLong(writeLatency); + } + } + + public boolean isReadSuccess() { + return this.readSuccess; + } + + public void setReadSuccess() { + this.readSuccess = true; + } + + public boolean isWriteSuccess() { + return this.writeSuccess; + } + + public void setWriteSuccess() { + this.writeSuccess = true; + } + } + /** * A Factory method for {@link Monitor}. * Can be overridden by user. @@ -1295,6 +1428,9 @@ private static List> sniff(final Admin admin, final Sink sink, HRegionInfo region = location.getRegionInfo(); tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, taskType, rawScanEnabled, rwLatency)); + Map regionMap = ((RegionStdOutSink) sink).getRegionMap(); + regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region, + region.getTable(), rs)); } } finally { if (regionLocator != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java index 976f02a04109..7c435a8703a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java @@ -19,6 +19,25 @@ package org.apache.hadoop.hbase.tool; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -34,7 +53,6 @@ import org.apache.log4j.Appender; import org.apache.log4j.LogManager; import org.apache.log4j.spi.LoggingEvent; -import com.google.common.collect.Iterables; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -45,16 +63,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static org.junit.Assert.assertNotEquals; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; -import static org.mockito.Matchers.argThat; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.*; +import com.google.common.collect.Iterables; @RunWith(MockitoJUnitRunner.class) @Category({MediumTests.class}) @@ -113,6 +122,55 @@ public void testBasicCanaryWorks() throws Exception { verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class), isA(HColumnDescriptor.class), anyLong()); } + @Test + public void testCanaryRegionTaskResult() throws Exception { + TableName tableName = TableName.valueOf("testCanaryRegionTaskResult"); + HTable table = testingUtility.createTable(tableName, new byte[][]{FAMILY}); + // insert some test rows + for (int i = 0; i < 1000; i++) { + byte[] iBytes = Bytes.toBytes(i); + Put p = new Put(iBytes); + p.addColumn(FAMILY, COLUMN, iBytes); + table.put(p); + } + ExecutorService executor = new ScheduledThreadPoolExecutor(1); + Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); + Canary canary = new Canary(executor, sink); + String[] args = {"-writeSniffing", "-t", "10000", "testCanaryRegionTaskResult"}; + assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); + + assertTrue("verify read success count > 0", sink.getReadSuccessCount() > 0); + assertTrue("verify write success count > 0", sink.getWriteSuccessCount() > 0); + verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class), + isA(HColumnDescriptor.class), anyLong()); + verify(sink, atLeastOnce()).publishWriteTiming(isA(ServerName.class), isA(HRegionInfo.class), + isA(HColumnDescriptor.class), anyLong()); + + assertTrue("canary should expect to scan at least 1 region", + sink.getTotalExpectedRegions() > 0); + Map regionMap = sink.getRegionMap(); + assertFalse("verify region map has size > 0", regionMap.isEmpty()); + + for (String regionName : regionMap.keySet()) { + Canary.RegionTaskResult res = regionMap.get(regionName); + assertNotNull("verify each expected region has a RegionTaskResult object in the map", res); + assertNotNull("verify getRegionNameAsString()", regionName); + assertNotNull("verify getRegionInfo()", res.getRegionInfo()); + assertNotNull("verify getTableName()", res.getTableName()); + assertNotNull("verify getTableNameAsString()", res.getTableNameAsString()); + assertNotNull("verify getServerName()", res.getServerName()); + assertNotNull("verify getServerNameAsString()", res.getServerNameAsString()); + + if (regionName.contains(Canary.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) { + assertTrue("write to region " + regionName + " succeeded", res.isWriteSuccess()); + assertTrue("write took some time", res.getWriteLatency() > -1); + } else { + assertTrue("read from region " + regionName + " succeeded", res.isReadSuccess()); + assertTrue("read took some time", res.getReadLatency() > -1); + } + } + } + @Test @Ignore("Intermittent argument matching failures, see HBASE-18813") public void testReadTableTimeouts() throws Exception {