Skip to content

Commit cfba8b1

Browse files
hfutatzhanghbYaniv Kunda
authored andcommitted
HDFS-17766. [ARR] Avoid initializing unused threadPool in RouterAsyncRpcClient. (apache#7561). Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <keepromise@apache.org>
1 parent 96aad56 commit cfba8b1

File tree

5 files changed

+59
-23
lines changed

5 files changed

+59
-23
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public class RouterRpcClient {
128128
/** Connection pool to the Namenodes per user for performance. */
129129
private final ConnectionManager connectionManager;
130130
/** Service to run asynchronous calls. */
131-
private final ThreadPoolExecutor executorService;
131+
private ThreadPoolExecutor executorService;
132132
/** Retry policy for router -> NN communication. */
133133
private final RetryPolicy retryPolicy;
134134
/** Optional perf monitor. */
@@ -184,24 +184,7 @@ public RouterRpcClient(Configuration conf, Router router,
184184
this.connectionManager.start();
185185
this.routerRpcFairnessPolicyController =
186186
FederationUtil.newFairnessPolicyController(conf);
187-
188-
int numThreads = conf.getInt(
189-
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
190-
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
191-
ThreadFactory threadFactory = new ThreadFactoryBuilder()
192-
.setNameFormat("RPC Router Client-%d")
193-
.build();
194-
BlockingQueue<Runnable> workQueue;
195-
if (conf.getBoolean(
196-
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
197-
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
198-
workQueue = new ArrayBlockingQueue<>(numThreads);
199-
} else {
200-
workQueue = new LinkedBlockingQueue<>();
201-
}
202-
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
203-
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
204-
187+
initConcurrentCallExecutorService(conf);
205188
this.rpcMonitor = monitor;
206189

207190
int maxFailoverAttempts = conf.getInt(
@@ -245,6 +228,25 @@ public RouterRpcClient(Configuration conf, Router router,
245228
this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>();
246229
}
247230

231+
protected void initConcurrentCallExecutorService(Configuration conf) {
232+
int numThreads = conf.getInt(
233+
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
234+
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
235+
ThreadFactory threadFactory = new ThreadFactoryBuilder()
236+
.setNameFormat("RPC Router Client-%d")
237+
.build();
238+
BlockingQueue<Runnable> workQueue;
239+
if (conf.getBoolean(
240+
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
241+
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
242+
workQueue = new ArrayBlockingQueue<>(numThreads);
243+
} else {
244+
workQueue = new LinkedBlockingQueue<>();
245+
}
246+
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
247+
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
248+
}
249+
248250
/**
249251
* Get the configuration for the RPC client. It takes the Router
250252
* configuration and transforms it into regular RPC Client configuration.
@@ -278,6 +280,15 @@ public ActiveNamenodeResolver getNamenodeResolver() {
278280
return this.namenodeResolver;
279281
}
280282

283+
/**
284+
* Get the executor service used by invoking concurrent calls.
285+
* @return the executor service.
286+
*/
287+
@VisibleForTesting
288+
public ThreadPoolExecutor getExecutorService() {
289+
return executorService;
290+
}
291+
281292
/**
282293
* Shutdown the client.
283294
*/
@@ -364,9 +375,11 @@ public String getJSON() {
364375
*/
365376
public String getAsyncCallerPoolJson() {
366377
final Map<String, Integer> info = new LinkedHashMap<>();
367-
info.put("active", executorService.getActiveCount());
368-
info.put("total", executorService.getPoolSize());
369-
info.put("max", executorService.getMaximumPoolSize());
378+
if (executorService != null) {
379+
info.put("active", executorService.getActiveCount());
380+
info.put("total", executorService.getPoolSize());
381+
info.put("max", executorService.getMaximumPoolSize());
382+
}
370383
return JSON.toString(info);
371384
}
372385

@@ -2027,7 +2040,6 @@ protected boolean shouldRotateCache(IOException ioe) {
20272040
return isUnavailableException(ioe);
20282041
}
20292042

2030-
20312043
/**
20322044
* The {@link ExecutionStatus} class is a utility class used to track the status of
20332045
* execution operations performed by the {@link RouterRpcClient}.

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public RouterAsyncRpcClient(Configuration conf,
116116
this.rpcMonitor = monitor;
117117
}
118118

119+
@Override
120+
protected void initConcurrentCallExecutorService(Configuration conf) {
121+
// No need to initialize the thread pool for concurrent call.
122+
}
123+
119124
/**
120125
* Invoke method in all locations and return success if any succeeds.
121126
*

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2426,4 +2426,9 @@ public void testCallerContextNotResetByAsyncHandler() throws IOException {
24262426
// The audit log should not contain async:true.
24272427
assertFalse(auditLog.getOutput().contains("async:true"));
24282428
}
2429+
2430+
@Test
2431+
public void testConcurrentCallExecutorInitial() {
2432+
assertNotNull(router.getRouterRpcClient().getExecutorService());
2433+
}
24292434
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
3737
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
3838
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
39+
import static org.junit.jupiter.api.Assertions.assertNull;
3940

4041
/**
4142
* Testing the asynchronous RPC functionality of the router.
@@ -83,4 +84,9 @@ public void testgetGroupsForUser() throws Exception {
8384
assertArrayEquals(group, result);
8485
}
8586

87+
@Test
88+
@Override
89+
public void testConcurrentCallExecutorInitial() {
90+
assertNull(rndRouter.getRouterRpcClient().getExecutorService());
91+
}
8692
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
3535
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
3636
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
37+
import static org.junit.jupiter.api.Assertions.assertNull;
3738

3839
/**
3940
* Testing the asynchronous RPC functionality of the router with multiple mounts.
@@ -70,4 +71,11 @@ public void testgetGroupsForUser() throws Exception {
7071
String[] result = syncReturn(String[].class);
7172
assertArrayEquals(group, result);
7273
}
74+
75+
@Test
76+
@Override
77+
public void testConcurrentCallExecutorInitial() {
78+
MiniRouterDFSCluster.RouterContext rndRouter = super.getRouterContext();
79+
assertNull(rndRouter.getRouterRpcClient().getExecutorService());
80+
}
7381
}

0 commit comments

Comments
 (0)