8585import org .apache .hadoop .hdfs .server .federation .resolver .MountTableResolver ;
8686import org .apache .hadoop .hdfs .server .federation .resolver .RemoteLocation ;
8787import org .apache .hadoop .hdfs .server .federation .resolver .RouterResolveException ;
88+ import org .apache .hadoop .hdfs .server .federation .router .async .AsyncErasureCoding ;
89+ import org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncCacheAdmin ;
90+ import org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncSnapshot ;
91+ import org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncStoragePolicy ;
8892import org .apache .hadoop .hdfs .server .federation .router .security .RouterSecurityManager ;
8993import org .apache .hadoop .hdfs .server .federation .store .records .MountTable ;
9094import org .apache .hadoop .hdfs .server .namenode .NameNode ;
@@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol {
166170 /** Router security manager to handle token operations. */
167171 private RouterSecurityManager securityManager = null ;
168172
169- RouterClientProtocol (Configuration conf , RouterRpcServer rpcServer ) {
173+ public RouterClientProtocol (Configuration conf , RouterRpcServer rpcServer ) {
170174 this .rpcServer = rpcServer ;
171175 this .rpcClient = rpcServer .getRPCClient ();
172176 this .subclusterResolver = rpcServer .getSubclusterResolver ();
@@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol {
194198 this .superGroup = conf .get (
195199 DFSConfigKeys .DFS_PERMISSIONS_SUPERUSERGROUP_KEY ,
196200 DFSConfigKeys .DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT );
197- this .erasureCoding = new ErasureCoding (rpcServer );
198- this .storagePolicy = new RouterStoragePolicy (rpcServer );
199- this .snapshotProto = new RouterSnapshot (rpcServer );
200- this .routerCacheAdmin = new RouterCacheAdmin (rpcServer );
201+ if (rpcServer .isAsync ()) {
202+ this .erasureCoding = new AsyncErasureCoding (rpcServer );
203+ this .storagePolicy = new RouterAsyncStoragePolicy (rpcServer );
204+ this .snapshotProto = new RouterAsyncSnapshot (rpcServer );
205+ this .routerCacheAdmin = new RouterAsyncCacheAdmin (rpcServer );
206+ } else {
207+ this .erasureCoding = new ErasureCoding (rpcServer );
208+ this .storagePolicy = new RouterStoragePolicy (rpcServer );
209+ this .snapshotProto = new RouterSnapshot (rpcServer );
210+ this .routerCacheAdmin = new RouterCacheAdmin (rpcServer );
211+ }
201212 this .securityManager = rpcServer .getRouterSecurityManager ();
202213 this .rbfRename = new RouterFederationRename (rpcServer , conf );
203214 this .defaultNameServiceEnabled = conf .getBoolean (
@@ -347,7 +358,7 @@ protected static boolean isUnavailableSubclusterException(
347358 * @throws IOException If this path is not fault tolerant or the exception
348359 * should not be retried (e.g., NSQuotaExceededException).
349360 */
350- private List <RemoteLocation > checkFaultTolerantRetry (
361+ protected List <RemoteLocation > checkFaultTolerantRetry (
351362 final RemoteMethod method , final String src , final IOException ioe ,
352363 final RemoteLocation excludeLoc , final List <RemoteLocation > locations )
353364 throws IOException {
@@ -820,7 +831,7 @@ public void renewLease(String clientName, List<String> namespaces)
820831 /**
821832 * For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results.
822833 */
823- private static class GetListingComparator
834+ protected static class GetListingComparator
824835 implements Comparator <byte []>, Serializable {
825836 @ Override
826837 public int compare (byte [] o1 , byte [] o2 ) {
@@ -831,6 +842,10 @@ public int compare(byte[] o1, byte[] o2) {
831842 private static GetListingComparator comparator =
832843 new GetListingComparator ();
833844
845+ public static GetListingComparator getComparator () {
846+ return comparator ;
847+ }
848+
834849 @ Override
835850 public DirectoryListing getListing (String src , byte [] startAfter ,
836851 boolean needLocation ) throws IOException {
@@ -1104,7 +1119,7 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
11041119 return mergeDtanodeStorageReport (dnSubcluster );
11051120 }
11061121
1107- private DatanodeStorageReport [] mergeDtanodeStorageReport (
1122+ protected DatanodeStorageReport [] mergeDtanodeStorageReport (
11081123 Map <String , DatanodeStorageReport []> dnSubcluster ) {
11091124 // Avoid repeating machines in multiple subclusters
11101125 Map <String , DatanodeStorageReport > datanodesMap = new LinkedHashMap <>();
@@ -1335,20 +1350,23 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio
13351350 }
13361351
13371352 /**
1338- * Get all the locations of the path for {@link this #getContentSummary(String)}.
1353+ * Get all the locations of the path for {@link RouterClientProtocol #getContentSummary(String)}.
13391354 * For example, there are some mount points:
1340- * /a -> ns0 -> /a
1341- * /a/b -> ns0 -> /a/b
1342- * /a/b/c -> ns1 -> /a/b/c
1355+ * <p>
1356+ * /a - [ns0 - /a]
1357+ * /a/b - [ns0 - /a/b]
1358+ * /a/b/c - [ns1 - /a/b/c]
1359+ * </p>
13431360 * When the path is '/a', the result of locations should be
13441361 * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
13451362 * When the path is '/b', will throw NoLocationException.
1363+ *
13461364 * @param path the path to get content summary
13471365 * @return one list contains all the remote location
1348- * @throws IOException
1366+ * @throws IOException if an I/O error occurs
13491367 */
13501368 @ VisibleForTesting
1351- List <RemoteLocation > getLocationsForContentSummary (String path ) throws IOException {
1369+ protected List <RemoteLocation > getLocationsForContentSummary (String path ) throws IOException {
13521370 // Try to get all the locations of the path.
13531371 final Map <String , List <RemoteLocation >> ns2Locations = getAllLocations (path );
13541372 if (ns2Locations .isEmpty ()) {
@@ -2039,7 +2057,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
20392057 * replacement value.
20402058 * @throws IOException If the dst paths could not be determined.
20412059 */
2042- private RemoteParam getRenameDestinations (
2060+ protected RemoteParam getRenameDestinations (
20432061 final List <RemoteLocation > srcLocations ,
20442062 final List <RemoteLocation > dstLocations ) throws IOException {
20452063
@@ -2087,7 +2105,7 @@ private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
20872105 * @param summaries Collection of individual summaries.
20882106 * @return Aggregated content summary.
20892107 */
2090- private ContentSummary aggregateContentSummary (
2108+ protected ContentSummary aggregateContentSummary (
20912109 Collection <ContentSummary > summaries ) {
20922110 if (summaries .size () == 1 ) {
20932111 return summaries .iterator ().next ();
@@ -2142,7 +2160,7 @@ private ContentSummary aggregateContentSummary(
21422160 * everywhere.
21432161 * @throws IOException If all the locations throw an exception.
21442162 */
2145- private HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
2163+ protected HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
21462164 final RemoteMethod method ) throws IOException {
21472165 return getFileInfoAll (locations , method , -1 );
21482166 }
@@ -2157,7 +2175,7 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21572175 * everywhere.
21582176 * @throws IOException If all the locations throw an exception.
21592177 */
2160- private HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
2178+ protected HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
21612179 final RemoteMethod method , long timeOutMs ) throws IOException {
21622180
21632181 // Get the file info from everybody
@@ -2186,12 +2204,11 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21862204
21872205 /**
21882206 * Get the permissions for the parent of a child with given permissions.
2189- * Add implicit u+wx permission for parent. This is based on
2190- * @{FSDirMkdirOp#addImplicitUwx}.
2207+ * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
21912208 * @param mask The permission mask of the child.
21922209 * @return The permission mask of the parent.
21932210 */
2194- private static FsPermission getParentPermission (final FsPermission mask ) {
2211+ protected static FsPermission getParentPermission (final FsPermission mask ) {
21952212 FsPermission ret = new FsPermission (
21962213 mask .getUserAction ().or (FsAction .WRITE_EXECUTE ),
21972214 mask .getGroupAction (),
@@ -2208,7 +2225,7 @@ private static FsPermission getParentPermission(final FsPermission mask) {
22082225 * @return New HDFS file status representing a mount point.
22092226 */
22102227 @ VisibleForTesting
2211- HdfsFileStatus getMountPointStatus (
2228+ protected HdfsFileStatus getMountPointStatus (
22122229 String name , int childrenNum , long date ) {
22132230 return getMountPointStatus (name , childrenNum , date , true );
22142231 }
@@ -2223,7 +2240,7 @@ HdfsFileStatus getMountPointStatus(
22232240 * @return New HDFS file status representing a mount point.
22242241 */
22252242 @ VisibleForTesting
2226- HdfsFileStatus getMountPointStatus (
2243+ protected HdfsFileStatus getMountPointStatus (
22272244 String name , int childrenNum , long date , boolean setPath ) {
22282245 long modTime = date ;
22292246 long accessTime = date ;
@@ -2300,7 +2317,7 @@ HdfsFileStatus getMountPointStatus(
23002317 * @param path Name of the path to start checking dates from.
23012318 * @return Map with the modification dates for all sub-entries.
23022319 */
2303- private Map <String , Long > getMountPointDates (String path ) {
2320+ protected Map <String , Long > getMountPointDates (String path ) {
23042321 Map <String , Long > ret = new TreeMap <>();
23052322 if (subclusterResolver instanceof MountTableResolver ) {
23062323 try {
@@ -2361,9 +2378,15 @@ private long getModifiedTime(Map<String, Long> ret, String path,
23612378 }
23622379
23632380 /**
2364- * Get listing on remote locations.
2381+ * Get a partial listing of the indicated directory.
2382+ *
2383+ * @param src the directory name
2384+ * @param startAfter the name to start after
2385+ * @param needLocation if blockLocations need to be returned
2386+ * @return a partial listing starting after startAfter
2387+ * @throws IOException if other I/O error occurred
23652388 */
2366- private List <RemoteResult <RemoteLocation , DirectoryListing >> getListingInt (
2389+ protected List <RemoteResult <RemoteLocation , DirectoryListing >> getListingInt (
23672390 String src , byte [] startAfter , boolean needLocation ) throws IOException {
23682391 try {
23692392 List <RemoteLocation > locations =
@@ -2400,9 +2423,9 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
24002423 * @param startAfter starting listing from client, used to define listing
24012424 * start boundary
24022425 * @param remainingEntries how many entries left from subcluster
2403- * @return
2426+ * @return true if should add mount point, otherwise false;
24042427 */
2405- private static boolean shouldAddMountPoint (
2428+ protected static boolean shouldAddMountPoint (
24062429 byte [] mountPoint , byte [] lastEntry , byte [] startAfter ,
24072430 int remainingEntries ) {
24082431 if (comparator .compare (mountPoint , startAfter ) > 0 &&
@@ -2425,7 +2448,7 @@ private static boolean shouldAddMountPoint(
24252448 * @throws IOException if unable to get the file status.
24262449 */
24272450 @ VisibleForTesting
2428- boolean isMultiDestDirectory (String src ) throws IOException {
2451+ protected boolean isMultiDestDirectory (String src ) throws IOException {
24292452 try {
24302453 if (rpcServer .isPathAll (src )) {
24312454 List <RemoteLocation > locations ;
@@ -2449,4 +2472,56 @@ boolean isMultiDestDirectory(String src) throws IOException {
24492472 public int getRouterFederationRenameCount () {
24502473 return rbfRename .getRouterFederationRenameCount ();
24512474 }
2475+
2476+ public RouterRpcServer getRpcServer () {
2477+ return rpcServer ;
2478+ }
2479+
2480+ public RouterRpcClient getRpcClient () {
2481+ return rpcClient ;
2482+ }
2483+
2484+ public FileSubclusterResolver getSubclusterResolver () {
2485+ return subclusterResolver ;
2486+ }
2487+
2488+ public ActiveNamenodeResolver getNamenodeResolver () {
2489+ return namenodeResolver ;
2490+ }
2491+
2492+ public long getServerDefaultsLastUpdate () {
2493+ return serverDefaultsLastUpdate ;
2494+ }
2495+
2496+ public long getServerDefaultsValidityPeriod () {
2497+ return serverDefaultsValidityPeriod ;
2498+ }
2499+
2500+ public boolean isAllowPartialList () {
2501+ return allowPartialList ;
2502+ }
2503+
2504+ public long getMountStatusTimeOut () {
2505+ return mountStatusTimeOut ;
2506+ }
2507+
2508+ public String getSuperUser () {
2509+ return superUser ;
2510+ }
2511+
2512+ public String getSuperGroup () {
2513+ return superGroup ;
2514+ }
2515+
2516+ public RouterStoragePolicy getStoragePolicy () {
2517+ return storagePolicy ;
2518+ }
2519+
2520+ public void setServerDefaultsLastUpdate (long serverDefaultsLastUpdate ) {
2521+ this .serverDefaultsLastUpdate = serverDefaultsLastUpdate ;
2522+ }
2523+
2524+ public RouterFederationRename getRbfRename () {
2525+ return rbfRename ;
2526+ }
24522527}
0 commit comments