diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index 3b1d5e55781df..576ba1bde3933 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -278,6 +278,14 @@ public void routerFailureReadOnly() { } } + @Override + public void recordAsyncQueueSize(String nsId, int queueSize) { + if (nameserviceRPCMetricsMap != null && + nameserviceRPCMetricsMap.containsKey(nsId)) { + nameserviceRPCMetricsMap.get(nsId).setAsyncHandlerQueueSize(queueSize); + } + } + @Override public void routerFailureLocked() { if (metrics != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java index 6874de0f60262..50e23e0dd252e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java @@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableRate; import java.util.concurrent.ThreadLocalRandom; @@ -55,6 +56,8 @@ public class NameserviceRPCMetrics implements NameserviceRPCMBean { private MutableCounterLong proxyOpPermitRejected; @Metric("Number of operations accepted to hit a namenode") private MutableCounterLong proxyOpPermitAccepted; + @Metric("Async Queue Size") + private MutableGaugeInt asyncHandlerQueueSize; public NameserviceRPCMetrics(Configuration conf, String nsId) { this.nsId = NAMESERVICE_RPC_METRICS_PREFIX + nsId; @@ -116,6 +119,10 @@ public long getProxyOpPermitAccepted() { return proxyOpPermitAccepted.value(); } + public void setAsyncHandlerQueueSize(int size) { + asyncHandlerQueueSize.set(size); + } + /** * Add the time to proxy an operation from the moment the Router sends it to * the Namenode until it replied. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index cedcddfdeb862..ab25e574f7dde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -85,6 +85,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY = FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count"; public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "queue.size"; + public static final int DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT = 1000; public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY = FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count"; public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java index 27385a2686ce5..9c4642c1ed474 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -138,4 +138,12 @@ void init( * If a path is in a read only mount point. */ void routerFailureReadOnly(); + + /** + * Records the size of the async handler queue for the given namespace. + * + * @param nsId the namespace identifier + * @param queueSize the current size of the async queue + */ + void recordAsyncQueueSize(String nsId, int queueSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 77bebab4ade71..c7d00774f11ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; @@ -58,7 +60,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -70,7 +71,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -292,11 +295,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, private RouterRenameOption routerRenameOption; /** Schedule the router federation rename jobs. */ private BalanceProcedureScheduler fedRenameScheduler; - private boolean enableAsync; - private Map nsAsyncHandlerCount = new ConcurrentHashMap<>(); - private Map asyncRouterHandlerExecutors = new ConcurrentHashMap<>(); + private final boolean enableAsync; + private final Map asyncRouterHandlerExecutors = new ConcurrentHashMap<>(); + private ThreadPoolExecutor routerDefaultAsyncHandlerExecutor; private ExecutorService routerAsyncResponderExecutor; - private ExecutorService routerDefaultAsyncHandlerExecutor; /** * Construct a router RPC server. @@ -504,31 +506,38 @@ public RouterRpcServer(Configuration conf, Router router, */ public void initAsyncThreadPools(Configuration configuration) { LOG.info("Begin initialize asynchronous handler and responder thread pool."); - initNsAsyncHandlerCount(); + Map nsAsyncHandlerCount = parseNsAsyncHandlerCount(configuration); Set allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration); - Set unassignedNS = new HashSet<>(); allConfiguredNS.add(CONCURRENT_NS); + int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE, + DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT); + if (asyncQueueSize <= 1) { + throw new IllegalArgumentException("Async queue size must be greater than 1"); + } + int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT); for (String nsId : allConfiguredNS) { int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0); - LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); + if (dedicatedHandlers <= 0) { + dedicatedHandlers = asyncHandlerCountDefault; + LOG.info("Use default async handler count {} for ns {}.", asyncHandlerCountDefault, nsId); + } else { + LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); + } + if (dedicatedHandlers > 0) { - initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers); + int finalDedicatedHandlers = dedicatedHandlers; + asyncRouterHandlerExecutors.computeIfAbsent(nsId, + id -> initAsyncHandlerThreadPools4Ns(id, asyncQueueSize, finalDedicatedHandlers)); LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId); - } else { - unassignedNS.add(nsId); } } - int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, - DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT); - - if (!unassignedNS.isEmpty()) { - LOG.warn("Async handler unassigned ns: {}", unassignedNS); - LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault); - for (String nsId : unassignedNS) { - initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault); - } + if (routerDefaultAsyncHandlerExecutor == null) { + LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault); + routerDefaultAsyncHandlerExecutor = initAsyncHandlerThreadPools4Ns( + "default", asyncQueueSize, asyncHandlerCountDefault); } int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, @@ -539,17 +548,35 @@ public void initAsyncThreadPools(Configuration configuration) { asyncResponderCount, new AsyncThreadFactory("Router Async Responder #")); } AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor); + } - if (routerDefaultAsyncHandlerExecutor == null) { - LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault); - routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool( - asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #")); + private ThreadPoolExecutor initAsyncHandlerThreadPools4Ns(String ns, int asyncQueueSize, int handlerCount) { + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(asyncQueueSize); + return new ThreadPoolExecutor(handlerCount, handlerCount, + 0L, TimeUnit.MILLISECONDS, queue, + new AsyncThreadFactory("Router Async Handler for " + ns + " #")); + } + + /** + * Returns the asynchronous executor for the specified namespace. + * If no executor is configured for the given namespace ID, returns the default executor. + * + * @param nsId the namespace identifier + * @return the corresponding ExecutorService + */ + public ThreadPoolExecutor getAsyncExecutorForNamespace(String nsId) { + ThreadPoolExecutor executorService = asyncRouterHandlerExecutors.getOrDefault( + nsId, routerDefaultAsyncHandlerExecutor); + if (rpcMonitor != null) { + rpcMonitor.recordAsyncQueueSize(nsId, executorService.getQueue().size()); } + return executorService; } - private void initNsAsyncHandlerCount() { + private Map parseNsAsyncHandlerCount(Configuration conf) { String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT); + Map nsAsyncHandlerCount = new ConcurrentHashMap<>(); if (StringUtils.isEmpty(configNsHandler)) { LOG.error( "The value of config key: {} is empty. Will use default conf.", @@ -566,11 +593,7 @@ private void initNsAsyncHandlerCount() { } nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1])); } - } - - private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) { - asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool( - dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #"))); + return nsAsyncHandlerCount; } /** @@ -2489,14 +2512,6 @@ public boolean isAsync() { return this.enableAsync; } - public Map getAsyncRouterHandlerExecutors() { - return asyncRouterHandlerExecutors; - } - - public ExecutorService getRouterAsyncHandlerDefaultExecutor() { - return routerDefaultAsyncHandlerExecutor; - } - private static class AsyncThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger threadNumber = new AtomicInteger(1); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index 13f6dd3b952eb..d4b0f11aed101 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -57,6 +57,8 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; @@ -172,26 +174,33 @@ public Object invokeMethod( " with params " + Arrays.deepToString(params) + " from " + router.getRouterId()); } - String nsid = namenodes.get(0).getNameserviceId(); + String nsId = namenodes.get(0).getNameserviceId(); + ThreadPoolExecutor executor = router.getRpcServer().getAsyncExecutorForNamespace(nsId); + int queueSize = executor.getQueue().size(); // transfer threadLocalContext to worker threads of executor. ThreadLocalContext threadLocalContext = new ThreadLocalContext(); asyncComplete(null); - asyncApplyUseExecutor((AsyncApplyFunction) o -> { + asyncTry(() -> asyncApplyUseExecutor((AsyncApplyFunction) o -> { if (LOG.isDebugEnabled()) { LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver, - namenodes.toString(), params); + namenodes, params); } threadLocalContext.transfer(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(nsid, ugi, method.getName(), controller); + acquirePermit(nsId, ugi, method.getName(), controller); invokeMethodAsync(ugi, (List) namenodes, useObserver, protocol, method, params); asyncFinally(object -> { - releasePermit(nsid, ugi, method, controller); + releasePermit(nsId, ugi, method, controller); return object; }); - }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, - router.getRpcServer().getRouterAsyncHandlerDefaultExecutor())); + }, executor)); + + // Convert RejectedExecutionException to StandbyException since + asyncCatch((ret, e) -> { + LOG.warn("Async handler queue is full for namespace '{}'. Current queue size: {}", nsId, queueSize); + throw new StandbyException("Namespace '" + nsId + "' is overloaded (queue size: " + queueSize + ")"); + }, RejectedExecutionException.class); return null; }