Commit 700f998

HADOOP-16202. Enhance openFile()
Roll-up of the previous PR

Change-Id: Ib0aec173afcd8aae33f52da3f99ac813bd38c32f

HADOOP-16202. javadocs and style

Change-Id: Id4294ac7034155a10be22fb4631edf43cbadc22b

HADOOP-16202. openFile: read policies

Change "fs.option.openfile.fadvise" to "fs.option.openfile.read.policy" and
expand with "vectored", "parquet" and "orc", all of which map in s3a to random.
The concept is that by choosing a read policy you can do more than just change
seek policy - it could switch buffering, caching etc.

Change-Id: I2147840f58fb54853c797d2cab5d668c3d1d2541

HADOOP-16202: documentation changes and IOStatistics of open operations

* Thomas Marquardt's suggestions on the docs
* standard action name for file opened
* S3AInputStream measures the count and duration of this, and reports it

Change-Id: I7feacf4eb4d6494bb93b3dfc05b060ad75e52c18
1 parent 88a550b commit 700f998
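
For illustration, a minimal sketch of how application code could exercise the read-policy option this commit describes. The variable names (fs, path, status) are illustrative, not from the commit; the option constants and awaitFuture() are the ones introduced or used by the change.

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
    import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

    // Request the "parquet" read policy; per the commit message, S3A maps it
    // (like "vectored" and "orc") to its random seek policy. A store which
    // does not recognise the policy is free to ignore the hint.
    FSDataInputStream in = awaitFuture(
        fs.openFile(path)
            .opt(FS_OPTION_OPENFILE_READ_POLICY, "parquet")
            .opt(FS_OPTION_OPENFILE_LENGTH, status.getLen())  // saves a length probe on object stores
            .build());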

File tree: 70 files changed, +2797 -475 lines


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java

Lines changed: 12 additions & 1 deletion

@@ -25,6 +25,11 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -42,7 +47,13 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
   public AvroFSInput(final FileContext fc, final Path p) throws IOException {
     FileStatus status = fc.getFileStatus(p);
     this.len = status.getLen();
-    this.stream = fc.open(p);
+    this.stream = awaitFuture(fc.openFile(p)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+        .opt(FS_OPTION_OPENFILE_LENGTH,
+            status.getLen())   // file length hint for object stores
+        .build());
+    fc.open(p);
   }
 
   @Override
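
For readers new to the asynchronous API, the chained call above can be unrolled step by step. A sketch under the same assumptions as the diff (fc, p, status in scope); the intermediate variable names are illustrative.

    // No I/O happens until build(); opt() records hints a store may ignore.
    FutureDataInputStreamBuilder builder = fc.openFile(p)
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
        .opt(FS_OPTION_OPENFILE_LENGTH, status.getLen());
    // build() may perform the open asynchronously and returns a future.
    CompletableFuture<FSDataInputStream> future = builder.build();
    // awaitFuture() blocks for the result and rethrows any IOException raised
    // during the open rather than wrapping it in an ExecutionException.
    FSDataInputStream stream = awaitFuture(future);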

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 2 additions & 2 deletions

@@ -24,7 +24,6 @@
 import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,7 @@
 import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
 
@@ -889,7 +889,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(),
+        FS_OPTION_OPENFILE_STANDARD_OPTIONS,
        "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(),
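
The effect of swapping Collections.emptySet() for FS_OPTION_OPENFILE_STANDARD_OPTIONS: a standard openfile option can now be passed with must() without being rejected as an unknown mandatory key. A hedged sketch against a checksummed (e.g. local) filesystem; the variable names are illustrative.

    // Accepted after this change: the mandatory key is in the standard
    // openfile option set, so rejectUnknownMandatoryKeys() lets it through
    // even though ChecksumFileSystem does nothing special with it.
    FSDataInputStream in = awaitFuture(
        localFs.openFile(path)
            .must(FS_OPTION_OPENFILE_READ_POLICY,
                FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
            .build());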

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java

Lines changed: 14 additions & 0 deletions

@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B opt(@Nonnull String key, float value);
 
+  /**
+   * Set optional long parameter for the Builder.
+   *
+   * @see #opt(String, String)
+   */
+  B opt(@Nonnull String key, long value);
+
   /**
    * Set optional double parameter for the Builder.
    *
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B must(@Nonnull String key, float value);
 
+  /**
+   * Set mandatory long option.
+   *
+   * @see #must(String, String)
+   */
+  B must(@Nonnull String key, long value);
+
   /**
    * Set mandatory double option.
    *
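
A brief sketch of why the long overloads matter for openFile(): the file-length hint is a long, and without these methods it would have to be narrowed or stringified to pass through the builder. Illustrative names, assuming an earlier getFileStatus() call.

    FileStatus st = fs.getFileStatus(path);
    CompletableFuture<FSDataInputStream> f = fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_LENGTH, st.getLen())  // long-valued hint, no cast or toString needed
        .build();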

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 15 additions & 1 deletion

@@ -71,7 +71,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * The FileContext class provides an interface for users of the Hadoop
@@ -2204,7 +2209,12 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
     EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
         CreateFlag.CREATE, CreateFlag.OVERWRITE) :
         EnumSet.of(CreateFlag.CREATE);
-    InputStream in = open(qSrc);
+    InputStream in = awaitFuture(openFile(qSrc)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+        .opt(FS_OPTION_OPENFILE_LENGTH,
+            fs.getLen())   // file length hint for object stores
+        .build());
     try (OutputStream out = create(qDst, createFlag)) {
       IOUtils.copyBytes(in, out, conf, true);
     } finally {
@@ -2936,9 +2946,13 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
     final Path absF = fixRelativePart(getPath());
     OpenFileParameters parameters = new OpenFileParameters()
         .withMandatoryKeys(getMandatoryKeys())
+        .withOptionalKeys(getOptionalKeys())
         .withOptions(getOptions())
         .withBufferSize(getBufferSize())
         .withStatus(getStatus());
+    parameters.withBufferSize(
+        getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE,
+            getBufferSize()));
     return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
       @Override
       public CompletableFuture<FSDataInputStream> next(
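
With the FileContext builder now re-reading the buffer size from the option set, the value can be supplied like any other openfile option. A minimal sketch with illustrative variable names:

    FSDataInputStream in = awaitFuture(
        fc.openFile(path)
            .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 256 * 1024)  // overrides the builder's default buffer size
            .build());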

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 8 additions & 3 deletions

@@ -89,6 +89,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
 import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -4617,7 +4618,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(),
+        Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
        "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(), () ->
@@ -4645,7 +4646,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(), "");
+        Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
     CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
     try {
       result.complete(open(pathHandle, parameters.getBufferSize()));
@@ -4752,9 +4753,13 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
     Optional<Path> optionalPath = getOptionalPath();
     OpenFileParameters parameters = new OpenFileParameters()
         .withMandatoryKeys(getMandatoryKeys())
+        .withOptionalKeys(getOptionalKeys())
         .withOptions(getOptions())
-        .withBufferSize(getBufferSize())
         .withStatus(super.getStatus()); // explicit to avoid IDE warnings
+    // buffer size can be configured
+    parameters.withBufferSize(
+        getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE,
+            getBufferSize()));
     if(optionalPath.isPresent()) {
       return getFS().openFileWithOptions(optionalPath.get(),
           parameters);
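
The same relaxation is applied in the base FileSystem implementations, so the opt()/must() contract works as documented in FutureDataInputStreamBuilder. A hedged sketch using a made-up, filesystem-specific key purely for illustration:

    // opt(): a hint. A filesystem which does not understand the key ignores it.
    fs.openFile(path)
        .opt("fs.example.prefetch.blocks", 4)
        .build();

    // must(): a requirement. If the key is neither a standard openfile option
    // nor understood by the filesystem, the open is rejected with an
    // IllegalArgumentException (from build() or when the returned future is
    // evaluated, depending on the implementation) instead of being ignored.
    fs.openFile(path)
        .must("fs.example.prefetch.blocks", 4)
        .build();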

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java

Lines changed: 45 additions & 10 deletions

@@ -71,6 +71,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /**
  * A collection of file-processing util methods
  */
@@ -390,7 +395,32 @@ public static boolean copy(FileSystem srcFS, Path src,
     return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
   }
 
-  /** Copy files between FileSystems. */
+  /**
+   * Copy a file/directory tree within/between filesystems.
+   * <p></p>
+   * Returns true if the operation succeeded. When deleteSource is true,
+   * this means "after the copy, delete(source) returned true".
+   * If the destination is a directory and mkdirs(dest) fails,
+   * the operation will return false rather than raise any exception.
+   * <p></p>
+   * The overwrite flag is about overwriting files; it has no effect on
+   * handling an attempt to copy a file atop a directory (expect an IOException),
+   * or a directory over a path which contains a file (mkdir will fail, so
+   * "false").
+   * <p></p>
+   * The operation is recursive, and the deleteSource operation takes place
+   * as each subdirectory is copied. Therefore, if an operation fails partway
+   * through, the source tree may be partially deleted.
+   * @param srcFS source filesystem
+   * @param srcStatus status of source
+   * @param dstFS destination filesystem
+   * @param dst path of destination
+   * @param deleteSource delete the source?
+   * @param overwrite overwrite files at destination?
+   * @param conf configuration to use when opening files
+   * @return true if the operation succeeded.
+   * @throws IOException failure
+   */
   public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
                              FileSystem dstFS, Path dst,
                              boolean deleteSource,
@@ -409,22 +439,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
       if (!dstFS.mkdirs(dst)) {
         return false;
       }
-      FileStatus contents[] = srcFS.listStatus(src);
-      for (int i = 0; i < contents.length; i++) {
-        copy(srcFS, contents[i], dstFS,
-             new Path(dst, contents[i].getPath().getName()),
-             deleteSource, overwrite, conf);
+      RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
+      while (contents.hasNext()) {
+        FileStatus next = contents.next();
+        copy(srcFS, next, dstFS,
+            new Path(dst, next.getPath().getName()),
+            deleteSource, overwrite, conf);
       }
     } else {
-      InputStream in=null;
+      InputStream in = null;
       OutputStream out = null;
       try {
-        in = srcFS.open(src);
+        in = awaitFuture(srcFS.openFile(src)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+            .opt(FS_OPTION_OPENFILE_LENGTH,
+                srcStatus.getLen())   // file length hint for object stores
+            .build());
         out = dstFS.create(dst, overwrite);
         IOUtils.copyBytes(in, out, conf, true);
       } catch (IOException e) {
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
+        IOUtils.cleanupWithLogger(LOG, in, out);
         throw e;
       }
     }
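
A short usage sketch for the copy() overload documented above; variable names are illustrative and the semantics are those described in the new javadoc.

    FileStatus srcStatus = srcFS.getFileStatus(srcPath);
    boolean copied = FileUtil.copy(
        srcFS, srcStatus,      // source filesystem and status of the source path
        dstFS, dstPath,        // destination filesystem and path
        false,                 // deleteSource: keep the source tree
        true,                  // overwrite files already present at the destination
        conf);                 // configuration used when opening files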

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java

Lines changed: 5 additions & 3 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
@@ -34,7 +35,7 @@
  * options accordingly, for example:
  *
  * If the option is not related to the file system, the option will be ignored.
- * If the option is must, but not supported by the file system, a
+ * If the option is must, but not supported/known by the file system, an
 * {@link IllegalArgumentException} will be thrown.
 *
 */
@@ -51,10 +52,11 @@ CompletableFuture<FSDataInputStream> build()
  /**
   * A FileStatus may be provided to the open request.
   * It is up to the implementation whether to use this or not.
-   * @param status status.
+   * @param status status: may be null
   * @return the builder.
   */
-  default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
+  default FutureDataInputStreamBuilder withFileStatus(
+      @Nullable FileStatus status) {
    return this;
  }
 
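
A sketch of what the @Nullable relaxation means for callers (illustrative names): a status obtained earlier can be handed to the builder so object-store clients can skip another existence/length probe, and null is now an explicit, documented way of saying "no status available".

    FileStatus st = fs.getFileStatus(path);   // already fetched for other reasons
    FSDataInputStream in = awaitFuture(
        fs.openFile(path)
            .withFileStatus(st)               // implementations may use or ignore it
            .build());

    // Equally valid after this change: no status to offer.
    fs.openFile(path)
        .withFileStatus(null)
        .build();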
