Skip to content

Commit 7ab88fe

Browse files
HDFS-17744. [ARR] getEnclosingRoot RPC adapts to async rpc. (#7445). Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <keepromise@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
1 parent 972b6e0 commit 7ab88fe

File tree

2 files changed

+196
-0
lines changed

2 files changed

+196
-0
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
3131
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
3232
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
33+
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
3334
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
3435
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
3536
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -43,6 +44,7 @@
4344
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
4445
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
4546
import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
47+
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
4648
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
4749
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
4850
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
@@ -104,6 +106,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
104106
private final boolean allowPartialList;
105107
/** Time out when getting the mount statistics. */
106108
private long mountStatusTimeOut;
109+
/** Default nameservice enabled. */
110+
private final boolean defaultNameServiceEnabled;
107111
/** Identifier for the super user. */
108112
private String superUser;
109113
/** Identifier for the super group. */
@@ -126,6 +130,9 @@ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer)
126130
this.mountStatusTimeOut = getMountStatusTimeOut();
127131
this.superUser = getSuperUser();
128132
this.superGroup = getSuperGroup();
133+
this.defaultNameServiceEnabled = conf.getBoolean(
134+
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
135+
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
129136
}
130137

131138
@Override
@@ -1086,4 +1093,35 @@ public boolean isMultiDestDirectory(String src) {
10861093

10871094
return asyncReturn(boolean.class);
10881095
}
1096+
1097+
@Override
1098+
public Path getEnclosingRoot(String src) throws IOException {
1099+
final Path[] mountPath = new Path[1];
1100+
if (defaultNameServiceEnabled) {
1101+
mountPath[0] = new Path("/");
1102+
}
1103+
1104+
if (subclusterResolver instanceof MountTableResolver) {
1105+
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
1106+
if (mountTable.getMountPoint(src) != null) {
1107+
mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath());
1108+
}
1109+
}
1110+
1111+
if (mountPath[0] == null) {
1112+
throw new IOException(String.format("No mount point for %s", src));
1113+
}
1114+
1115+
getEZForPath(src);
1116+
asyncApply((ApplyFunction<EncryptionZone, Path>)zone -> {
1117+
if (zone == null) {
1118+
return mountPath[0];
1119+
} else {
1120+
Path zonePath = new Path(zone.getPath());
1121+
return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0];
1122+
}
1123+
});
1124+
return asyncReturn(Path.class);
1125+
}
1126+
10891127
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.FileSystem;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
24+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
25+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
26+
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
27+
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
28+
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
29+
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
30+
import org.apache.hadoop.hdfs.server.federation.router.Router;
31+
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
32+
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
33+
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
34+
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
35+
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
36+
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
37+
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
38+
import org.apache.hadoop.util.Time;
39+
import org.junit.After;
40+
import org.junit.AfterClass;
41+
import org.junit.BeforeClass;
42+
import org.junit.Test;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
import java.io.IOException;
47+
import java.util.Collections;
48+
49+
import static org.junit.Assert.assertEquals;
50+
import static org.junit.Assert.assertTrue;
51+
52+
/**
53+
* Test a router end-to-end including the MountTable using async rpc.
54+
*/
55+
public class TestRouterAsyncMountTable {
56+
public static final Logger LOG = LoggerFactory.getLogger(TestRouterAsyncMountTable.class);
57+
58+
private static StateStoreDFSCluster cluster;
59+
private static MiniRouterDFSCluster.NamenodeContext nnContext0;
60+
private static MiniRouterDFSCluster.NamenodeContext nnContext1;
61+
private static MiniRouterDFSCluster.RouterContext routerContext;
62+
private static MountTableResolver mountTable;
63+
private static FileSystem routerFs;
64+
65+
@BeforeClass
66+
public static void globalSetUp() throws Exception {
67+
// Build and start a federated cluster.
68+
cluster = new StateStoreDFSCluster(false, 2);
69+
Configuration conf = new RouterConfigBuilder()
70+
.stateStore()
71+
.admin()
72+
.rpc()
73+
.build();
74+
conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20);
75+
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
76+
cluster.addRouterOverrides(conf);
77+
cluster.startCluster();
78+
cluster.startRouters();
79+
cluster.waitClusterUp();
80+
81+
// Get the end points.
82+
nnContext0 = cluster.getNamenode("ns0", null);
83+
nnContext1 = cluster.getNamenode("ns1", null);
84+
routerContext = cluster.getRandomRouter();
85+
routerFs = routerContext.getFileSystem();
86+
Router router = routerContext.getRouter();
87+
mountTable = (MountTableResolver) router.getSubclusterResolver();
88+
}
89+
90+
@AfterClass
91+
public static void tearDown() {
92+
if (cluster != null) {
93+
cluster.stopRouter(routerContext);
94+
cluster.shutdown();
95+
cluster = null;
96+
}
97+
}
98+
99+
@After
100+
public void clearMountTable() throws IOException {
101+
RouterClient client = routerContext.getAdminClient();
102+
MountTableManager mountTableManager = client.getMountTableManager();
103+
GetMountTableEntriesRequest req1 =
104+
GetMountTableEntriesRequest.newInstance("/");
105+
GetMountTableEntriesResponse response =
106+
mountTableManager.getMountTableEntries(req1);
107+
for (MountTable entry : response.getEntries()) {
108+
RemoveMountTableEntryRequest req2 =
109+
RemoveMountTableEntryRequest.newInstance(entry.getSourcePath());
110+
mountTableManager.removeMountTableEntry(req2);
111+
}
112+
mountTable.setDefaultNSEnable(true);
113+
}
114+
115+
/**
116+
* Add a mount table entry to the mount table through the admin API.
117+
* @param entry Mount table entry to add.
118+
* @return If it was succesfully added.
119+
* @throws IOException Problems adding entries.
120+
*/
121+
private boolean addMountTable(final MountTable entry) throws IOException {
122+
RouterClient client = routerContext.getAdminClient();
123+
MountTableManager mountTableManager = client.getMountTableManager();
124+
AddMountTableEntryRequest addRequest =
125+
AddMountTableEntryRequest.newInstance(entry);
126+
AddMountTableEntryResponse addResponse =
127+
mountTableManager.addMountTableEntry(addRequest);
128+
129+
// Reload the Router cache.
130+
mountTable.loadCache(true);
131+
132+
return addResponse.getStatus();
133+
}
134+
135+
@Test
136+
public void testGetEnclosingRoot() throws Exception {
137+
138+
// Add a read only entry.
139+
MountTable readOnlyEntry = MountTable.newInstance(
140+
"/readonly", Collections.singletonMap("ns0", "/testdir"));
141+
readOnlyEntry.setReadOnly(true);
142+
assertTrue(addMountTable(readOnlyEntry));
143+
assertEquals(routerFs.getEnclosingRoot(new Path("/readonly")), new Path("/readonly"));
144+
145+
assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/"));
146+
assertEquals(routerFs.getEnclosingRoot(new Path("/regular")),
147+
routerFs.getEnclosingRoot(routerFs.getEnclosingRoot(new Path("/regular"))));
148+
149+
// Add a regular entry.
150+
MountTable regularEntry = MountTable.newInstance(
151+
"/regular", Collections.singletonMap("ns0", "/testdir"));
152+
assertTrue(addMountTable(regularEntry));
153+
assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/regular"));
154+
155+
// Path does not need to exist.
156+
assertEquals(routerFs.getEnclosingRoot(new Path("/regular/pathDNE")), new Path("/regular"));
157+
}
158+
}

0 commit comments

Comments
 (0)