Commit
Provide an API to get list of successful regions and total expected regions in Canary (#612)

Signed-off-by: Xu Cang <xucang@apache.org>
caroliney14 authored and xcangCRM committed Sep 12, 2019
1 parent 3f84591 commit e5fdef5
Showing 2 changed files with 206 additions and 12 deletions.
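The counters and the per-region result map added in this commit give callers a read-only view of a finished canary run. Below is a minimal sketch of such a consumer, assuming the caller already holds the RegionStdOutSink used for the run; the CanaryRunSummary class and its summarize() helper are hypothetical, and only the accessors they call are introduced by this patch.

import java.util.Map;

import org.apache.hadoop.hbase.tool.Canary;

public class CanaryRunSummary {
  // Illustrative consumer of the new Sink API; not part of the patch itself.
  static void summarize(Canary.RegionStdOutSink sink) {
    System.out.println("expected regions:  " + sink.getTotalExpectedRegions());
    System.out.println("successful reads:  " + sink.getReadSuccessCount());
    System.out.println("successful writes: " + sink.getWriteSuccessCount());
    for (Map.Entry<String, Canary.RegionTaskResult> e : sink.getRegionMap().entrySet()) {
      Canary.RegionTaskResult r = e.getValue();
      // Latencies default to -1 when no timing was recorded for that region.
      System.out.println(e.getKey() + " on " + r.getServerNameAsString()
          + " readOk=" + r.isReadSuccess() + " readMs=" + r.getReadLatency()
          + " writeOk=" + r.isWriteSuccess() + " writeMs=" + r.getWriteLatency());
    }
  }
}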
138 changes: 137 additions & 1 deletion hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -120,13 +120,19 @@ public interface Sink {
public long incWriteFailureCount();
public Map<String,String> getWriteFailures();
public void updateWriteFailures(String regionName, String serverName);
public long getReadSuccessCount();
public long incReadSuccessCount();
public long getWriteSuccessCount();
public long incWriteSuccessCount();
}

// Simple implementation of canary sink that allows to plot on
// file or standard output timings or failures.
public static class StdOutSink implements Sink {
private AtomicLong readFailureCount = new AtomicLong(0),
writeFailureCount = new AtomicLong(0);
writeFailureCount = new AtomicLong(0),
readSuccessCount = new AtomicLong(0),
writeSuccessCount = new AtomicLong(0);

private Map<String, String> readFailures = new ConcurrentHashMap<String, String>();
private Map<String, String> writeFailures = new ConcurrentHashMap<String, String>();
@@ -170,6 +176,26 @@ public Map<String, String> getWriteFailures() {
public void updateWriteFailures(String regionName, String serverName) {
writeFailures.put(regionName, serverName);
}

@Override
public long getReadSuccessCount() {
return readSuccessCount.get();
}

@Override
public long incReadSuccessCount() {
return readSuccessCount.incrementAndGet();
}

@Override
public long getWriteSuccessCount() {
return writeSuccessCount.get();
}

@Override
public long incWriteSuccessCount() {
return writeSuccessCount.incrementAndGet();
}
}

public static class RegionServerStdOutSink extends StdOutSink {
@@ -202,6 +228,7 @@ public static class RegionStdOutSink extends StdOutSink {

private Map<String, AtomicLong> perTableReadLatency = new HashMap<>();
private AtomicLong writeLatency = new AtomicLong();
private Map<String, RegionTaskResult> regionMap = new ConcurrentHashMap<>();

public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) {
incReadFailureCount();
@@ -215,6 +242,10 @@ public void publishReadFailure(ServerName serverName, HRegionInfo region, HColum
}

public void publishReadTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
incReadSuccessCount();
RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
res.setReadSuccess();
res.setReadLatency(msTime);
LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms",
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
}
@@ -231,6 +262,10 @@ public void publishWriteFailure(ServerName serverName, HRegionInfo region, HColu
}

public void publishWriteTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
incWriteSuccessCount();
RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
res.setWriteSuccess();
res.setWriteLatency(msTime);
LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms",
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
}
@@ -252,6 +287,14 @@ public void initializeWriteLatency() {
public AtomicLong getWriteLatency() {
return this.writeLatency;
}

public Map<String, RegionTaskResult> getRegionMap() {
return this.regionMap;
}

public int getTotalExpectedRegions() {
return this.regionMap.size();
}
}

static class ZookeeperTask implements Callable<Void> {
@@ -883,6 +926,96 @@ private void printUsageAndExit() {
System.exit(USAGE_EXIT_CODE);
}

/**
* Canary region mode-specific data structure which stores information about each region
* to be scanned
*/
public static class RegionTaskResult {
private HRegionInfo region;
private TableName tableName;
private ServerName serverName;
private AtomicLong readLatency = null;
private AtomicLong writeLatency = null;
private boolean readSuccess = false;
private boolean writeSuccess = false;

public RegionTaskResult(HRegionInfo region, TableName tableName, ServerName serverName) {
this.region = region;
this.tableName = tableName;
this.serverName = serverName;
}

public HRegionInfo getRegionInfo() {
return this.region;
}

public String getRegionNameAsString() {
return this.region.getRegionNameAsString();
}

public TableName getTableName() {
return this.tableName;
}

public String getTableNameAsString() {
return this.tableName.getNameAsString();
}

public ServerName getServerName() {
return this.serverName;
}

public String getServerNameAsString() {
return this.serverName.getServerName();
}

public long getReadLatency() {
if (this.readLatency == null) {
return -1;
}
return this.readLatency.get();
}

public void setReadLatency(long readLatency) {
if (this.readLatency != null) {
this.readLatency.set(readLatency);
} else {
this.readLatency = new AtomicLong(readLatency);
}
}

public long getWriteLatency() {
if (this.writeLatency == null) {
return -1;
}
return this.writeLatency.get();
}

public void setWriteLatency(long writeLatency) {
if (this.writeLatency != null) {
this.writeLatency.set(writeLatency);
} else {
this.writeLatency = new AtomicLong(writeLatency);
}
}

public boolean isReadSuccess() {
return this.readSuccess;
}

public void setReadSuccess() {
this.readSuccess = true;
}

public boolean isWriteSuccess() {
return this.writeSuccess;
}

public void setWriteSuccess() {
this.writeSuccess = true;
}
}

/**
* A Factory method for {@link Monitor}.
* Can be overridden by user.
@@ -1295,6 +1428,9 @@ private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
HRegionInfo region = location.getRegionInfo();
tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, taskType, rawScanEnabled,
rwLatency));
Map<String, RegionTaskResult> regionMap = ((RegionStdOutSink) sink).getRegionMap();
regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region,
region.getTable(), rs));
}
} finally {
if (regionLocator != null) {
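The RegionTaskResult entries above are created in sniff() for every region location and then filled in by publishReadTiming()/publishWriteTiming(). That contract can be pictured with the standalone sketch below; the null region, table and server are placeholders only (in the real flow sniff() passes the actual HRegionInfo, TableName and ServerName), and the rest uses only methods added in this commit.

import org.apache.hadoop.hbase.tool.Canary;

public class RegionTaskResultDemo {
  public static void main(String[] args) {
    // Placeholder arguments; sniff() supplies real region/table/server values.
    Canary.RegionTaskResult res = new Canary.RegionTaskResult(null, null, null);
    System.out.println(res.getReadLatency());   // -1: nothing has been timed yet
    res.setReadSuccess();                       // done by publishReadTiming()
    res.setReadLatency(42L);                    // also done by publishReadTiming()
    System.out.println(res.isReadSuccess());    // true
    System.out.println(res.getReadLatency());   // 42
    System.out.println(res.getWriteLatency());  // still -1 until a write is timed
  }
}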
80 changes: 69 additions & 11 deletions hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
@@ -19,6 +19,25 @@

package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -34,7 +53,6 @@
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent;
import com.google.common.collect.Iterables;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -45,16 +63,7 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.argThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
import com.google.common.collect.Iterables;

@RunWith(MockitoJUnitRunner.class)
@Category({MediumTests.class})
@@ -113,6 +122,55 @@ public void testBasicCanaryWorks() throws Exception {
verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class), isA(HColumnDescriptor.class), anyLong());
}

@Test
public void testCanaryRegionTaskResult() throws Exception {
TableName tableName = TableName.valueOf("testCanaryRegionTaskResult");
HTable table = testingUtility.createTable(tableName, new byte[][]{FAMILY});
// insert some test rows
for (int i = 0; i < 1000; i++) {
byte[] iBytes = Bytes.toBytes(i);
Put p = new Put(iBytes);
p.addColumn(FAMILY, COLUMN, iBytes);
table.put(p);
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
Canary canary = new Canary(executor, sink);
String[] args = {"-writeSniffing", "-t", "10000", "testCanaryRegionTaskResult"};
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));

assertTrue("verify read success count > 0", sink.getReadSuccessCount() > 0);
assertTrue("verify write success count > 0", sink.getWriteSuccessCount() > 0);
verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class),
isA(HColumnDescriptor.class), anyLong());
verify(sink, atLeastOnce()).publishWriteTiming(isA(ServerName.class), isA(HRegionInfo.class),
isA(HColumnDescriptor.class), anyLong());

assertTrue("canary should expect to scan at least 1 region",
sink.getTotalExpectedRegions() > 0);
Map<String, Canary.RegionTaskResult> regionMap = sink.getRegionMap();
assertFalse("verify region map has size > 0", regionMap.isEmpty());

for (String regionName : regionMap.keySet()) {
Canary.RegionTaskResult res = regionMap.get(regionName);
assertNotNull("verify each expected region has a RegionTaskResult object in the map", res);
assertNotNull("verify getRegionNameAsString()", regionName);
assertNotNull("verify getRegionInfo()", res.getRegionInfo());
assertNotNull("verify getTableName()", res.getTableName());
assertNotNull("verify getTableNameAsString()", res.getTableNameAsString());
assertNotNull("verify getServerName()", res.getServerName());
assertNotNull("verify getServerNameAsString()", res.getServerNameAsString());

if (regionName.contains(Canary.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) {
assertTrue("write to region " + regionName + " succeeded", res.isWriteSuccess());
assertTrue("write took some time", res.getWriteLatency() > -1);
} else {
assertTrue("read from region " + regionName + " succeeded", res.isReadSuccess());
assertTrue("read took some time", res.getReadLatency() > -1);
}
}
}

@Test
@Ignore("Intermittent argument matching failures, see HBASE-18813")
public void testReadTableTimeouts() throws Exception {
