Skip to content

Commit ec2f908

Browse files
author
ZanderXu
committed
SPDI-166501. AsyncRouterHandlerExecutors in RouterRpcServer should use bounded queue
1 parent d491f0b commit ec2f908

File tree

6 files changed

+95
-45
lines changed

6 files changed

+95
-45
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,14 @@ public void routerFailureReadOnly() {
278278
}
279279
}
280280

281+
@Override
282+
public void recordAsyncQueueSize(String nsId, int queueSize) {
283+
if (nameserviceRPCMetricsMap != null &&
284+
nameserviceRPCMetricsMap.containsKey(nsId)) {
285+
nameserviceRPCMetricsMap.get(nsId).setAsyncHandlerQueueSize(queueSize);
286+
}
287+
}
288+
281289
@Override
282290
public void routerFailureLocked() {
283291
if (metrics != null) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
2525
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
2626
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
27+
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
2728
import org.apache.hadoop.metrics2.lib.MutableRate;
2829

2930
import java.util.concurrent.ThreadLocalRandom;
@@ -55,6 +56,8 @@ public class NameserviceRPCMetrics implements NameserviceRPCMBean {
5556
private MutableCounterLong proxyOpPermitRejected;
5657
@Metric("Number of operations accepted to hit a namenode")
5758
private MutableCounterLong proxyOpPermitAccepted;
59+
@Metric("Async Queue Size")
60+
private MutableGaugeInt asyncHandlerQueueSize;
5861

5962
public NameserviceRPCMetrics(Configuration conf, String nsId) {
6063
this.nsId = NAMESERVICE_RPC_METRICS_PREFIX + nsId;
@@ -116,6 +119,10 @@ public long getProxyOpPermitAccepted() {
116119
return proxyOpPermitAccepted.value();
117120
}
118121

122+
public void setAsyncHandlerQueueSize(int size) {
123+
asyncHandlerQueueSize.set(size);
124+
}
125+
119126
/**
120127
* Add the time to proxy an operation from the moment the Router sends it to
121128
* the Namenode until it replied.

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
8585
public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
8686
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
8787
public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
88+
public static final String DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE =
89+
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "queue.size";
90+
public static final int DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT = 1000;
8891
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
8992
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
9093
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,12 @@ void init(
138138
* If a path is in a read only mount point.
139139
*/
140140
void routerFailureReadOnly();
141+
142+
/**
143+
* Records the size of the async handler queue for the given namespace.
144+
*
145+
* @param nsId the namespace identifier
146+
* @param queueSize the current size of the async queue
147+
*/
148+
void recordAsyncQueueSize(String nsId, int queueSize);
141149
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
2626
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
2727
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
28+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE;
29+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT;
2830
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
2931
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
3032
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
@@ -58,7 +60,6 @@
5860
import java.util.ArrayList;
5961
import java.util.Collection;
6062
import java.util.EnumSet;
61-
import java.util.HashSet;
6263
import java.util.Iterator;
6364
import java.util.LinkedHashMap;
6465
import java.util.LinkedHashSet;
@@ -70,7 +71,9 @@
7071
import java.util.concurrent.ExecutionException;
7172
import java.util.concurrent.ExecutorService;
7273
import java.util.concurrent.Executors;
74+
import java.util.concurrent.LinkedBlockingQueue;
7375
import java.util.concurrent.ThreadFactory;
76+
import java.util.concurrent.ThreadPoolExecutor;
7477
import java.util.concurrent.TimeUnit;
7578
import java.util.concurrent.atomic.AtomicInteger;
7679
import java.util.stream.Collectors;
@@ -292,11 +295,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
292295
private RouterRenameOption routerRenameOption;
293296
/** Schedule the router federation rename jobs. */
294297
private BalanceProcedureScheduler fedRenameScheduler;
295-
private boolean enableAsync;
296-
private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
297-
private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
298+
private final boolean enableAsync;
299+
private final Map<String, ThreadPoolExecutor> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
300+
private ThreadPoolExecutor routerDefaultAsyncHandlerExecutor;
298301
private ExecutorService routerAsyncResponderExecutor;
299-
private ExecutorService routerDefaultAsyncHandlerExecutor;
300302

301303
/**
302304
* Construct a router RPC server.
@@ -504,31 +506,38 @@ public RouterRpcServer(Configuration conf, Router router,
504506
*/
505507
public void initAsyncThreadPools(Configuration configuration) {
506508
LOG.info("Begin initialize asynchronous handler and responder thread pool.");
507-
initNsAsyncHandlerCount();
509+
Map<String, Integer> nsAsyncHandlerCount = parseNsAsyncHandlerCount(configuration);
508510
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
509-
Set<String> unassignedNS = new HashSet<>();
510511
allConfiguredNS.add(CONCURRENT_NS);
511512

513+
int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
514+
DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
515+
if (asyncQueueSize <= 1) {
516+
throw new IllegalArgumentException("Async queue size must be greater than 1");
517+
}
518+
int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
519+
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
512520
for (String nsId : allConfiguredNS) {
513521
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
514-
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
522+
if (dedicatedHandlers <= 0) {
523+
dedicatedHandlers = asyncHandlerCountDefault;
524+
LOG.info("Use default async handler count {} for ns {}.", asyncHandlerCountDefault, nsId);
525+
} else {
526+
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
527+
}
528+
515529
if (dedicatedHandlers > 0) {
516-
initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
530+
int finalDedicatedHandlers = dedicatedHandlers;
531+
asyncRouterHandlerExecutors.computeIfAbsent(nsId,
532+
id -> initAsyncHandlerThreadPools4Ns(id, asyncQueueSize, finalDedicatedHandlers));
517533
LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
518-
} else {
519-
unassignedNS.add(nsId);
520534
}
521535
}
522536

523-
int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
524-
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
525-
526-
if (!unassignedNS.isEmpty()) {
527-
LOG.warn("Async handler unassigned ns: {}", unassignedNS);
528-
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
529-
for (String nsId : unassignedNS) {
530-
initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
531-
}
537+
if (routerDefaultAsyncHandlerExecutor == null) {
538+
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
539+
routerDefaultAsyncHandlerExecutor = initAsyncHandlerThreadPools4Ns(
540+
"default", asyncQueueSize, asyncHandlerCountDefault);
532541
}
533542

534543
int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
@@ -539,17 +548,35 @@ public void initAsyncThreadPools(Configuration configuration) {
539548
asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
540549
}
541550
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
551+
}
542552

543-
if (routerDefaultAsyncHandlerExecutor == null) {
544-
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
545-
routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
546-
asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
553+
private ThreadPoolExecutor initAsyncHandlerThreadPools4Ns(String ns, int asyncQueueSize, int handlerCount) {
554+
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(asyncQueueSize);
555+
return new ThreadPoolExecutor(handlerCount, handlerCount,
556+
0L, TimeUnit.MILLISECONDS, queue,
557+
new AsyncThreadFactory("Router Async Handler for " + ns + " #"));
558+
}
559+
560+
/**
561+
* Returns the asynchronous executor for the specified namespace.
562+
* If no executor is configured for the given namespace ID, returns the default executor.
563+
*
564+
* @param nsId the namespace identifier
565+
* @return the corresponding ExecutorService
566+
*/
567+
public ThreadPoolExecutor getAsyncExecutorForNamespace(String nsId) {
568+
ThreadPoolExecutor executorService = asyncRouterHandlerExecutors.getOrDefault(
569+
nsId, routerDefaultAsyncHandlerExecutor);
570+
if (rpcMonitor != null) {
571+
rpcMonitor.recordAsyncQueueSize(nsId, executorService.getQueue().size());
547572
}
573+
return executorService;
548574
}
549575

550-
private void initNsAsyncHandlerCount() {
576+
private Map<String, Integer> parseNsAsyncHandlerCount(Configuration conf) {
551577
String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
552578
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
579+
Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
553580
if (StringUtils.isEmpty(configNsHandler)) {
554581
LOG.error(
555582
"The value of config key: {} is empty. Will use default conf.",
@@ -566,11 +593,7 @@ private void initNsAsyncHandlerCount() {
566593
}
567594
nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1]));
568595
}
569-
}
570-
571-
private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
572-
asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool(
573-
dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #")));
596+
return nsAsyncHandlerCount;
574597
}
575598

576599
/**
@@ -2489,14 +2512,6 @@ public boolean isAsync() {
24892512
return this.enableAsync;
24902513
}
24912514

2492-
public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
2493-
return asyncRouterHandlerExecutors;
2494-
}
2495-
2496-
public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
2497-
return routerDefaultAsyncHandlerExecutor;
2498-
}
2499-
25002515
private static class AsyncThreadFactory implements ThreadFactory {
25012516
private final String namePrefix;
25022517
private final AtomicInteger threadNumber = new AtomicInteger(1);

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import java.util.Map;
5858
import java.util.concurrent.Callable;
5959
import java.util.concurrent.CompletableFuture;
60+
import java.util.concurrent.RejectedExecutionException;
61+
import java.util.concurrent.ThreadPoolExecutor;
6062

6163
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
6264
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
@@ -172,26 +174,33 @@ public Object invokeMethod(
172174
" with params " + Arrays.deepToString(params) + " from "
173175
+ router.getRouterId());
174176
}
175-
String nsid = namenodes.get(0).getNameserviceId();
177+
String nsId = namenodes.get(0).getNameserviceId();
178+
ThreadPoolExecutor executor = router.getRpcServer().getAsyncExecutorForNamespace(nsId);
179+
int queueSize = executor.getQueue().size();
176180
// transfer threadLocalContext to worker threads of executor.
177181
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
178182
asyncComplete(null);
179-
asyncApplyUseExecutor((AsyncApplyFunction<Object, Object>) o -> {
183+
asyncTry(() -> asyncApplyUseExecutor((AsyncApplyFunction<Object, Object>) o -> {
180184
if (LOG.isDebugEnabled()) {
181185
LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver,
182-
namenodes.toString(), params);
186+
namenodes, params);
183187
}
184188
threadLocalContext.transfer();
185189
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
186-
acquirePermit(nsid, ugi, method.getName(), controller);
190+
acquirePermit(nsId, ugi, method.getName(), controller);
187191
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
188192
useObserver, protocol, method, params);
189193
asyncFinally(object -> {
190-
releasePermit(nsid, ugi, method, controller);
194+
releasePermit(nsId, ugi, method, controller);
191195
return object;
192196
});
193-
}, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
194-
router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
197+
}, executor));
198+
199+
// Convert RejectedExecutionException to StandbyException since
200+
asyncCatch((ret, e) -> {
201+
LOG.warn("Async handler queue is full for namespace '{}'. Current queue size: {}", nsId, queueSize);
202+
throw new StandbyException("Namespace '" + nsId + "' is overloaded (queue size: " + queueSize + ")");
203+
}, RejectedExecutionException.class);
195204
return null;
196205
}
197206

0 commit comments

Comments
 (0)