Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy;
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
Expand Down Expand Up @@ -170,7 +166,7 @@ public class RouterClientProtocol implements ClientProtocol {
/** Router security manager to handle token operations. */
private RouterSecurityManager securityManager = null;

public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
this.rpcServer = rpcServer;
this.rpcClient = rpcServer.getRPCClient();
this.subclusterResolver = rpcServer.getSubclusterResolver();
Expand Down Expand Up @@ -198,17 +194,10 @@ public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
this.superGroup = conf.get(
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
if (rpcServer.isAsync()) {
this.erasureCoding = new AsyncErasureCoding(rpcServer);
this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer);
this.snapshotProto = new RouterAsyncSnapshot(rpcServer);
this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer);
} else {
this.erasureCoding = new ErasureCoding(rpcServer);
this.storagePolicy = new RouterStoragePolicy(rpcServer);
this.snapshotProto = new RouterSnapshot(rpcServer);
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
}
this.erasureCoding = new ErasureCoding(rpcServer);
this.storagePolicy = new RouterStoragePolicy(rpcServer);
this.snapshotProto = new RouterSnapshot(rpcServer);
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
this.securityManager = rpcServer.getRouterSecurityManager();
this.rbfRename = new RouterFederationRename(rpcServer, conf);
this.defaultNameServiceEnabled = conf.getBoolean(
Expand Down Expand Up @@ -358,7 +347,7 @@ protected static boolean isUnavailableSubclusterException(
* @throws IOException If this path is not fault tolerant or the exception
* should not be retried (e.g., NSQuotaExceededException).
*/
protected List<RemoteLocation> checkFaultTolerantRetry(
private List<RemoteLocation> checkFaultTolerantRetry(
final RemoteMethod method, final String src, final IOException ioe,
final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
throws IOException {
Expand Down Expand Up @@ -831,7 +820,7 @@ public void renewLease(String clientName, List<String> namespaces)
/**
* For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results.
*/
protected static class GetListingComparator
private static class GetListingComparator
implements Comparator<byte[]>, Serializable {
@Override
public int compare(byte[] o1, byte[] o2) {
Expand All @@ -842,10 +831,6 @@ public int compare(byte[] o1, byte[] o2) {
private static GetListingComparator comparator =
new GetListingComparator();

public static GetListingComparator getComparator() {
return comparator;
}

@Override
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws IOException {
Expand Down Expand Up @@ -1119,7 +1104,7 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
return mergeDtanodeStorageReport(dnSubcluster);
}

protected DatanodeStorageReport[] mergeDtanodeStorageReport(
private DatanodeStorageReport[] mergeDtanodeStorageReport(
Map<String, DatanodeStorageReport[]> dnSubcluster) {
// Avoid repeating machines in multiple subclusters
Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -1350,23 +1335,20 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio
}

/**
* Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}.
* Get all the locations of the path for {@link this#getContentSummary(String)}.
* For example, there are some mount points:
* <p>
* /a - [ns0 - /a]
* /a/b - [ns0 - /a/b]
* /a/b/c - [ns1 - /a/b/c]
* </p>
* /a -> ns0 -> /a
* /a/b -> ns0 -> /a/b
* /a/b/c -> ns1 -> /a/b/c
* When the path is '/a', the result of locations should be
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
* When the path is '/b', will throw NoLocationException.
*
* @param path the path to get content summary
* @return one list contains all the remote location
* @throws IOException if an I/O error occurs
* @throws IOException
*/
@VisibleForTesting
protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
// Try to get all the locations of the path.
final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path);
if (ns2Locations.isEmpty()) {
Expand Down Expand Up @@ -2057,7 +2039,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
* replacement value.
* @throws IOException If the dst paths could not be determined.
*/
protected RemoteParam getRenameDestinations(
private RemoteParam getRenameDestinations(
final List<RemoteLocation> srcLocations,
final List<RemoteLocation> dstLocations) throws IOException {

Expand Down Expand Up @@ -2105,7 +2087,7 @@ private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
* @param summaries Collection of individual summaries.
* @return Aggregated content summary.
*/
protected ContentSummary aggregateContentSummary(
private ContentSummary aggregateContentSummary(
Collection<ContentSummary> summaries) {
if (summaries.size() == 1) {
return summaries.iterator().next();
Expand Down Expand Up @@ -2160,7 +2142,7 @@ protected ContentSummary aggregateContentSummary(
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method) throws IOException {
return getFileInfoAll(locations, method, -1);
}
Expand All @@ -2175,7 +2157,7 @@ protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method, long timeOutMs) throws IOException {

// Get the file info from everybody
Expand Down Expand Up @@ -2204,11 +2186,12 @@ protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,

/**
* Get the permissions for the parent of a child with given permissions.
* Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
* Add implicit u+wx permission for parent. This is based on
* @{FSDirMkdirOp#addImplicitUwx}.
* @param mask The permission mask of the child.
* @return The permission mask of the parent.
*/
protected static FsPermission getParentPermission(final FsPermission mask) {
private static FsPermission getParentPermission(final FsPermission mask) {
FsPermission ret = new FsPermission(
mask.getUserAction().or(FsAction.WRITE_EXECUTE),
mask.getGroupAction(),
Expand All @@ -2225,7 +2208,7 @@ protected static FsPermission getParentPermission(final FsPermission mask) {
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date) {
return getMountPointStatus(name, childrenNum, date, true);
}
Expand All @@ -2240,7 +2223,7 @@ protected HdfsFileStatus getMountPointStatus(
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date, boolean setPath) {
long modTime = date;
long accessTime = date;
Expand Down Expand Up @@ -2317,7 +2300,7 @@ protected HdfsFileStatus getMountPointStatus(
* @param path Name of the path to start checking dates from.
* @return Map with the modification dates for all sub-entries.
*/
protected Map<String, Long> getMountPointDates(String path) {
private Map<String, Long> getMountPointDates(String path) {
Map<String, Long> ret = new TreeMap<>();
if (subclusterResolver instanceof MountTableResolver) {
try {
Expand Down Expand Up @@ -2378,15 +2361,9 @@ private long getModifiedTime(Map<String, Long> ret, String path,
}

/**
* Get a partial listing of the indicated directory.
*
* @param src the directory name
* @param startAfter the name to start after
* @param needLocation if blockLocations need to be returned
* @return a partial listing starting after startAfter
* @throws IOException if other I/O error occurred
* Get listing on remote locations.
*/
protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
String src, byte[] startAfter, boolean needLocation) throws IOException {
try {
List<RemoteLocation> locations =
Expand Down Expand Up @@ -2423,9 +2400,9 @@ protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
* @param startAfter starting listing from client, used to define listing
* start boundary
* @param remainingEntries how many entries left from subcluster
* @return true if should add mount point, otherwise false;
* @return
*/
protected static boolean shouldAddMountPoint(
private static boolean shouldAddMountPoint(
byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
int remainingEntries) {
if (comparator.compare(mountPoint, startAfter) > 0 &&
Expand All @@ -2448,7 +2425,7 @@ protected static boolean shouldAddMountPoint(
* @throws IOException if unable to get the file status.
*/
@VisibleForTesting
protected boolean isMultiDestDirectory(String src) throws IOException {
boolean isMultiDestDirectory(String src) throws IOException {
try {
if (rpcServer.isPathAll(src)) {
List<RemoteLocation> locations;
Expand All @@ -2472,56 +2449,4 @@ protected boolean isMultiDestDirectory(String src) throws IOException {
public int getRouterFederationRenameCount() {
return rbfRename.getRouterFederationRenameCount();
}

public RouterRpcServer getRpcServer() {
return rpcServer;
}

public RouterRpcClient getRpcClient() {
return rpcClient;
}

public FileSubclusterResolver getSubclusterResolver() {
return subclusterResolver;
}

public ActiveNamenodeResolver getNamenodeResolver() {
return namenodeResolver;
}

public long getServerDefaultsLastUpdate() {
return serverDefaultsLastUpdate;
}

public long getServerDefaultsValidityPeriod() {
return serverDefaultsValidityPeriod;
}

public boolean isAllowPartialList() {
return allowPartialList;
}

public long getMountStatusTimeOut() {
return mountStatusTimeOut;
}

public String getSuperUser() {
return superUser;
}

public String getSuperGroup() {
return superGroup;
}

public RouterStoragePolicy getStoragePolicy() {
return storagePolicy;
}

public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
this.serverDefaultsLastUpdate = serverDefaultsLastUpdate;
}

public RouterFederationRename getRbfRename() {
return rbfRename;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) {
* @throws IOException if rename fails.
* @return true if rename succeeds.
*/
public boolean routerFedRename(final String src, final String dst,
boolean routerFedRename(final String src, final String dst,
final List<RemoteLocation> srcLocations,
final List<RemoteLocation> dstLocations) throws IOException {
if (!rpcServer.isEnableRenameAcrossNamespace()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1969,7 +1969,7 @@ protected boolean isObserverReadEligible(String nsId, Method method) {
* @param nsId namespaceID
* @return whether the 'namespace' has observer reads enabled.
*/
public boolean isNamespaceObserverReadEligible(String nsId) {
boolean isNamespaceObserverReadEligible(String nsId) {
return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
Expand Down Expand Up @@ -292,7 +288,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
* @param fileResolver File resolver to resolve file paths to subclusters.
* @throws IOException If the RPC server could not be created.
*/
@SuppressWarnings("checkstyle:MethodLength")
public RouterRpcServer(Configuration conf, Router router,
ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
throws IOException {
Expand Down Expand Up @@ -429,19 +424,14 @@ public RouterRpcServer(Configuration conf, Router router,
if (this.enableAsync) {
this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
this.clientProto = new RouterAsyncClientProtocol(conf, this);
this.nnProto = new RouterAsyncNamenodeProtocol(this);
this.routerProto = new RouterAsyncUserProtocol(this);
this.quotaCall = new AsyncQuota(this.router, this);
} else {
this.rpcClient = new RouterRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
this.clientProto = new RouterClientProtocol(conf, this);
this.nnProto = new RouterNamenodeProtocol(this);
this.routerProto = new RouterUserProtocol(this);
this.quotaCall = new Quota(this.router, this);
}

this.nnProto = new RouterNamenodeProtocol(this);
this.quotaCall = new Quota(this.router, this);
this.clientProto = new RouterClientProtocol(conf, this);
this.routerProto = new RouterUserProtocol(this);
long dnCacheExpire = conf.getTimeDuration(
DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -2203,7 +2193,7 @@ public FederationRPCMetrics getRPCMetrics() {
* @param path Path to check.
* @return If a path should be in all subclusters.
*/
public boolean isPathAll(final String path) {
boolean isPathAll(final String path) {
MountTable entry = getMountTable(path);
return entry != null && entry.isAll();
}
Expand Down
Loading
Loading