Skip to content

Commit 5567154

Browse files
authored
HDFS-16734. RBF: fix some bugs when handling getContentSummary RPC (#4763)
1 parent f8b9dd9 commit 5567154

File tree

2 files changed

+203
-20
lines changed

2 files changed

+203
-20
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 82 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
import java.util.Set;
116116
import java.util.TreeMap;
117117
import java.util.concurrent.TimeUnit;
118+
import java.util.stream.Collectors;
118119

119120
/**
120121
* Module that implements all the RPC calls in {@link ClientProtocol} in the
@@ -1251,14 +1252,93 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
12511252
rpcClient.invokeConcurrent(nss, method, true, false);
12521253
}
12531254

1255+
/**
1256+
* Recursively get all the locations for the path.
1257+
* For example, there are some mount points:
1258+
* /a -> ns0 -> /a
1259+
* /a/b -> ns1 -> /a/b
1260+
* /a/b/c -> ns2 -> /a/b/c
1261+
* When the path is '/a', the result of locations should be
1262+
* {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]}
1263+
* @param path the path to get the locations.
1264+
* @return a map to store all the locations and key is namespace id.
1265+
* @throws IOException
1266+
*/
1267+
@VisibleForTesting
1268+
Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOException {
1269+
Map<String, List<RemoteLocation>> locations = new HashMap<>();
1270+
try {
1271+
List<RemoteLocation> parentLocations = rpcServer.getLocationsForPath(path, false, false);
1272+
parentLocations.forEach(
1273+
l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l));
1274+
} catch (NoLocationException | RouterResolveException e) {
1275+
LOG.debug("Cannot find locations for {}.", path);
1276+
}
1277+
1278+
final List<String> children = subclusterResolver.getMountPoints(path);
1279+
if (children != null) {
1280+
for (String child : children) {
1281+
String childPath = new Path(path, child).toUri().getPath();
1282+
Map<String, List<RemoteLocation>> childLocations = getAllLocations(childPath);
1283+
childLocations.forEach(
1284+
(k, v) -> locations.computeIfAbsent(k, l -> new ArrayList<>()).addAll(v));
1285+
}
1286+
}
1287+
return locations;
1288+
}
1289+
1290+
/**
1291+
* Get all the locations of the path for {@link this#getContentSummary(String)}.
1292+
* For example, there are some mount points:
1293+
* /a -> ns0 -> /a
1294+
* /a/b -> ns0 -> /a/b
1295+
* /a/b/c -> ns1 -> /a/b/c
1296+
* When the path is '/a', the result of locations should be
1297+
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
1298+
* When the path is '/b', will throw NoLocationException.
1299+
* @param path the path to get content summary
1300+
* @return one list contains all the remote location
1301+
* @throws IOException
1302+
*/
1303+
@VisibleForTesting
1304+
List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
1305+
// Try to get all the locations of the path.
1306+
final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path);
1307+
if (ns2Locations.isEmpty()) {
1308+
throw new NoLocationException(path, subclusterResolver.getClass());
1309+
}
1310+
1311+
final List<RemoteLocation> locations = new ArrayList<>();
1312+
// remove the redundancy remoteLocation order by destination.
1313+
ns2Locations.forEach((k, v) -> {
1314+
List<RemoteLocation> sortedList = v.stream().sorted().collect(Collectors.toList());
1315+
int size = sortedList.size();
1316+
for (int i = size - 1; i > -1; i--) {
1317+
RemoteLocation currentLocation = sortedList.get(i);
1318+
if (i == 0) {
1319+
locations.add(currentLocation);
1320+
} else {
1321+
RemoteLocation preLocation = sortedList.get(i - 1);
1322+
if (!currentLocation.getDest().startsWith(preLocation.getDest() + Path.SEPARATOR)) {
1323+
locations.add(currentLocation);
1324+
} else {
1325+
LOG.debug("Ignore redundant location {}, because there is an ancestor location {}",
1326+
currentLocation, preLocation);
1327+
}
1328+
}
1329+
}
1330+
});
1331+
1332+
return locations;
1333+
}
1334+
12541335
@Override
12551336
public ContentSummary getContentSummary(String path) throws IOException {
12561337
rpcServer.checkOperation(NameNode.OperationCategory.READ);
12571338

12581339
// Get the summaries from regular files
12591340
final Collection<ContentSummary> summaries = new ArrayList<>();
1260-
final List<RemoteLocation> locations =
1261-
rpcServer.getLocationsForPath(path, false, false);
1341+
final List<RemoteLocation> locations = getLocationsForContentSummary(path);
12621342
final RemoteMethod method = new RemoteMethod("getContentSummary",
12631343
new Class<?>[] {String.class}, new RemoteParam());
12641344
final List<RemoteResult<RemoteLocation, ContentSummary>> results =
@@ -1278,24 +1358,6 @@ public ContentSummary getContentSummary(String path) throws IOException {
12781358
}
12791359
}
12801360

1281-
// Add mount points at this level in the tree
1282-
final List<String> children = subclusterResolver.getMountPoints(path);
1283-
if (children != null) {
1284-
for (String child : children) {
1285-
Path childPath = new Path(path, child);
1286-
try {
1287-
ContentSummary mountSummary = getContentSummary(
1288-
childPath.toString());
1289-
if (mountSummary != null) {
1290-
summaries.add(mountSummary);
1291-
}
1292-
} catch (Exception e) {
1293-
LOG.error("Cannot get content summary for mount {}: {}",
1294-
childPath, e.getMessage());
1295-
}
1296-
}
1297-
}
1298-
12991361
// Throw original exception if no original nor mount points
13001362
if (summaries.isEmpty() && notFoundException != null) {
13011363
throw notFoundException;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
package org.apache.hadoop.hdfs.server.federation.router;
1919

2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.ContentSummary;
22+
import org.apache.hadoop.fs.FSDataOutputStream;
23+
import org.apache.hadoop.fs.FileSystem;
24+
import org.apache.hadoop.fs.Path;
2125
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
2226
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
2327
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
2428
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
2529
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
2630
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
2731
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
32+
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
2833
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
2934
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
3035
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
@@ -40,10 +45,13 @@
4045

4146
import java.io.IOException;
4247
import java.util.Collections;
48+
import java.util.List;
49+
import java.util.Map;
4350

4451
import static org.junit.Assert.assertEquals;
4552
import static org.junit.Assert.assertNotNull;
4653
import static org.junit.Assert.assertTrue;
54+
import static org.junit.Assert.fail;
4755

4856
/**
4957
* Test a router end-to-end including the MountTable without default nameservice.
@@ -53,6 +61,8 @@ public class TestRouterMountTableWithoutDefaultNS {
5361
private static RouterContext routerContext;
5462
private static MountTableResolver mountTable;
5563
private static ClientProtocol routerProtocol;
64+
private static FileSystem nnFs0;
65+
private static FileSystem nnFs1;
5666

5767
@BeforeClass
5868
public static void globalSetUp() throws Exception {
@@ -71,6 +81,8 @@ public static void globalSetUp() throws Exception {
7181
cluster.waitClusterUp();
7282

7383
// Get the end points
84+
nnFs0 = cluster.getNamenode("ns0", null).getFileSystem();
85+
nnFs1 = cluster.getNamenode("ns1", null).getFileSystem();
7486
routerContext = cluster.getRandomRouter();
7587
Router router = routerContext.getRouter();
7688
routerProtocol = routerContext.getClient().getNamenode();
@@ -144,4 +156,113 @@ public void testGetFileInfoWithoutSubMountPoint() throws Exception {
144156
LambdaTestUtils.intercept(RouterResolveException.class,
145157
() -> routerContext.getRouter().getRpcServer().getFileInfo("/testdir2"));
146158
}
159+
160+
/**
161+
* Verify that RBF that disable default nameservice should support
162+
* get information about ancestor mount points.
163+
*/
164+
@Test
165+
public void testGetContentSummaryWithSubMountPoint() throws IOException {
166+
MountTable addEntry = MountTable.newInstance("/testdir/1/2",
167+
Collections.singletonMap("ns0", "/testdir/1/2"));
168+
assertTrue(addMountTable(addEntry));
169+
170+
try {
171+
writeData(nnFs0, new Path("/testdir/1/2/3"), 10 * 1024 * 1024);
172+
173+
RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer();
174+
ContentSummary summaryFromRBF = routerRpcServer.getContentSummary("/testdir");
175+
assertNotNull(summaryFromRBF);
176+
assertEquals(1, summaryFromRBF.getFileCount());
177+
assertEquals(10 * 1024 * 1024, summaryFromRBF.getLength());
178+
} finally {
179+
nnFs0.delete(new Path("/testdir"), true);
180+
}
181+
}
182+
183+
@Test
184+
public void testGetAllLocations() throws IOException {
185+
// Add mount table entry.
186+
MountTable addEntry = MountTable.newInstance("/testA",
187+
Collections.singletonMap("ns0", "/testA"));
188+
assertTrue(addMountTable(addEntry));
189+
addEntry = MountTable.newInstance("/testA/testB",
190+
Collections.singletonMap("ns1", "/testA/testB"));
191+
assertTrue(addMountTable(addEntry));
192+
addEntry = MountTable.newInstance("/testA/testB/testC",
193+
Collections.singletonMap("ns2", "/testA/testB/testC"));
194+
assertTrue(addMountTable(addEntry));
195+
196+
RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule();
197+
Map<String, List<RemoteLocation>> locations = protocol.getAllLocations("/testA");
198+
assertEquals(3, locations.size());
199+
}
200+
201+
@Test
202+
public void testGetLocationsForContentSummary() throws Exception {
203+
// Add mount table entry.
204+
MountTable addEntry = MountTable.newInstance("/testA/testB",
205+
Collections.singletonMap("ns0", "/testA/testB"));
206+
assertTrue(addMountTable(addEntry));
207+
addEntry = MountTable.newInstance("/testA/testB/testC",
208+
Collections.singletonMap("ns1", "/testA/testB/testC"));
209+
assertTrue(addMountTable(addEntry));
210+
211+
RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule();
212+
List<RemoteLocation> locations = protocol.getLocationsForContentSummary("/testA");
213+
assertEquals(2, locations.size());
214+
215+
for (RemoteLocation location : locations) {
216+
String nsId = location.getNameserviceId();
217+
if ("ns0".equals(nsId)) {
218+
assertEquals("/testA/testB", location.getDest());
219+
} else if ("ns1".equals(nsId)) {
220+
assertEquals("/testA/testB/testC", location.getDest());
221+
} else {
222+
fail("Unexpected NS " + nsId);
223+
}
224+
}
225+
226+
LambdaTestUtils.intercept(NoLocationException.class,
227+
() -> protocol.getLocationsForContentSummary("/testB"));
228+
}
229+
230+
@Test
231+
public void testGetContentSummary() throws Exception {
232+
try {
233+
// Add mount table entry.
234+
MountTable addEntry = MountTable.newInstance("/testA",
235+
Collections.singletonMap("ns0", "/testA"));
236+
assertTrue(addMountTable(addEntry));
237+
addEntry = MountTable.newInstance("/testA/testB",
238+
Collections.singletonMap("ns0", "/testA/testB"));
239+
assertTrue(addMountTable(addEntry));
240+
addEntry = MountTable.newInstance("/testA/testB/testC",
241+
Collections.singletonMap("ns1", "/testA/testB/testC"));
242+
assertTrue(addMountTable(addEntry));
243+
244+
writeData(nnFs0, new Path("/testA/testB/file1"), 1024 * 1024);
245+
writeData(nnFs1, new Path("/testA/testB/testC/file2"), 1024 * 1024);
246+
writeData(nnFs1, new Path("/testA/testB/testC/file3"), 1024 * 1024);
247+
248+
RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer();
249+
ContentSummary summary = routerRpcServer.getContentSummary("/testA");
250+
assertEquals(3, summary.getFileCount());
251+
assertEquals(1024 * 1024 * 3, summary.getLength());
252+
253+
LambdaTestUtils.intercept(NoLocationException.class,
254+
() -> routerRpcServer.getContentSummary("/testB"));
255+
} finally {
256+
nnFs0.delete(new Path("/testA"), true);
257+
nnFs1.delete(new Path("/testA"), true);
258+
}
259+
}
260+
261+
void writeData(FileSystem fs, Path path, int fileLength) throws IOException {
262+
try (FSDataOutputStream outputStream = fs.create(path)) {
263+
for (int writeSize = 0; writeSize < fileLength; writeSize++) {
264+
outputStream.write(writeSize);
265+
}
266+
}
267+
}
147268
}

0 commit comments

Comments
 (0)