diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index d376cfc6a696..d02d9daa5f39 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -395,7 +395,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr } HBaseRpcController controller = conn.rpcControllerFactory.newController(); resetController(controller, Math.min(rpcTimeoutNs, remainingNs), - calcPriority(serverReq.getPriority(), tableName)); + calcPriority(serverReq.getPriority(), tableName), tableName); controller.setRequestAttributes(requestAttributes); if (!cells.isEmpty()) { controller.setCellScanner(createCellScanner(cells)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 679efabaf050..f96a621a4423 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -139,8 +139,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri this.connConf = new AsyncConnectionConfiguration(conf); this.registry = registry; if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { - this.metrics = - Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null)); + this.metrics = Optional + .of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> null, () -> null)); } else { this.metrics = Optional.empty(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 5b1727f9f66e..ae1fba61d041 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -121,7 +121,7 @@ protected final void resetCallTimeout() { } else { callTimeoutNs = rpcTimeoutNs; } - resetController(controller, callTimeoutNs, priority); + resetController(controller, callTimeoutNs, priority, getTableName().orElse(null)); } private void tryScheduleRetry(Throwable error) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index dd6eed6712df..6ef7d6a216a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -354,7 +354,7 @@ private long elapsedMs() { private void closeScanner() { incRPCCallsMetrics(scanMetrics, regionServerRemote); - resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS); + resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable()); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { @@ -573,7 +573,7 @@ private void call() { if (tries > 1) { incRPCRetriesMetrics(scanMetrics, regionServerRemote); } - resetController(controller, callTimeoutNs, priority); + resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable()); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit()); final Context context = Context.current(); @@ -595,7 +595,7 @@ private void next() { private void renewLease() { incRPCCallsMetrics(scanMetrics, regionServerRemote); nextCallSeq++; - resetController(controller, rpcTimeoutNs, priority); + resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable()); ScanRequest req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); stub.scan(controller, req, resp -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 80f7a7959f12..ff7418e39cd3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -359,8 +359,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { this.metricsScope = MetricsConnection.getScope(conf, clusterId, this); - this.metrics = MetricsConnection.getMetricsConnection(this.metricsScope, this::getBatchPool, - this::getMetaLookupPool); + this.metrics = MetricsConnection.getMetricsConnection(conf, this.metricsScope, + this::getBatchPool, this::getMetaLookupPool); } else { this.metrics = null; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index b9682a942da1..615f76e8c6de 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -223,13 +223,17 @@ static boolean isEmptyStopRow(byte[] row) { return Bytes.equals(row, EMPTY_END_ROW); } - static void resetController(HBaseRpcController controller, long timeoutNs, int priority) { + static void resetController(HBaseRpcController controller, long timeoutNs, int priority, + TableName tableName) { controller.reset(); if (timeoutNs >= 0) { controller.setCallTimeout( (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); } controller.setPriority(priority); + if (tableName != null) { + controller.setTableName(tableName); + } } static Throwable translateException(Throwable t) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 89536d430bf5..af422fc12dc8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -34,8 +34,10 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -51,10 +53,10 @@ * This class is for maintaining the various connection statistics and publishing them through the * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so * as to not conflict with other uses of Yammer Metrics within the client application. Calling - * {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts" - * instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate - * the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when - * all connections within this metrics instances are closed. + * {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and + * "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to + * terminate the thread pools they allocate. The metrics reporter will be shutdown + * {@link #shutdown()} when all connections within this metrics instances are closed. */ @InterfaceAudience.Private public final class MetricsConnection implements StatisticTrackable { @@ -62,11 +64,11 @@ public final class MetricsConnection implements StatisticTrackable { private static final ConcurrentMap METRICS_INSTANCES = new ConcurrentHashMap<>(); - static MetricsConnection getMetricsConnection(final String scope, + static MetricsConnection getMetricsConnection(final Configuration conf, final String scope, Supplier batchPool, Supplier metaPool) { return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> { if (metricsConnection == null) { - MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool); + MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, batchPool, metaPool); newMetricsConn.incrConnectionCount(); return newMetricsConn; } else { @@ -91,6 +93,10 @@ static void deleteMetricsConnection(final String scope) { /** Set this key to {@code true} to enable metrics collection of client requests. */ public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; + /** Set this key to {@code true} to enable table metrics collection of client requests. */ + public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY = + "hbase.client.table.metrics.enable"; + /** * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The * scope is added to JMX MBean objectName, and defaults to a combination of the Connection's @@ -311,6 +317,7 @@ private static interface NewMetric { private final MetricRegistry registry; private final JmxReporter reporter; private final String scope; + private final boolean tableMetricsEnabled; private final NewMetric timerFactory = new NewMetric() { @Override @@ -378,9 +385,10 @@ public Counter newMetric(Class clazz, String name, String scope) { private final ConcurrentMap rpcCounters = new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); - private MetricsConnection(String scope, Supplier batchPool, - Supplier metaPool) { + private MetricsConnection(Configuration conf, String scope, + Supplier batchPool, Supplier metaPool) { this.scope = scope; + this.tableMetricsEnabled = conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false); addThreadPools(batchPool, metaPool); this.registry = new MetricRegistry(); this.registry.register(getExecutorPoolName(), new RatioGauge() { @@ -520,6 +528,16 @@ public ConcurrentMap getRpcCounters() { return rpcCounters; } + /** rpcTimers metric */ + public ConcurrentMap getRpcTimers() { + return rpcTimers; + } + + /** rpcHistograms metric */ + public ConcurrentMap getRpcHistograms() { + return rpcHistograms; + } + /** getTracker metric */ public CallTracker getGetTracker() { return getTracker; @@ -694,7 +712,8 @@ private void shutdown() { } /** Report RPC context to metrics system. */ - public void updateRpc(MethodDescriptor method, Message param, CallStats stats, Throwable e) { + public void updateRpc(MethodDescriptor method, TableName tableName, Message param, + CallStats stats, Throwable e) { int callsPerServer = stats.getConcurrentCallsPerServer(); if (callsPerServer > 0) { concurrentCallsPerServerHist.update(callsPerServer); @@ -744,6 +763,7 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T case 0: assert "Get".equals(method.getName()); getTracker.updateRpc(stats); + updateTableMetric(methodName.toString(), tableName, stats, e); return; case 1: assert "Mutate".equals(method.getName()); @@ -751,22 +771,25 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T switch (mutationType) { case APPEND: appendTracker.updateRpc(stats); - return; + break; case DELETE: deleteTracker.updateRpc(stats); - return; + break; case INCREMENT: incrementTracker.updateRpc(stats); - return; + break; case PUT: putTracker.updateRpc(stats); - return; + break; default: throw new RuntimeException("Unrecognized mutation type " + mutationType); } + updateTableMetric(methodName.toString(), tableName, stats, e); + return; case 2: assert "Scan".equals(method.getName()); scanTracker.updateRpc(stats); + updateTableMetric(methodName.toString(), tableName, stats, e); return; case 3: assert "BulkLoadHFile".equals(method.getName()); @@ -792,6 +815,7 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T assert "Multi".equals(method.getName()); numActionsPerServerHist.update(stats.getNumActionsPerServer()); multiTracker.updateRpc(stats); + updateTableMetric(methodName.toString(), tableName, stats, e); return; default: throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); @@ -801,6 +825,26 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T updateRpcGeneric(methodName.toString(), stats); } + /** Report table rpc context to metrics system. */ + private void updateTableMetric(String methodName, TableName tableName, CallStats stats, + Throwable e) { + if (tableMetricsEnabled) { + if (methodName != null) { + String table = tableName != null && StringUtils.isNotEmpty(tableName.getNameAsString()) + ? tableName.getNameAsString() + : "unknown"; + String metricKey = methodName + "_" + table; + // update table rpc context to metrics system, + // includes rpc call duration, rpc call request/response size(bytes). + updateRpcGeneric(metricKey, stats); + if (e != null) { + // rpc failure call counter with table name. + getMetric(FAILURE_CNT_BASE + metricKey, rpcCounters, counterFactory).inc(); + } + } + } + } + public void incrCacheDroppingExceptions(Object exception) { getMetric( CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()), diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index da5d291a0998..03c2b7d458c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -123,6 +123,9 @@ public T call(int callTimeout) throws IOException { hrc.setPriority(priority); hrc.setCallTimeout(callTimeout); hrc.setRequestAttributes(requestAttributes); + if (tableName != null) { + hrc.setTableName(tableName); + } } } return rpcCall(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 5e42558671b7..fcded9f5b69d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -379,7 +379,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, RpcCallback callback) { call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); if (metrics != null) { - metrics.updateRpc(call.md, call.param, call.callStats, call.error); + metrics.updateRpc(call.md, hrc.getTableName(), call.param, call.callStats, call.error); } if (LOG.isTraceEnabled()) { LOG.trace("CallId: {}, call: {}, startTime: {}ms, callTime: {}ms, status: {}", call.id, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index c752f4c18355..2b8839bf8462 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -143,4 +143,14 @@ public void notifyOnCancel(RpcCallback callback, CancellationCallback ac throws IOException { delegate.notifyOnCancel(callback, action); } + + @Override + public void setTableName(TableName tableName) { + delegate.setTableName(tableName); + } + + @Override + public TableName getTableName() { + return delegate.getTableName(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index cd303a5eda77..4d3e038bb5ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -130,4 +130,14 @@ default boolean hasRegionInfo() { default RegionInfo getRegionInfo() { return null; } + + /** Sets Region's table name. */ + default void setTableName(TableName tableName) { + + } + + /** Returns Region's table name or null if not available or pertinent. */ + default TableName getTableName() { + return null; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 6f8c65d6bdce..1245fc0f20d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -51,6 +51,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { private IOException exception; + private TableName tableName; + /** * Rpc target Region's RegionInfo we are going against. May be null. * @see #hasRegionInfo() @@ -144,6 +146,7 @@ public void reset() { exception = null; callTimeout = null; regionInfo = null; + tableName = null; // In the implementations of some callable with replicas, rpc calls are executed in a executor // and we could cancel the operation from outside which means there could be a race between // reset and startCancel. Although I think the race should be handled by the callable since the @@ -281,4 +284,14 @@ public String toString() { + exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner=" + cellScanner + '}'; } + + @Override + public void setTableName(TableName tableName) { + this.tableName = tableName; + } + + @Override + public TableName getTableName() { + return tableName; + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java index 469ecd4e10e9..6da0ee5bfa98 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -19,13 +19,16 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import com.codahale.metrics.Counter; import com.codahale.metrics.RatioGauge; import com.codahale.metrics.RatioGauge.Ratio; +import com.codahale.metrics.Timer; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -33,6 +36,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; import org.apache.hadoop.hbase.security.User; @@ -40,16 +44,21 @@ import org.apache.hadoop.hbase.testclassification.MetricsTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; @@ -59,25 +68,37 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +@RunWith(Parameterized.class) @Category({ ClientTests.class, MetricsTests.class, SmallTests.class }) public class TestMetricsConnection { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMetricsConnection.class); + private static final Configuration conf = new Configuration(); private static MetricsConnection METRICS; private static final ThreadPoolExecutor BATCH_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); private static final String MOCK_CONN_STR = "mocked-connection"; - @BeforeClass - public static void beforeClass() { - METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> BATCH_POOL, () -> null); + @Parameter() + public boolean tableMetricsEnabled; + + @Parameters + public static List params() { + return Arrays.asList(false, true); + } + + @Before + public void before() { + conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnabled); + METRICS = + MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null); } - @AfterClass - public static void afterClass() { + @After + public void after() { MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR); } @@ -173,35 +194,52 @@ public void testMetricsConnectionScopeBlockingClient() throws IOException { @Test public void testStaticMetrics() throws IOException { final byte[] foo = Bytes.toBytes("foo"); - final RegionSpecifier region = RegionSpecifier.newBuilder().setValue(ByteString.EMPTY) - .setType(RegionSpecifierType.REGION_NAME).build(); + String table = "TableX"; + final RegionSpecifier region = RegionSpecifier.newBuilder() + .setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build(); final int loop = 5; for (int i = 0; i < loop; i++) { METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"), - GetRequest.getDefaultInstance(), MetricsConnection.newCallStats(), null); + TableName.valueOf(table), + GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(), + MetricsConnection.newCallStats(), null); METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"), - ScanRequest.getDefaultInstance(), MetricsConnection.newCallStats(), + TableName.valueOf(table), + ScanRequest.newBuilder().setRegion(region) + .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(), + MetricsConnection.newCallStats(), new RemoteWithExtrasException("java.io.IOException", null, false, false)); METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"), - MultiRequest.getDefaultInstance(), MetricsConnection.newCallStats(), + TableName.valueOf(table), + MultiRequest.newBuilder() + .addRegionAction(ClientProtos.RegionAction.newBuilder() + .addAction( + ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build()) + .setRegion(region).build()) + .build(), + MetricsConnection.newCallStats(), new CallTimeoutException("test with CallTimeoutException")); METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), + TableName.valueOf(table), MutateRequest.newBuilder() .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo))) .setRegion(region).build(), MetricsConnection.newCallStats(), null); METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), + TableName.valueOf(table), MutateRequest.newBuilder() .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo))) .setRegion(region).build(), MetricsConnection.newCallStats(), null); METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), + TableName.valueOf(table), MutateRequest.newBuilder() .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo))) .setRegion(region).build(), MetricsConnection.newCallStats(), null); METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), + TableName.valueOf(table), MutateRequest.newBuilder() .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region) .build(), @@ -209,48 +247,12 @@ public void testStaticMetrics() throws IOException { new CallTimeoutException("test with CallTimeoutException")); } - final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_"; - final String rpcFailureCountPrefix = - "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_"; + testRpcCallMetrics(table, loop); + String metricKey; long metricVal; Counter counter; - for (String method : new String[] { "Get", "Scan", "Multi" }) { - metricKey = rpcCountPrefix + method; - metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); - assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop); - - metricKey = rpcFailureCountPrefix + method; - counter = METRICS.getRpcCounters().get(metricKey); - metricVal = (counter != null) ? counter.getCount() : 0; - if (method.equals("Get")) { - // no failure - assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); - } else { - // has failure - assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop); - } - } - - String method = "Mutate"; - for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { - metricKey = rpcCountPrefix + method + "(" + mutationType + ")"; - metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); - assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop); - - metricKey = rpcFailureCountPrefix + method + "(" + mutationType + ")"; - counter = METRICS.getRpcCounters().get(metricKey); - metricVal = (counter != null) ? counter.getCount() : 0; - if (mutationType.equals("Put")) { - // has failure - assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop); - } else { - // no failure - assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); - } - } - // remote exception metricKey = "rpcRemoteExceptions_IOException"; counter = METRICS.getRpcCounters().get(metricKey); @@ -269,6 +271,8 @@ public void testStaticMetrics() throws IOException { metricVal = (counter != null) ? counter.getCount() : 0; assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 3); + testRpcCallTableMetrics(table, loop); + for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(), METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(), @@ -284,4 +288,99 @@ public void testStaticMetrics() throws IOException { assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0); assertEquals(Double.NaN, metaMetrics.getValue(), 0); } + + private void testRpcCallTableMetrics(String table, int expectedVal) { + String metricKey; + Timer timer; + String numOpsSuffix = "_num_ops"; + String p95Suffix = "_95th_percentile"; + String p99Suffix = "_99th_percentile"; + String service = ClientService.getDescriptor().getName(); + for (String m : new String[] { "Get", "Scan", "Multi" }) { + metricKey = "rpcCallDurationMs_" + service + "_" + m + "_" + table; + timer = METRICS.getRpcTimers().get(metricKey); + if (tableMetricsEnabled) { + long numOps = timer.getCount(); + double p95 = timer.getSnapshot().get95thPercentile(); + double p99 = timer.getSnapshot().get99thPercentile(); + assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, + numOps); + assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0); + assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0); + } else { + assertNull(timer); + } + } + + // Distinguish mutate types for mutate method. + String mutateMethod = "Mutate"; + for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { + metricKey = "rpcCallDurationMs_" + service + "_" + mutateMethod + "(" + mutationType + ")" + + "_" + table; + timer = METRICS.getRpcTimers().get(metricKey); + if (tableMetricsEnabled) { + long numOps = timer.getCount(); + double p95 = timer.getSnapshot().get95thPercentile(); + double p99 = timer.getSnapshot().get99thPercentile(); + assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, + numOps); + assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0); + assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0); + } else { + assertNull(timer); + } + } + } + + private void testRpcCallMetrics(String table, int expectedVal) { + final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_"; + final String rpcFailureCountPrefix = + "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_"; + String metricKey; + long metricVal; + Counter counter; + + for (String method : new String[] { "Get", "Scan", "Multi" }) { + // rpc call count + metricKey = rpcCountPrefix + method; + metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); + assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); + + // rpc failure call + metricKey = tableMetricsEnabled + ? rpcFailureCountPrefix + method + "_" + table + : rpcFailureCountPrefix + method; + counter = METRICS.getRpcCounters().get(metricKey); + metricVal = (counter != null) ? counter.getCount() : 0; + if (method.equals("Get")) { + // no failure + assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); + } else { + // has failure + assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); + } + } + + String method = "Mutate"; + for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { + // rpc call count + metricKey = rpcCountPrefix + method + "(" + mutationType + ")"; + metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); + assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); + + // rpc failure call + metricKey = tableMetricsEnabled + ? rpcFailureCountPrefix + method + "(" + mutationType + ")" + "_" + table + : rpcFailureCountPrefix + method + "(" + mutationType + ")"; + counter = METRICS.getRpcCounters().get(metricKey); + metricVal = (counter != null) ? counter.getCount() : 0; + if (mutationType.equals("Put")) { + // has failure + assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); + } else { + // no failure + assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTableMetrics.java new file mode 100644 index 000000000000..e3cfcaa81921 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTableMetrics.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.codahale.metrics.Timer; +import java.io.IOException; +import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; + +@Category(MediumTests.class) +public class TestClientTableMetrics { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClientTableMetrics.class); + + private static HBaseTestingUtility UTIL; + private static Connection CONN; + private static MetricsConnection METRICS; + private static final String tableName = "table_1"; + private static final TableName TABLE_1 = TableName.valueOf(tableName); + private static final byte[] FAMILY = Bytes.toBytes("f"); + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); + conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, true); + UTIL = new HBaseTestingUtility(conf); + UTIL.startMiniCluster(2); + UTIL.createTable(TABLE_1, FAMILY); + UTIL.waitTableAvailable(TABLE_1); + // Only test the sync connection mode. + CONN = UTIL.getConnection(); + METRICS = ((ConnectionImplementation) CONN).getConnectionMetrics(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.deleteTableIfAny(TABLE_1); + UTIL.shutdownMiniCluster(); + } + + @Test + public void testGetTableMetrics() throws IOException { + Table table = CONN.getTable(TABLE_1); + table.get(new Get(Bytes.toBytes("row1"))); + table.get(new Get(Bytes.toBytes("row2"))); + table.get(new Get(Bytes.toBytes("row3"))); + table.close(); + + String metricKey = + "rpcCallDurationMs_" + ClientService.getDescriptor().getName() + "_Get_" + tableName; + verifyTableMetrics(metricKey, 3); + } + + @Test + public void testMutateTableMetrics() throws IOException { + Table table = CONN.getTable(TABLE_1); + // PUT + Put put = new Put(Bytes.toBytes("row1")); + put.addColumn(FAMILY, Bytes.toBytes("name"), Bytes.toBytes("tom")); + table.put(put); + put = new Put(Bytes.toBytes("row2")); + put.addColumn(FAMILY, Bytes.toBytes("name"), Bytes.toBytes("jerry")); + table.put(put); + // DELETE + table.delete(new Delete(Bytes.toBytes("row1"))); + table.close(); + + String metricKey = + "rpcCallDurationMs_" + ClientService.getDescriptor().getName() + "_Mutate(Put)_" + tableName; + verifyTableMetrics(metricKey, 2); + + metricKey = "rpcCallDurationMs_" + ClientService.getDescriptor().getName() + "_Mutate(Delete)_" + + tableName; + verifyTableMetrics(metricKey, 1); + } + + @Test + public void testScanTableMetrics() throws IOException { + Table table = CONN.getTable(TABLE_1); + table.getScanner(new Scan()); + table.close(); + + String metricKey = + "rpcCallDurationMs_" + ClientService.getDescriptor().getName() + "_Scan_" + tableName; + verifyTableMetrics(metricKey, 1); + } + + @Test + public void testMultiTableMetrics() throws IOException { + Table table = CONN.getTable(TABLE_1); + table.put(Arrays.asList( + new Put(Bytes.toBytes("row1")).addColumn(FAMILY, Bytes.toBytes("name"), Bytes.toBytes("tom")), + new Put(Bytes.toBytes("row2")).addColumn(FAMILY, Bytes.toBytes("name"), + Bytes.toBytes("jerry")))); + table.get(Arrays.asList(new Get(Bytes.toBytes("row1")), new Get(Bytes.toBytes("row2")))); + table.close(); + + String metricKey = + "rpcCallDurationMs_" + ClientService.getDescriptor().getName() + "_Multi_" + tableName; + verifyTableMetrics(metricKey, 2); + } + + private static void verifyTableMetrics(String metricKey, int expectedVal) { + String numOpsSuffix = "_num_ops"; + String p95Suffix = "_95th_percentile"; + String p99Suffix = "_99th_percentile"; + Timer timer = METRICS.getRpcTimers().get(metricKey); + long numOps = timer.getCount(); + double p95 = timer.getSnapshot().get95thPercentile(); + double p99 = timer.getSnapshot().get99thPercentile(); + assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, numOps); + assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0); + assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0); + } +}