Skip to content

Commit

Permalink
HDFS-17659. [ARR]Router Quota supports asynchronous rpc. (apache#7157)…
Browse files Browse the repository at this point in the history
…. Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
  • Loading branch information
hfutatzhanghb authored and Hexiaoqiao committed Feb 20, 2025
1 parent 060f6b8 commit 198288c
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;

import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;

public class AsyncQuota extends Quota {

/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;
private final Router router;

public AsyncQuota(Router router, RouterRpcServer server) {
super(router, server);
this.router = router;
this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient();
}

/**
* Async get aggregated quota usage for the federation path.
* @param path Federation path.
* @return Aggregated quota.
* @throws IOException If the quota system is disabled.
*/
public QuotaUsage getQuotaUsage(String path) throws IOException {
getEachQuotaUsage(path);

asyncApply(o -> {
Map<RemoteLocation, QuotaUsage> results = (Map<RemoteLocation, QuotaUsage>) o;
try {
return aggregateQuota(path, results);
} catch (IOException e) {
throw new CompletionException(e);
}
});
return asyncReturn(QuotaUsage.class);
}

/**
* Get quota usage for the federation path.
* @param path Federation path.
* @return quota usage for each remote location.
* @throws IOException If the quota system is disabled.
*/
Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
if (!router.isQuotaEnabled()) {
throw new IOException("The quota system is disabled in Router.");
}

final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path);
RemoteMethod method = new RemoteMethod("getQuotaUsage",
new Class<?>[] {String.class}, new RemoteParam());
rpcClient.invokeConcurrent(
quotaLocs, method, true, false, QuotaUsage.class);
return asyncReturn(Map.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ private boolean isMountEntry(String path) {
* method will do some additional filtering.
* @param path Federation path.
* @return List of valid quota remote locations.
* @throws IOException
* @throws IOException If the location for this path cannot be determined.
*/
private List<RemoteLocation> getValidQuotaLocations(String path)
protected List<RemoteLocation> getValidQuotaLocations(String path)
throws IOException {
final List<RemoteLocation> locations = getQuotaRemoteLocations(path);

Expand Down Expand Up @@ -359,7 +359,7 @@ public static boolean andByStorageType(Predicate<StorageType> predicate) {
* federation path.
* @param path Federation path.
* @return List of quota remote locations.
* @throws IOException
* @throws IOException If the location for this path cannot be determined.
*/
private List<RemoteLocation> getQuotaRemoteLocations(String path)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;

/**
* Service to periodically update the {@link RouterQuotaUsage}
* cached information in the {@link Router}.
Expand Down Expand Up @@ -99,6 +101,9 @@ protected void periodicInvoke() {
// This is because mount table does not have mtime.
// For other mount entry get current quota usage
HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
if (rpcServer.isAsync()) {
ret = syncReturn(HdfsFileStatus.class);
}
if (ret == null || ret.getModificationTime() == 0) {
long[] zeroConsume = new long[StorageType.values().length];
currentQuotaUsage =
Expand All @@ -113,6 +118,9 @@ protected void periodicInvoke() {
Quota quotaModule = this.rpcServer.getQuotaModule();
Map<RemoteLocation, QuotaUsage> usageMap =
quotaModule.getEachQuotaUsage(src);
if (this.rpcServer.isAsync()) {
usageMap = (Map<RemoteLocation, QuotaUsage>)syncReturn(Map.class);
}
currentQuotaUsage = quotaModule.aggregateQuota(src, usageMap);
remoteQuotaUsage.putAll(usageMap);
} catch (IOException ioe) {
Expand All @@ -136,6 +144,8 @@ protected void periodicInvoke() {
}
} catch (IOException e) {
LOG.error("Quota cache updated error.", e);
} catch (Exception e) {
LOG.error(e.toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.ipc.CallerContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
import static org.junit.Assert.assertTrue;

public class TestRouterAsyncQuota {
private static Configuration routerConf;
/** Federated HDFS cluster. */
private static MiniRouterDFSCluster cluster;
private static String ns0;

/** Random Router for this federated cluster. */
private MiniRouterDFSCluster.RouterContext router;
private FileSystem routerFs;
private RouterRpcServer routerRpcServer;
private AsyncQuota asyncQuota;

private final String testfilePath = "/testdir/testAsyncQuota.file";

@BeforeClass
public static void setUpCluster() throws Exception {
cluster = new MiniRouterDFSCluster(true, 1, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
cluster.setNumDatanodesPerNameservice(3);
cluster.setRacks(
new String[] {"/rack1", "/rack2", "/rack3"});
cluster.startCluster();

// Making one Namenode active per nameservice
if (cluster.isHighAvailability()) {
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
cluster.switchToStandby(ns, NAMENODES[1]);
}
}
// Start routers with only an RPC service
routerConf = new RouterConfigBuilder()
.rpc()
.quota(true)
.build();

// Reduce the number of RPC clients threads to overload the Router easy
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
// We decrease the DN cache times to make the test faster
routerConf.setTimeDuration(
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
routerConf.setBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY, true);
cluster.addRouterOverrides(routerConf);
// Start routers with only an RPC service
cluster.startRouters();

// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
cluster.waitActiveNamespaces();
ns0 = cluster.getNameservices().get(0);
}

@AfterClass
public static void shutdownCluster() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}

@Before
public void setUp() throws IOException {
router = cluster.getRandomRouter();
routerFs = router.getFileSystem();
routerRpcServer = router.getRouterRpcServer();
routerRpcServer.initAsyncThreadPool();
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
routerRpcServer.getRPCMonitor(),
routerRpcServer.getRouterStateIdContext());
RouterRpcServer spy = Mockito.spy(routerRpcServer);
Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
asyncQuota = new AsyncQuota(router.getRouter(), spy);

// Create mock locations
MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
resolver.addLocation("/", ns0, "/");
FsPermission permission = new FsPermission("705");
routerFs.mkdirs(new Path("/testdir"), permission);
FSDataOutputStream fsDataOutputStream = routerFs.create(
new Path(testfilePath), true);
fsDataOutputStream.write(new byte[1024]);
fsDataOutputStream.close();
}

@After
public void tearDown() throws IOException {
// clear client context
CallerContext.setCurrent(null);
boolean delete = routerFs.delete(new Path("/testdir"));
assertTrue(delete);
if (routerFs != null) {
routerFs.close();
}
}

@Test
public void testRouterAsyncGetQuotaUsage() throws Exception {
asyncQuota.getQuotaUsage("/testdir");
QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
// 3-replication.
Assert.assertEquals(3 * 1024, quotaUsage.getSpaceConsumed());
// We have one directory and one file.
Assert.assertEquals(2, quotaUsage.getFileAndDirectoryCount());
}

@Test
public void testRouterAsyncSetQuotaUsage() throws Exception {
asyncQuota.setQuota("/testdir", Long.MAX_VALUE, 8096, StorageType.DISK, false);
syncReturn(void.class);
asyncQuota.getQuotaUsage("/testdir");
QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
Assert.assertEquals(8096, quotaUsage.getTypeQuota(StorageType.DISK));
}
}

0 comments on commit 198288c

Please sign in to comment.