Skip to content

Commit

Permalink
HBASE-27853 Add client side table metrics for rpc calls and request latency. (#5406)
Browse files Browse the repository at this point in the history

Co-authored-by: jay.zhu <jay.zhu@huolala.cn>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
  • Loading branch information
zhuyaogai and jay.zhu authored Sep 18, 2023
1 parent 1df357e commit e8cbc3f
Show file tree
Hide file tree
Showing 14 changed files with 406 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
calcPriority(serverReq.getPriority(), tableName));
calcPriority(serverReq.getPriority(), tableName), tableName);
controller.setRequestAttributes(requestAttributes);
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics =
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
this.metrics = Optional
.of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ protected final void resetCallTimeout() {
} else {
callTimeoutNs = rpcTimeoutNs;
}
resetController(controller, callTimeoutNs, priority);
resetController(controller, callTimeoutNs, priority, getTableName().orElse(null));
}

private void tryScheduleRetry(Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private long elapsedMs() {

private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable());
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {
Expand Down Expand Up @@ -573,7 +573,7 @@ private void call() {
if (tries > 1) {
incRPCRetriesMetrics(scanMetrics, regionServerRemote);
}
resetController(controller, callTimeoutNs, priority);
resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable());
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
final Context context = Context.current();
Expand All @@ -595,7 +595,7 @@ private void next() {
private void renewLease() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
nextCallSeq++;
resetController(controller, rpcTimeoutNs, priority);
resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable());
ScanRequest req =
RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
stub.scan(controller, req, resp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {

if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = MetricsConnection.getMetricsConnection(this.metricsScope, this::getBatchPool,
this::getMetaLookupPool);
this.metrics = MetricsConnection.getMetricsConnection(conf, this.metricsScope,
this::getBatchPool, this::getMetaLookupPool);
} else {
this.metrics = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,17 @@ static boolean isEmptyStopRow(byte[] row) {
return Bytes.equals(row, EMPTY_END_ROW);
}

static void resetController(HBaseRpcController controller, long timeoutNs, int priority) {
/**
 * Resets {@code controller} and then re-applies the per-call settings.
 * @param controller the rpc controller to reset and reconfigure
 * @param timeoutNs  call timeout in nanoseconds; only applied when non-negative, converted to
 *                   milliseconds and capped at {@link Integer#MAX_VALUE}
 * @param priority   priority to set on the controller
 * @param tableName  table name to attach to the controller; skipped when {@code null}
 */
static void resetController(HBaseRpcController controller, long timeoutNs, int priority,
  TableName tableName) {
  controller.reset();
  if (timeoutNs >= 0) {
    // The controller takes an int number of milliseconds, so clamp before narrowing.
    final long timeoutMs = TimeUnit.NANOSECONDS.toMillis(timeoutNs);
    controller.setCallTimeout((int) Math.min(Integer.MAX_VALUE, timeoutMs));
  }
  controller.setPriority(priority);
  if (tableName == null) {
    return;
  }
  controller.setTableName(tableName);
}

static Throwable translateException(Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -51,22 +53,22 @@
* This class is for maintaining the various connection statistics and publishing them through the
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
* {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
* instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
* the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when
* all connections within this metrics instances are closed.
* {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and
* "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to
* terminate the thread pools they allocate. The metrics reporter will be shut down (see
* {@link #shutdown()}) when all connections within this metrics instance are closed.
*/
@InterfaceAudience.Private
public final class MetricsConnection implements StatisticTrackable {

private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
new ConcurrentHashMap<>();

static MetricsConnection getMetricsConnection(final String scope,
static MetricsConnection getMetricsConnection(final Configuration conf, final String scope,
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
if (metricsConnection == null) {
MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, batchPool, metaPool);
newMetricsConn.incrConnectionCount();
return newMetricsConn;
} else {
Expand All @@ -91,6 +93,10 @@ static void deleteMetricsConnection(final String scope) {
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

/** Set this key to {@code true} to enable table metrics collection of client requests. */
public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
"hbase.client.table.metrics.enable";

/**
* Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
* scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
Expand Down Expand Up @@ -311,6 +317,7 @@ private static interface NewMetric<T> {
private final MetricRegistry registry;
private final JmxReporter reporter;
private final String scope;
private final boolean tableMetricsEnabled;

private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
@Override
Expand Down Expand Up @@ -378,9 +385,10 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
private final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
private MetricsConnection(Configuration conf, String scope,
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
this.scope = scope;
this.tableMetricsEnabled = conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false);
addThreadPools(batchPool, metaPool);
this.registry = new MetricRegistry();
this.registry.register(getExecutorPoolName(), new RatioGauge() {
Expand Down Expand Up @@ -520,6 +528,16 @@ public ConcurrentMap<String, Counter> getRpcCounters() {
return rpcCounters;
}

/**
 * rpcTimers metric.
 * @return the {@code rpcTimers} map itself (the field reference is returned, not a copy)
 */
public ConcurrentMap<String, Timer> getRpcTimers() {
return rpcTimers;
}

/**
 * rpcHistograms metric.
 * @return the {@code rpcHistograms} map itself (the field reference is returned, not a copy)
 */
public ConcurrentMap<String, Histogram> getRpcHistograms() {
return rpcHistograms;
}

/** getTracker metric */
public CallTracker getGetTracker() {
return getTracker;
Expand Down Expand Up @@ -694,7 +712,8 @@ private void shutdown() {
}

/** Report RPC context to metrics system. */
public void updateRpc(MethodDescriptor method, Message param, CallStats stats, Throwable e) {
public void updateRpc(MethodDescriptor method, TableName tableName, Message param,
CallStats stats, Throwable e) {
int callsPerServer = stats.getConcurrentCallsPerServer();
if (callsPerServer > 0) {
concurrentCallsPerServerHist.update(callsPerServer);
Expand Down Expand Up @@ -744,29 +763,33 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T
case 0:
assert "Get".equals(method.getName());
getTracker.updateRpc(stats);
updateTableMetric(methodName.toString(), tableName, stats, e);
return;
case 1:
assert "Mutate".equals(method.getName());
final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
switch (mutationType) {
case APPEND:
appendTracker.updateRpc(stats);
return;
break;
case DELETE:
deleteTracker.updateRpc(stats);
return;
break;
case INCREMENT:
incrementTracker.updateRpc(stats);
return;
break;
case PUT:
putTracker.updateRpc(stats);
return;
break;
default:
throw new RuntimeException("Unrecognized mutation type " + mutationType);
}
updateTableMetric(methodName.toString(), tableName, stats, e);
return;
case 2:
assert "Scan".equals(method.getName());
scanTracker.updateRpc(stats);
updateTableMetric(methodName.toString(), tableName, stats, e);
return;
case 3:
assert "BulkLoadHFile".equals(method.getName());
Expand All @@ -792,6 +815,7 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T
assert "Multi".equals(method.getName());
numActionsPerServerHist.update(stats.getNumActionsPerServer());
multiTracker.updateRpc(stats);
updateTableMetric(methodName.toString(), tableName, stats, e);
return;
default:
throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
Expand All @@ -801,6 +825,26 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T
updateRpcGeneric(methodName.toString(), stats);
}

/**
 * Report table rpc context to metrics system.
 * <p>
 * Does nothing unless table metrics are enabled and {@code methodName} is non-null. The metric key
 * is {@code methodName + "_" + table}, where {@code table} is the table's name as a string, or
 * {@code "unknown"} when {@code tableName} is null or its name is empty.
 * @param methodName rpc method name used as the key prefix
 * @param tableName  table the call was made against; may be {@code null}
 * @param stats      call statistics passed on to {@code updateRpcGeneric}
 * @param e          failure of the call, or {@code null} when it succeeded
 */
private void updateTableMetric(String methodName, TableName tableName, CallStats stats,
  Throwable e) {
  if (!tableMetricsEnabled) {
    return;
  }
  if (methodName == null) {
    return;
  }

  final String table;
  if (tableName != null && StringUtils.isNotEmpty(tableName.getNameAsString())) {
    table = tableName.getNameAsString();
  } else {
    table = "unknown";
  }
  final String tableScopedKey = methodName + "_" + table;

  // Feed the call stats into the generic per-key rpc metrics under the table-scoped key
  // (per the original note: rpc call duration and request/response size in bytes).
  updateRpcGeneric(tableScopedKey, stats);

  if (e == null) {
    return;
  }
  // rpc failure call counter with table name.
  getMetric(FAILURE_CNT_BASE + tableScopedKey, rpcCounters, counterFactory).inc();
}

public void incrCacheDroppingExceptions(Object exception) {
getMetric(
CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ public T call(int callTimeout) throws IOException {
hrc.setPriority(priority);
hrc.setCallTimeout(callTimeout);
hrc.setRequestAttributes(requestAttributes);
if (tableName != null) {
hrc.setTableName(tableName);
}
}
}
return rpcCall();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
RpcCallback<Message> callback) {
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
if (metrics != null) {
metrics.updateRpc(call.md, call.param, call.callStats, call.error);
metrics.updateRpc(call.md, hrc.getTableName(), call.param, call.callStats, call.error);
}
if (LOG.isTraceEnabled()) {
LOG.trace("CallId: {}, call: {}, startTime: {}ms, callTime: {}ms, status: {}", call.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,14 @@ public void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback ac
throws IOException {
delegate.notifyOnCancel(callback, action);
}

/** Forwards the table name to {@code delegate}. */
@Override
public void setTableName(TableName tableName) {
delegate.setTableName(tableName);
}

/** Returns whatever {@code delegate} reports as its table name. */
@Override
public TableName getTableName() {
return delegate.getTableName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,14 @@ default boolean hasRegionInfo() {
default RegionInfo getRegionInfo() {
return null;
}

/** Sets Region's table name. This default implementation does nothing with the argument. */
default void setTableName(TableName tableName) {
// Empty body: the default is a no-op.
}

/**
 * Returns Region's table name or null if not available or pertinent. This default implementation
 * always returns {@code null}.
 */
default TableName getTableName() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {

private IOException exception;

private TableName tableName;

/**
* Rpc target Region's RegionInfo we are going against. May be null.
* @see #hasRegionInfo()
Expand Down Expand Up @@ -144,6 +146,7 @@ public void reset() {
exception = null;
callTimeout = null;
regionInfo = null;
tableName = null;
// In the implementations of some callable with replicas, rpc calls are executed in a executor
// and we could cancel the operation from outside which means there could be a race between
// reset and startCancel. Although I think the race should be handled by the callable since the
Expand Down Expand Up @@ -281,4 +284,14 @@ public String toString() {
+ exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
+ cellScanner + '}';
}

/** Stores {@code tableName} in this controller's {@code tableName} field. */
@Override
public void setTableName(TableName tableName) {
this.tableName = tableName;
}

/**
 * Returns the current value of the {@code tableName} field, which is {@code null} until set and
 * is cleared back to {@code null} by {@code reset()}.
 */
@Override
public TableName getTableName() {
return tableName;
}
}
Loading

0 comments on commit e8cbc3f

Please sign in to comment.