Skip to content

Commit 43afd17

Browse files
authored
HDFS-16394.RPCMetrics increases the number of handlers in processing. (#3822)
1 parent 7950548 commit 43afd17

File tree

4 files changed

+46
-0
lines changed

4 files changed

+46
-0
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ protected ResponseBuffer initialValue() {
498498
private Map<Integer, Listener> auxiliaryListenerMap;
499499
private Responder responder = null;
500500
private Handler[] handlers = null;
501+
private final AtomicInteger numInProcessHandler = new AtomicInteger();
501502

502503
private boolean logSlowRPC = false;
503504

@@ -509,6 +510,10 @@ protected boolean isLogSlowRPC() {
509510
return logSlowRPC;
510511
}
511512

513+
public int getNumInProcessHandler() {
514+
return numInProcessHandler.get();
515+
}
516+
512517
/**
513518
* Sets slow RPC flag.
514519
* @param logSlowRPCFlag
@@ -3080,6 +3085,7 @@ public void run() {
30803085

30813086
try {
30823087
call = callQueue.take(); // pop the queue; maybe blocked here
3088+
numInProcessHandler.incrementAndGet();
30833089
startTimeNanos = Time.monotonicNowNanos();
30843090
if (alignmentContext != null && call.isCallCoordinated() &&
30853091
call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
@@ -3133,6 +3139,7 @@ public void run() {
31333139
}
31343140
} finally {
31353141
CurCall.set(null);
3142+
numInProcessHandler.decrementAndGet();
31363143
IOUtils.cleanupWithLogger(LOG, traceScope);
31373144
if (call != null) {
31383145
updateMetrics(call, startTimeNanos, connDropped);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ public static RpcMetrics create(Server server, Configuration conf) {
133133
return server.getNumOpenConnections();
134134
}
135135

136+
@Metric("Number of in process handlers")
137+
public int getNumInProcessHandler() {
138+
return server.getNumInProcessHandler();
139+
}
140+
136141
@Metric("Number of open connections per user")
137142
public String numOpenConnectionsPerUser() {
138143
return server.getNumOpenConnectionsPerUser();
@@ -288,6 +293,7 @@ public void incrClientBackoff() {
288293
public void incrSlowRpc() {
289294
rpcSlowCalls.incr();
290295
}
296+
291297
/**
292298
* Returns a MutableRate Counter.
293299
* @return Mutable Rate

hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
8383
| `RpcAuthorizationFailures` | Total number of authorization failures |
8484
| `RpcAuthorizationSuccesses` | Total number of authorization successes |
8585
| `NumOpenConnections` | Current number of open connections |
86+
| `NumInProcessHandler` | Current number of handlers on working |
8687
| `CallQueueLength` | Current length of the call queue |
8788
| `numDroppedConnections` | Total number of dropped connections |
8889
| `rpcQueueTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.ipc;
2020

21+
import org.apache.hadoop.ipc.metrics.RpcMetrics;
2122
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
2223
import org.apache.hadoop.HadoopIllegalArgumentException;
2324
import org.apache.hadoop.conf.Configuration;
@@ -1107,6 +1108,37 @@ public TestRpcService run() {
11071108
}
11081109
}
11091110

1111+
@Test
1112+
public void testNumInProcessHandlerMetrics() throws Exception {
1113+
UserGroupInformation ugi = UserGroupInformation.
1114+
createUserForTesting("user123", new String[0]);
1115+
// use 1 handler so the callq can be plugged
1116+
final Server server = setupTestServer(conf, 1);
1117+
try {
1118+
RpcMetrics rpcMetrics = server.getRpcMetrics();
1119+
assertEquals(0, rpcMetrics.getNumInProcessHandler());
1120+
1121+
ExternalCall<String> call1 = newExtCall(ugi, () -> {
1122+
assertEquals(1, rpcMetrics.getNumInProcessHandler());
1123+
return UserGroupInformation.getCurrentUser().getUserName();
1124+
});
1125+
ExternalCall<Void> call2 = newExtCall(ugi, () -> {
1126+
assertEquals(1, rpcMetrics.getNumInProcessHandler());
1127+
return null;
1128+
});
1129+
1130+
server.queueCall(call1);
1131+
server.queueCall(call2);
1132+
1133+
// Wait for call1 and call2 to enter the handler.
1134+
call1.get();
1135+
call2.get();
1136+
assertEquals(0, rpcMetrics.getNumInProcessHandler());
1137+
} finally {
1138+
server.stop();
1139+
}
1140+
}
1141+
11101142
/**
11111143
* Test RPC backoff by queue full.
11121144
*/

0 commit comments

Comments
 (0)