-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HADOOP-16202. Enhance openFile() for better read performance against object stores #2584
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c5f543c
068eb16
c0dfe72
97ebbef
756935f
1bc73e6
cf13d12
f1a68eb
2843391
c9c989e
8cc26f9
e7b29ef
98ebf76
bf8e1d4
60cb6b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> { | |
| */ | ||
| B opt(@Nonnull String key, float value); | ||
|
|
||
| /** | ||
| * Set optional long parameter for the Builder. | ||
| * | ||
| * @see #opt(String, String) | ||
| */ | ||
| B opt(@Nonnull String key, long value); | ||
|
|
||
|
||
| /** | ||
| * Set optional double parameter for the Builder. | ||
| * | ||
|
|
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> { | |
| */ | ||
| B must(@Nonnull String key, float value); | ||
|
|
||
| /** | ||
| * Set mandatory long option. | ||
| * | ||
| * @see #must(String, String) | ||
| */ | ||
| B must(@Nonnull String key, long value); | ||
|
|
||
| /** | ||
| * Set mandatory double option. | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,6 +88,7 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; | ||
| import static org.apache.hadoop.util.Preconditions.checkArgument; | ||
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; | ||
| import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; | ||
|
|
@@ -4616,7 +4617,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions( | |
| final OpenFileParameters parameters) throws IOException { | ||
| AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( | ||
| parameters.getMandatoryKeys(), | ||
| Collections.emptySet(), | ||
| Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, | ||
| "for " + path); | ||
| return LambdaUtils.eval( | ||
| new CompletableFuture<>(), () -> | ||
|
|
@@ -4644,7 +4645,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions( | |
| final OpenFileParameters parameters) throws IOException { | ||
| AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( | ||
|
||
| parameters.getMandatoryKeys(), | ||
| Collections.emptySet(), ""); | ||
| Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, ""); | ||
| CompletableFuture<FSDataInputStream> result = new CompletableFuture<>(); | ||
| try { | ||
| result.complete(open(pathHandle, parameters.getBufferSize())); | ||
|
|
@@ -4751,9 +4752,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException { | |
| Optional<Path> optionalPath = getOptionalPath(); | ||
| OpenFileParameters parameters = new OpenFileParameters() | ||
| .withMandatoryKeys(getMandatoryKeys()) | ||
| .withOptionalKeys(getOptionalKeys()) | ||
| .withOptions(getOptions()) | ||
| .withBufferSize(getBufferSize()) | ||
| .withStatus(super.getStatus()); // explicit to avoid IDE warnings | ||
| .withStatus(super.getStatus()) | ||
| .withBufferSize( | ||
| getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize())); | ||
| if(optionalPath.isPresent()) { | ||
| return getFS().openFileWithOptions(optionalPath.get(), | ||
| parameters); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,11 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; | ||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; | ||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; | ||
| import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; | ||
|
|
||
| /** | ||
| * A collection of file-processing util methods | ||
| */ | ||
|
|
@@ -396,7 +401,32 @@ public static boolean copy(FileSystem srcFS, Path src, | |
| return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf); | ||
| } | ||
|
|
||
| /** Copy files between FileSystems. */ | ||
| /** | ||
| * Copy a file/directory tree within/between filesystems. | ||
| * <p></p> | ||
| * Returns true if the operation succeeded. When deleteSource is true, | ||
| * this means "after the copy, delete(source) returned true". | ||
| * If the destination is a directory, and mkdirs (dest) fails, | ||
| * the operation will return false rather than raise any exception. | ||
| * <p></p> | ||
| * The overwrite flag is about overwriting files; it has no effect on | ||
| * handling an attempt to copy a file atop a directory (expect an IOException), | ||
| * or a directory over a path which contains a file (mkdir will fail, so | ||
| * "false"). | ||
| * <p></p> | ||
| * The operation is recursive, and the deleteSource operation takes place | ||
| * as each subdirectory is copied. Therefore, if an operation fails partway | ||
| * through, the source tree may be partially deleted. | ||
| * @param srcFS source filesystem | ||
| * @param srcStatus status of source | ||
| * @param dstFS destination filesystem | ||
| * @param dst path of destination | ||
| * @param deleteSource delete the source? | ||
| * @param overwrite overwrite files at destination? | ||
| * @param conf configuration to use when opening files | ||
| * @return true if the operation succeeded. | ||
|
||
| * @throws IOException failure | ||
| */ | ||
| public static boolean copy(FileSystem srcFS, FileStatus srcStatus, | ||
| FileSystem dstFS, Path dst, | ||
| boolean deleteSource, | ||
|
|
@@ -409,22 +439,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus, | |
| if (!dstFS.mkdirs(dst)) { | ||
| return false; | ||
| } | ||
| FileStatus contents[] = srcFS.listStatus(src); | ||
| for (int i = 0; i < contents.length; i++) { | ||
| copy(srcFS, contents[i], dstFS, | ||
| new Path(dst, contents[i].getPath().getName()), | ||
| deleteSource, overwrite, conf); | ||
| RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src); | ||
| while (contents.hasNext()) { | ||
| FileStatus next = contents.next(); | ||
| copy(srcFS, next, dstFS, | ||
| new Path(dst, next.getPath().getName()), | ||
| deleteSource, overwrite, conf); | ||
| } | ||
| } else { | ||
| InputStream in=null; | ||
| InputStream in = null; | ||
| OutputStream out = null; | ||
| try { | ||
| in = srcFS.open(src); | ||
| in = awaitFuture(srcFS.openFile(src) | ||
| .opt(FS_OPTION_OPENFILE_READ_POLICY, | ||
| FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) | ||
| .opt(FS_OPTION_OPENFILE_LENGTH, | ||
| srcStatus.getLen()) // file length hint for object stores | ||
|
Comment on lines
+456
to
+457
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When should we use |
||
| .build()); | ||
| out = dstFS.create(dst, overwrite); | ||
| IOUtils.copyBytes(in, out, conf, true); | ||
| } catch (IOException e) { | ||
| IOUtils.closeStream(out); | ||
| IOUtils.closeStream(in); | ||
| IOUtils.cleanupWithLogger(LOG, in, out); | ||
| throw e; | ||
| } | ||
| } | ||
|
|
@@ -503,7 +538,11 @@ private static boolean copy(FileSystem srcFS, FileStatus srcStatus, | |
| deleteSource, conf); | ||
| } | ||
| } else { | ||
| InputStream in = srcFS.open(src); | ||
| InputStream in = awaitFuture(srcFS.openFile(src) | ||
| .withFileStatus(srcStatus) | ||
| .opt(FS_OPTION_OPENFILE_READ_POLICY, | ||
| FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) | ||
| .build()); | ||
| IOUtils.copyBytes(in, Files.newOutputStream(dst.toPath()), conf); | ||
| } | ||
| if (deleteSource) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this change, ChecksumFileSystem.openFileWithOptions has the same implementation as the base class, so you can remove this override.