-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract path resolution in DoraCacheFS into static utility #18140
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,7 @@ | |
import alluxio.wire.WorkerNetAddress; | ||
|
||
import com.codahale.metrics.Counter; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.ImmutableList; | ||
import io.grpc.Status; | ||
import io.grpc.StatusRuntimeException; | ||
|
@@ -133,7 +134,7 @@ protected DoraCacheFileSystem(FileSystem fs, FileSystemContext context, | |
@Override | ||
public URIStatus getStatus(AlluxioURI path, GetStatusPOptions options) | ||
throws IOException, AlluxioException { | ||
AlluxioURI ufsFullPath = convertAlluxioPathToUFSPath(path); | ||
AlluxioURI ufsFullPath = convertToUfsPath(path); | ||
LOG.debug("DoraCacheFileSystem getStatus for {}", ufsFullPath); | ||
if (!mMetadataCacheEnabled) { | ||
return mDelegatedFileSystem.getStatus(ufsFullPath, options); | ||
|
@@ -146,7 +147,7 @@ public URIStatus getStatus(AlluxioURI path, GetStatusPOptions options) | |
// convert to proto and then back to get a clone of the object | ||
// as it may be cached by a `MetadataCachingFileSystem`, while we need to mutate its fields | ||
FileInfo info = GrpcUtils.fromProto(GrpcUtils.toProto(status.getFileInfo())); | ||
info.setPath(convertUfsPathToAlluxioPath(new AlluxioURI(info.getUfsPath())).getPath()); | ||
info.setPath(convertToAlluxioPath(new AlluxioURI(info.getUfsPath())).getPath()); | ||
URIStatus statusWithRelativeAlluxioPath = new URIStatus(info, status.getCacheContext()); | ||
return statusWithRelativeAlluxioPath; | ||
} catch (RuntimeException ex) { | ||
|
@@ -243,7 +244,7 @@ public PositionReader openPositionRead(URIStatus status, OpenFilePOptions option | |
@Override | ||
public List<URIStatus> listStatus(AlluxioURI path, ListStatusPOptions options) | ||
throws FileDoesNotExistException, IOException, AlluxioException { | ||
AlluxioURI ufsFullPath = convertAlluxioPathToUFSPath(path); | ||
AlluxioURI ufsFullPath = convertToUfsPath(path); | ||
ufsFullPath = new AlluxioURI(PathUtils.normalizePath(ufsFullPath.toString(), "/")); | ||
|
||
try { | ||
|
@@ -255,7 +256,7 @@ public List<URIStatus> listStatus(AlluxioURI path, ListStatusPOptions options) | |
List<URIStatus> statusesWithRelativePath = uriStatuses.stream() | ||
.map(status -> new URIStatus( | ||
GrpcUtils.fromProto(GrpcUtils.toProto(status.getFileInfo())) | ||
.setPath(convertUfsPathToAlluxioPath(new AlluxioURI(status.getUfsPath())) | ||
.setPath(convertToAlluxioPath(new AlluxioURI(status.getUfsPath())) | ||
.getPath()))) | ||
.collect(Collectors.toList()); | ||
return statusesWithRelativePath; | ||
|
@@ -279,7 +280,7 @@ public List<URIStatus> listStatus(AlluxioURI path, ListStatusPOptions options) | |
@Override | ||
public FileOutStream createFile(AlluxioURI alluxioPath, CreateFilePOptions options) | ||
throws FileAlreadyExistsException, InvalidPathException, IOException, AlluxioException { | ||
AlluxioURI ufsFullPath = convertAlluxioPathToUFSPath(alluxioPath); | ||
AlluxioURI ufsFullPath = convertToUfsPath(alluxioPath); | ||
|
||
try { | ||
CreateFilePOptions mergedOptions = FileSystemOptionsUtils.createFileDefaults( | ||
|
@@ -323,7 +324,7 @@ public FileOutStream createFile(AlluxioURI alluxioPath, CreateFilePOptions optio | |
@Override | ||
public void createDirectory(AlluxioURI path, CreateDirectoryPOptions options) | ||
throws FileAlreadyExistsException, InvalidPathException, IOException, AlluxioException { | ||
AlluxioURI ufsFullPath = convertAlluxioPathToUFSPath(path); | ||
AlluxioURI ufsFullPath = convertToUfsPath(path); | ||
try { | ||
CreateDirectoryPOptions mergedOptions = FileSystemOptionsUtils.createDirectoryDefaults( | ||
mFsContext.getClusterConf()).toBuilder().mergeFrom(options).build(); | ||
|
@@ -343,7 +344,7 @@ public void createDirectory(AlluxioURI path, CreateDirectoryPOptions options) | |
@Override | ||
public void delete(AlluxioURI path, DeletePOptions options) | ||
throws DirectoryNotEmptyException, FileDoesNotExistException, IOException, AlluxioException { | ||
AlluxioURI ufsFullPath = convertAlluxioPathToUFSPath(path); | ||
AlluxioURI ufsFullPath = convertToUfsPath(path); | ||
|
||
try { | ||
DeletePOptions mergedOptions = FileSystemOptionsUtils.deleteDefaults( | ||
|
@@ -364,8 +365,8 @@ public void delete(AlluxioURI path, DeletePOptions options) | |
@Override | ||
public void rename(AlluxioURI src, AlluxioURI dst, RenamePOptions options) | ||
throws FileDoesNotExistException, IOException, AlluxioException { | ||
AlluxioURI srcUfsFullPath = convertAlluxioPathToUFSPath(src); | ||
AlluxioURI dstUfsFullPath = convertAlluxioPathToUFSPath(dst); | ||
AlluxioURI srcUfsFullPath = convertToUfsPath(src); | ||
AlluxioURI dstUfsFullPath = convertToUfsPath(dst); | ||
try { | ||
RenamePOptions mergedOptions = FileSystemOptionsUtils.renameDefaults( | ||
mFsContext.getClusterConf()).toBuilder().mergeFrom(options).build(); | ||
|
@@ -392,7 +393,7 @@ public void iterateStatus(AlluxioURI path, ListStatusPOptions options, | |
@Override | ||
public boolean exists(AlluxioURI path, ExistsPOptions options) | ||
throws InvalidPathException, IOException, AlluxioException { | ||
AlluxioURI ufsFullPath = convertAlluxioPathToUFSPath(path); | ||
AlluxioURI ufsFullPath = convertToUfsPath(path); | ||
|
||
try { | ||
ExistsPOptions mergedOptions = FileSystemOptionsUtils.existsDefaults( | ||
|
@@ -413,7 +414,7 @@ public boolean exists(AlluxioURI path, ExistsPOptions options) | |
@Override | ||
public void setAttribute(AlluxioURI path, SetAttributePOptions options) | ||
throws FileDoesNotExistException, IOException, AlluxioException { | ||
AlluxioURI ufsFullPath = convertAlluxioPathToUFSPath(path); | ||
AlluxioURI ufsFullPath = convertToUfsPath(path); | ||
|
||
try { | ||
SetAttributePOptions mergedOptions = FileSystemOptionsUtils.setAttributeDefaults( | ||
|
@@ -433,73 +434,37 @@ public void setAttribute(AlluxioURI path, SetAttributePOptions options) | |
|
||
/** | ||
* Converts the Alluxio based path to UfsBaseFileSystem based path if needed. | ||
* <p> | ||
* UfsBaseFileSystem expects absolute/full file path. The Dora Worker | ||
* expects absolute/full file path, too. So we need to convert the input path from Alluxio | ||
* relative path to full UFS path if it is an Alluxio relative path. | ||
* We do this by checking if the path is leading with the UFS root. If the input path | ||
* is already considered to be UFS path, it should be leading a UFS path with appropriate scheme. | ||
* If local file system is used, please add "file://" scheme before the path. | ||
* | ||
* @param alluxioPath Alluxio based path | ||
* @return UfsBaseFileSystem based full path | ||
*/ | ||
public AlluxioURI convertAlluxioPathToUFSPath(AlluxioURI alluxioPath) { | ||
if (mDelegatedFileSystem instanceof UfsBaseFileSystem) { | ||
UfsBaseFileSystem under = (UfsBaseFileSystem) mDelegatedFileSystem; | ||
AlluxioURI rootUFS = under.getRootUFS(); | ||
try { | ||
if (rootUFS.isAncestorOf(alluxioPath)) { | ||
// Treat this path as a full UFS path. | ||
return alluxioPath; | ||
} | ||
} catch (InvalidPathException e) { | ||
LOG.error("Invalid path {}", alluxioPath); | ||
throw new RuntimeException(e); | ||
} | ||
|
||
// Treat this path as Alluxio relative, and add the UFS root before it. | ||
String ufsFullPath = PathUtils.concatPath(rootUFS, alluxioPath.toString()); | ||
if (alluxioPath.isRoot()) { | ||
ufsFullPath = ufsFullPath + AlluxioURI.SEPARATOR; | ||
} | ||
|
||
return new AlluxioURI(ufsFullPath); | ||
} else { | ||
return alluxioPath; | ||
} | ||
public AlluxioURI convertToUfsPath(AlluxioURI alluxioPath) { | ||
Preconditions.checkArgument(mDelegatedFileSystem instanceof UfsBaseFileSystem, | ||
"FileSystem is not UfsBaseFileSystem"); | ||
UfsBaseFileSystem under = (UfsBaseFileSystem) mDelegatedFileSystem; | ||
AlluxioURI rootUFS = under.getRootUFS(); | ||
return convertAlluxioPathToUfsPath(alluxioPath, rootUFS); | ||
} | ||
|
||
/** | ||
* Converts the UFS path back to Alluxio path. | ||
* <p> | ||
* This is the opposite operation to {@link #convertAlluxioPathToUFSPath(AlluxioURI)}. | ||
* This is the opposite operation to {@link #convertToUfsPath(AlluxioURI)}. | ||
* | ||
* @param ufsPath UfsBaseFileSystem based full path | ||
* @return an Alluxio path | ||
*/ | ||
public AlluxioURI convertUfsPathToAlluxioPath(AlluxioURI ufsPath) { | ||
if (mDelegatedFileSystem instanceof UfsBaseFileSystem) { | ||
AlluxioURI rootUfs = ((UfsBaseFileSystem) mDelegatedFileSystem).getRootUFS(); | ||
try { | ||
if (rootUfs.isAncestorOf(ufsPath)) { | ||
return new AlluxioURI(PathUtils.concatPath(AlluxioURI.SEPARATOR, | ||
PathUtils.subtractPaths(ufsPath.getPath(), rootUfs.getPath()))); | ||
} | ||
} catch (InvalidPathException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
return ufsPath; | ||
} else { | ||
return ufsPath; | ||
} | ||
public AlluxioURI convertToAlluxioPath(AlluxioURI ufsPath) { | ||
Preconditions.checkArgument(mDelegatedFileSystem instanceof UfsBaseFileSystem, | ||
"FileSystem is not UfsBaseFileSystem"); | ||
AlluxioURI rootUfs = ((UfsBaseFileSystem) mDelegatedFileSystem).getRootUFS(); | ||
return convertUfsPathToAlluxioPath(ufsPath, rootUfs); | ||
} | ||
|
||
@Override | ||
public List<BlockLocationInfo> getBlockLocations(AlluxioURI path) | ||
throws IOException, AlluxioException { | ||
AlluxioURI ufsPath = convertAlluxioPathToUFSPath(path); | ||
AlluxioURI ufsPath = convertToUfsPath(path); | ||
URIStatus status = mDoraClient.getStatus(ufsPath.toString(), | ||
FileSystemOptionsUtils.getStatusDefaults(mFsContext.getClusterConf())); | ||
return getBlockLocations(status); | ||
|
@@ -508,7 +473,7 @@ public List<BlockLocationInfo> getBlockLocations(AlluxioURI path) | |
@Override | ||
public List<BlockLocationInfo> getBlockLocations(URIStatus status) | ||
throws IOException, AlluxioException { | ||
AlluxioURI ufsPath = convertAlluxioPathToUFSPath(new AlluxioURI(status.getUfsPath())); | ||
AlluxioURI ufsPath = convertToUfsPath(new AlluxioURI(status.getUfsPath())); | ||
WorkerNetAddress workerNetAddress = mDoraClient.getWorkerNetAddress(ufsPath.toString()); | ||
// Dora does not have blocks; to apps who need block location info, we split multiple virtual | ||
// blocks from a file according to a fixed size | ||
|
@@ -537,4 +502,63 @@ public List<BlockLocationInfo> getBlockLocations(URIStatus status) | |
} | ||
return listBuilder.build(); | ||
} | ||
|
||
/** | ||
* Converts the Alluxio based path to UfsBaseFileSystem based path if needed. | ||
* This is a static utility intended for reuse. | ||
* <p> | ||
* UfsBaseFileSystem expects absolute/full file path. The Dora Worker | ||
* expects absolute/full file path, too. So we need to convert the input path from Alluxio | ||
* relative path to full UFS path if it is an Alluxio relative path. | ||
* We do this by checking if the path is leading with the UFS root. If the input path | ||
* is already considered to be UFS path, it should be leading a UFS path with appropriate scheme. | ||
* If local file system is used, please add "file://" scheme before the path. | ||
* | ||
* @param alluxioPath Alluxio based path | ||
* @param ufsRootPath the UFS root path to resolve against | ||
* @return UfsBaseFileSystem based full path | ||
*/ | ||
public static AlluxioURI convertAlluxioPathToUfsPath( | ||
AlluxioURI alluxioPath, AlluxioURI ufsRootPath) { | ||
try { | ||
if (ufsRootPath.isAncestorOf(alluxioPath)) { | ||
// Treat this path as a full UFS path. | ||
return alluxioPath; | ||
} | ||
} catch (InvalidPathException e) { | ||
LOG.error("Invalid path {}", alluxioPath); | ||
throw new RuntimeException(e); | ||
} | ||
|
||
// Treat this path as Alluxio relative, and add the UFS root before it. | ||
String ufsFullPath = PathUtils.concatPath(ufsRootPath, alluxioPath.toString()); | ||
if (alluxioPath.isRoot()) { | ||
ufsFullPath = ufsFullPath + AlluxioURI.SEPARATOR; | ||
} | ||
Comment on lines
+535
to
+537
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is this supposed to do? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i don't know, this is a refactor |
||
|
||
return new AlluxioURI(ufsFullPath); | ||
} | ||
|
||
/** | ||
* Converts the UFS path back to Alluxio path. | ||
* This is a static utility intended for reuse. | ||
* <p> | ||
* This is the opposite operation to {@link #convertAlluxioPathToUfsPath(AlluxioURI, AlluxioURI)}. | ||
* | ||
* @param ufsPath UfsBaseFileSystem based full path | ||
* @param ufsRootPath the UFS root path to resolve against | ||
* @return an Alluxio path | ||
*/ | ||
public static AlluxioURI convertUfsPathToAlluxioPath(AlluxioURI ufsPath, AlluxioURI ufsRootPath) { | ||
try { | ||
if (ufsRootPath.isAncestorOf(ufsPath)) { | ||
return new AlluxioURI(PathUtils.concatPath(AlluxioURI.SEPARATOR, | ||
PathUtils.subtractPaths(ufsPath.getPath(), ufsRootPath.getPath()))); | ||
} | ||
} catch (InvalidPathException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
return ufsPath; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.