Skip to content

Commit 5b3d751

Browse files
committed
RouterAsyncClientProtocol
1 parent 70cac02 commit 5b3d751

File tree

3 files changed

+387
-412
lines changed

3 files changed

+387
-412
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@
8585
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
8686
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
8787
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
88+
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
89+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin;
90+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot;
91+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy;
8892
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
8993
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
9094
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -194,10 +198,17 @@ protected RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
194198
this.superGroup = conf.get(
195199
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
196200
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
197-
this.erasureCoding = new ErasureCoding(rpcServer);
198-
this.storagePolicy = new RouterStoragePolicy(rpcServer);
199-
this.snapshotProto = new RouterSnapshot(rpcServer);
200-
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
201+
if (rpcServer.isAsync()) {
202+
this.erasureCoding = new AsyncErasureCoding(rpcServer);
203+
this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer);
204+
this.snapshotProto = new RouterAsyncSnapshot(rpcServer);
205+
this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer);
206+
} else {
207+
this.erasureCoding = new ErasureCoding(rpcServer);
208+
this.storagePolicy = new RouterStoragePolicy(rpcServer);
209+
this.snapshotProto = new RouterSnapshot(rpcServer);
210+
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
211+
}
201212
this.securityManager = rpcServer.getRouterSecurityManager();
202213
this.rbfRename = new RouterFederationRename(rpcServer, conf);
203214
this.defaultNameServiceEnabled = conf.getBoolean(
@@ -831,6 +842,10 @@ public int compare(byte[] o1, byte[] o2) {
831842
private static GetListingComparator comparator =
832843
new GetListingComparator();
833844

845+
public static GetListingComparator getComparator() {
846+
return comparator;
847+
}
848+
834849
@Override
835850
public DirectoryListing getListing(String src, byte[] startAfter,
836851
boolean needLocation) throws IOException {
@@ -1338,9 +1353,9 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio
13381353
* Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}.
13391354
* For example, there are some mount points:
13401355
* <p>
1341-
* /a -&gt ns0 -&gt /a
1342-
* /a/b -&gt ns0 -&gt /a/b
1343-
* /a/b/c -&gt ns1 -&gt /a/b/c
1356+
* /a - &gt ns0 - &gt /a
1357+
* /a/b - &gt ns0 - &gt /a/b
1358+
* /a/b/c - &gt ns1 - &gt /a/b/c
13441359
* </p>
13451360
* When the path is '/a', the result of locations should be
13461361
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
@@ -2042,7 +2057,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
20422057
* replacement value.
20432058
* @throws IOException If the dst paths could not be determined.
20442059
*/
2045-
private RemoteParam getRenameDestinations(
2060+
protected RemoteParam getRenameDestinations(
20462061
final List<RemoteLocation> srcLocations,
20472062
final List<RemoteLocation> dstLocations) throws IOException {
20482063

@@ -2210,7 +2225,7 @@ protected static FsPermission getParentPermission(final FsPermission mask) {
22102225
* @return New HDFS file status representing a mount point.
22112226
*/
22122227
@VisibleForTesting
2213-
HdfsFileStatus getMountPointStatus(
2228+
protected HdfsFileStatus getMountPointStatus(
22142229
String name, int childrenNum, long date) {
22152230
return getMountPointStatus(name, childrenNum, date, true);
22162231
}
@@ -2433,7 +2448,7 @@ protected static boolean shouldAddMountPoint(
24332448
* @throws IOException if unable to get the file status.
24342449
*/
24352450
@VisibleForTesting
2436-
boolean isMultiDestDirectory(String src) throws IOException {
2451+
protected boolean isMultiDestDirectory(String src) throws IOException {
24372452
try {
24382453
if (rpcServer.isPathAll(src)) {
24392454
List<RemoteLocation> locations;
@@ -2502,11 +2517,11 @@ public RouterStoragePolicy getStoragePolicy() {
25022517
return storagePolicy;
25032518
}
25042519

2505-
public void setServerDefaults(FsServerDefaults serverDefaults) {
2506-
this.serverDefaults = serverDefaults;
2507-
}
2508-
25092520
public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
25102521
this.serverDefaultsLastUpdate = serverDefaultsLastUpdate;
25112522
}
2523+
2524+
public RouterFederationRename getRbfRename() {
2525+
return rbfRename;
2526+
}
25122527
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) {
9393
* @throws IOException if rename fails.
9494
* @return true if rename succeeds.
9595
*/
96-
boolean routerFedRename(final String src, final String dst,
97-
final List<RemoteLocation> srcLocations,
98-
final List<RemoteLocation> dstLocations) throws IOException {
96+
public boolean routerFedRename(final String src, final String dst,
97+
final List<RemoteLocation> srcLocations,
98+
final List<RemoteLocation> dstLocations) throws IOException {
9999
if (!rpcServer.isEnableRenameAcrossNamespace()) {
100100
throw new IOException("Rename of " + src + " to " + dst
101101
+ " is not allowed, no eligible destination in the same namespace was"

0 commit comments

Comments
 (0)