Commit fe4fa5c
HADOOP-16202. Enhance openFile()
Roll-up of the previous PR.
Change-Id: Ib0aec173afcd8aae33f52da3f99ac813bd38c32f

HADOOP-16202. javadocs and style
Change-Id: Id4294ac7034155a10be22fb4631edf43cbadc22b

HADOOP-16202. openFile: read policies

Change "fs.option.openfile.fadvise" to "fs.option.openfile.read.policy"
and expand it with "vectored", "parquet" and "orc", all of which map in
S3A to random. The concept is that by choosing a read policy you can do
more than just change the seek policy - it could switch buffering,
caching, etc.
Change-Id: I2147840f58fb54853c797d2cab5d668c3d1d2541

HADOOP-16202. documentation changes and IOStatistics of open operations

* Thomas Marquardt's suggestions on the docs
* standard action name for "file opened"
* S3AInputStream measures the count and duration of this, and reports it

Change-Id: I7feacf4eb4d6494bb93b3dfc05b060ad75e52c18

HADOOP-16202. rebase to trunk; add "whole-file" option

Also slacken checks on the Open contract tests so that they are less
likely to fail when run against an external connector. TODO: make that
a compliance switch.
Change-Id: I9a4535d785949822752571f82f9448b9aac66aad

HADOOP-16202. remove the orc, parquet and vectored options from read policy

Going through Thomas's feedback...
Change-Id: Ibdf2c4ec64c54704f8631d5775d83444660c923a
1 parent e8566b3 commit fe4fa5c
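
Taken together, the changes let callers declare how a file will be read at the point of opening it. A minimal sketch of the resulting pattern (the option constants and awaitFuture() are from this commit; the wrapper class and method are illustrative):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

public final class OpenFileSketch {

  /**
   * Open a file for a single pass through its bytes.
   * The read policy is an optional hint: stores which do not
   * recognize it simply ignore it. The length hint lets object
   * stores skip a HEAD request when opening the file.
   */
  public static FSDataInputStream openForWholeFileRead(
      FileSystem fs, Path path) throws IOException {
    FileStatus status = fs.getFileStatus(path);
    return awaitFuture(fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
        .opt(FS_OPTION_OPENFILE_LENGTH, status.getLen())
        .build());
  }
}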

File tree

72 files changed: +2776 −462 lines


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java

Lines changed: 11 additions & 1 deletion

@@ -25,6 +25,11 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -42,7 +47,12 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
   public AvroFSInput(final FileContext fc, final Path p) throws IOException {
     FileStatus status = fc.getFileStatus(p);
     this.len = status.getLen();
-    this.stream = fc.open(p);
+    this.stream = awaitFuture(fc.openFile(p)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+        .opt(FS_OPTION_OPENFILE_LENGTH,
+            status.getLen())   // file length hint for object stores
+        .build());
   }
 
   @Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 2 additions & 2 deletions

@@ -24,7 +24,6 @@
 import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,7 @@
 import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
@@ -889,7 +889,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(),
+        FS_OPTION_OPENFILE_STANDARD_OPTIONS,
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(),

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java

Lines changed: 14 additions & 0 deletions

@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B opt(@Nonnull String key, float value);
 
+  /**
+   * Set optional long parameter for the Builder.
+   *
+   * @see #opt(String, String)
+   */
+  B opt(@Nonnull String key, long value);
+
   /**
    * Set optional double parameter for the Builder.
    *
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B must(@Nonnull String key, float value);
 
+  /**
+   * Set mandatory long option.
+   *
+   * @see #must(String, String)
+   */
+  B must(@Nonnull String key, long value);
+
   /**
    * Set mandatory double option.
    *
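
A sketch of what the new long overloads permit (assuming the openfile constants and awaitFuture static imports used elsewhere in this commit; the method and its arguments are illustrative). Previously a 64-bit value would have had to go through the float or String overloads:

  // Sketch: pass a 64-bit value directly, instead of routing it
  // through the float or String overloads.
  static FSDataInputStream openWithKnownLength(FileSystem fs, Path path,
      long knownLength) throws IOException {
    return awaitFuture(fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_LENGTH, knownLength)   // long overload
        .build());
  }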

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 15 additions & 1 deletion

@@ -70,7 +70,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * The FileContext class provides an interface for users of the Hadoop
@@ -2198,7 +2203,12 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
       EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
           CreateFlag.CREATE, CreateFlag.OVERWRITE) :
           EnumSet.of(CreateFlag.CREATE);
-      InputStream in = open(qSrc);
+      InputStream in = awaitFuture(openFile(qSrc)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+          .opt(FS_OPTION_OPENFILE_LENGTH,
+              fs.getLen())   // file length hint for object stores
+          .build());
       try (OutputStream out = create(qDst, createFlag)) {
         IOUtils.copyBytes(in, out, conf, true);
       } finally {
@@ -2930,9 +2940,13 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
       final Path absF = fixRelativePart(getPath());
       OpenFileParameters parameters = new OpenFileParameters()
           .withMandatoryKeys(getMandatoryKeys())
+          .withOptionalKeys(getOptionalKeys())
           .withOptions(getOptions())
           .withBufferSize(getBufferSize())
           .withStatus(getStatus());
+      parameters.withBufferSize(
+          getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE,
+              getBufferSize()));
       return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
         @Override
         public CompletableFuture<FSDataInputStream> next(
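
Since the builder now consults FS_OPTION_OPENFILE_BUFFER_SIZE, a buffer size can travel as an ordinary open-file option rather than only through a separate setter. A minimal sketch (same static imports assumed; the 1 MiB value is illustrative):

  // Sketch: the option, when set, overrides the builder's default
  // buffer size in the resulting OpenFileParameters.
  static FSDataInputStream openWithBigBuffer(FileContext fc, Path path)
      throws IOException {
    return awaitFuture(fc.openFile(path)
        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 1024 * 1024)   // 1 MiB
        .build());
  }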

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 8 additions & 3 deletions

@@ -88,6 +88,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -4616,7 +4617,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(),
+        Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(), () ->
@@ -4644,7 +4645,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(), "");
+        Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
     CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
     try {
       result.complete(open(pathHandle, parameters.getBufferSize()));
@@ -4751,9 +4752,13 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
       Optional<Path> optionalPath = getOptionalPath();
       OpenFileParameters parameters = new OpenFileParameters()
           .withMandatoryKeys(getMandatoryKeys())
+          .withOptionalKeys(getOptionalKeys())
           .withOptions(getOptions())
-          .withBufferSize(getBufferSize())
           .withStatus(super.getStatus()); // explicit to avoid IDE warnings
+      // buffer size can be configured
+      parameters.withBufferSize(
+          getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE,
+              getBufferSize()));
       if(optionalPath.isPresent()) {
         return getFS().openFileWithOptions(optionalPath.get(),
             parameters);
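
The move from Collections.emptySet() to FS_OPTION_OPENFILE_STANDARD_OPTIONS means a standard option may now be set with must() against the base FileSystem, while unknown mandatory keys still fail. A hedged sketch of the visible behavior (constants from this commit; the unknown key is hypothetical):

  static void demonstrateMandatoryKeys(FileSystem fs, Path path)
      throws IOException {
    // Accepted: standard openfile options pass the mandatory-key check.
    awaitFuture(fs.openFile(path)
        .must(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
        .build()).close();

    // Rejected: rejectUnknownMandatoryKeys() raises
    // IllegalArgumentException for a key outside the standard set.
    fs.openFile(path)
        .must("fs.example.unknown-option", true)   // hypothetical key
        .build();
  }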

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java

Lines changed: 46 additions & 10 deletions

@@ -72,6 +72,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /**
  * A collection of file-processing util methods
  */
@@ -391,7 +397,32 @@ public static boolean copy(FileSystem srcFS, Path src,
     return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
   }
 
-  /** Copy files between FileSystems. */
+  /**
+   * Copy a file/directory tree within/between filesystems.
+   * <p></p>
+   * Returns true if the operation succeeded. When deleteSource is true,
+   * this means "after the copy, delete(source) returned true".
+   * If the destination is a directory and mkdirs(dest) fails,
+   * the operation will return false rather than raise any exception.
+   * <p></p>
+   * The overwrite flag is about overwriting files; it has no effect on
+   * handling an attempt to copy a file atop a directory (expect an
+   * IOException), or a directory over a path which contains a file
+   * (mkdir will fail, so "false").
+   * <p></p>
+   * The operation is recursive, and the deleteSource operation takes place
+   * as each subdirectory is copied. Therefore, if an operation fails partway
+   * through, the source tree may be partially deleted.
+   * @param srcFS source filesystem
+   * @param srcStatus status of source
+   * @param dstFS destination filesystem
+   * @param dst path of destination
+   * @param deleteSource delete the source?
+   * @param overwrite overwrite files at destination?
+   * @param conf configuration to use when opening files
+   * @return true if the operation succeeded.
+   * @throws IOException failure
+   */
   public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
       FileSystem dstFS, Path dst,
       boolean deleteSource,
@@ -404,22 +435,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
       if (!dstFS.mkdirs(dst)) {
         return false;
       }
-      FileStatus contents[] = srcFS.listStatus(src);
-      for (int i = 0; i < contents.length; i++) {
-        copy(srcFS, contents[i], dstFS,
-            new Path(dst, contents[i].getPath().getName()),
-            deleteSource, overwrite, conf);
+      RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
+      while (contents.hasNext()) {
+        FileStatus next = contents.next();
+        copy(srcFS, next, dstFS,
+            new Path(dst, next.getPath().getName()),
+            deleteSource, overwrite, conf);
       }
     } else {
-      InputStream in=null;
+      InputStream in = null;
       OutputStream out = null;
       try {
-        in = srcFS.open(src);
+        in = awaitFuture(srcFS.openFile(src)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .opt(FS_OPTION_OPENFILE_LENGTH,
+                srcStatus.getLen())   // file length hint for object stores
+            .build());
         out = dstFS.create(dst, overwrite);
         IOUtils.copyBytes(in, out, conf, true);
       } catch (IOException e) {
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
+        IOUtils.cleanupWithLogger(LOG, in, out);
        throw e;
      }
    }
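
For reference, a usage sketch of the overload documented above (signature as in this diff; the filesystems, paths, and configuration are illustrative):

  // Sketch: recursive copy that deletes the source as it goes. If the
  // operation fails partway, the source tree may be partially deleted,
  // as the new javadoc warns.
  static boolean moveTree(FileSystem srcFS, Path srcDir,
      FileSystem dstFS, Path dstDir, Configuration conf)
      throws IOException {
    return FileUtil.copy(srcFS, srcFS.getFileStatus(srcDir),
        dstFS, dstDir,
        true,    // deleteSource
        false,   // overwrite: fail rather than replace existing files
        conf);
  }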

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java

Lines changed: 5 additions & 3 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -34,7 +35,7 @@
  * options accordingly, for example:
  *
  * If the option is not related to the file system, the option will be ignored.
- * If the option is must, but not supported by the file system, a
+ * If the option is must, but not supported/known by the file system, an
  * {@link IllegalArgumentException} will be thrown.
  *
  */
@@ -51,10 +52,11 @@ CompletableFuture<FSDataInputStream> build()
   /**
    * A FileStatus may be provided to the open request.
    * It is up to the implementation whether to use this or not.
-   * @param status status.
+   * @param status status: may be null
    * @return the builder.
    */
-  default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
+  default FutureDataInputStreamBuilder withFileStatus(
+      @Nullable FileStatus status) {
     return this;
   }