Skip to content

Commit b2f0c3f

Browse files
steveloughran authored and deepakdamri committed
HADOOP-16759. Filesystem openFile() builder to take a FileStatus param (apache#1761). Contributed by Steve Loughran
* Enhanced builder + FS spec
* s3a FS to use this to skip HEAD on open
* and to use version/etag when opening the file

Works with S3AFileStatus FS and S3ALocatedFileStatus.
1 parent 9726b46 commit b2f0c3f

File tree

13 files changed

+252
-76
lines changed

13 files changed

+252
-76
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.NoSuchElementException;
34-
import java.util.Set;
3534
import java.util.StringTokenizer;
3635
import java.util.concurrent.CompletableFuture;
3736
import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +44,7 @@
4544
import org.apache.hadoop.fs.Options.CreateOpts;
4645
import org.apache.hadoop.fs.Options.Rename;
4746
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
47+
import org.apache.hadoop.fs.impl.OpenFileParameters;
4848
import org.apache.hadoop.fs.permission.AclEntry;
4949
import org.apache.hadoop.fs.permission.AclStatus;
5050
import org.apache.hadoop.fs.permission.FsAction;
@@ -1355,22 +1355,20 @@ public boolean equals(Object other) {
13551355
* setting up the expectation that the {@code get()} call
13561356
* is needed to evaluate the result.
13571357
* @param path path to the file
1358-
* @param mandatoryKeys set of options declared as mandatory.
1359-
* @param options options set during the build sequence.
1360-
* @param bufferSize buffer size
1358+
* @param parameters open file parameters from the builder.
13611359
* @return a future which will evaluate to the opened file.
13621360
* @throws IOException failure to resolve the link.
13631361
* @throws IllegalArgumentException unknown mandatory key
13641362
*/
13651363
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
1366-
Set<String> mandatoryKeys,
1367-
Configuration options,
1368-
int bufferSize) throws IOException {
1369-
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
1364+
final OpenFileParameters parameters) throws IOException {
1365+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
1366+
parameters.getMandatoryKeys(),
13701367
Collections.emptySet(),
13711368
"for " + path);
13721369
return LambdaUtils.eval(
1373-
new CompletableFuture<>(), () -> open(path, bufferSize));
1370+
new CompletableFuture<>(), () ->
1371+
open(path, parameters.getBufferSize()));
13741372
}
13751373

13761374
public boolean hasPathCapability(final Path path,

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import java.util.Arrays;
2525
import java.util.EnumSet;
2626
import java.util.List;
27-
import java.util.Set;
2827
import java.util.concurrent.CompletableFuture;
2928

3029
import org.apache.hadoop.classification.InterfaceAudience;
3130
import org.apache.hadoop.classification.InterfaceStability;
3231
import org.apache.hadoop.conf.Configuration;
3332
import org.apache.hadoop.fs.Options.ChecksumOpt;
33+
import org.apache.hadoop.fs.impl.OpenFileParameters;
3434
import org.apache.hadoop.fs.permission.FsPermission;
3535
import org.apache.hadoop.security.token.Token;
3636
import org.apache.hadoop.util.Progressable;
@@ -266,20 +266,17 @@ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
266266

267267
/**
268268
* Open a file by delegating to
269-
* {@link FileSystem#openFileWithOptions(Path, Set, Configuration, int)}.
269+
* {@link FileSystem#openFileWithOptions(Path, org.apache.hadoop.fs.impl.OpenFileParameters)}.
270270
* @param path path to the file
271-
* @param mandatoryKeys set of options declared as mandatory.
272-
* @param options options set during the build sequence.
273-
* @param bufferSize buffer size
274-
* @return a future which will evaluate to the opened file.
271+
* @param parameters open file parameters from the builder.
272+
*
273+
* @return a future which will evaluate to the opened file.
275274
* @throws IOException failure to resolve the link.
276275
* @throws IllegalArgumentException unknown mandatory key
277276
*/
278277
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
279-
Set<String> mandatoryKeys,
280-
Configuration options,
281-
int bufferSize) throws IOException {
282-
return fsImpl.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
278+
final OpenFileParameters parameters) throws IOException {
279+
return fsImpl.openFileWithOptions(path, parameters);
283280
}
284281

285282
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import org.apache.hadoop.fs.Options.CreateOpts;
4848
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
4949
import org.apache.hadoop.fs.impl.FsLinkResolution;
50-
import org.apache.hadoop.fs.impl.PathCapabilitiesSupport;
50+
import org.apache.hadoop.fs.impl.OpenFileParameters;
5151
import org.apache.hadoop.fs.permission.AclEntry;
5252
import org.apache.hadoop.fs.permission.AclStatus;
5353
import org.apache.hadoop.fs.permission.FsAction;
@@ -2924,16 +2924,18 @@ protected FSDataInputStreamBuilder(
29242924
@Override
29252925
public CompletableFuture<FSDataInputStream> build() throws IOException {
29262926
final Path absF = fixRelativePart(getPath());
2927+
OpenFileParameters parameters = new OpenFileParameters()
2928+
.withMandatoryKeys(getMandatoryKeys())
2929+
.withOptions(getOptions())
2930+
.withBufferSize(getBufferSize())
2931+
.withStatus(getStatus());
29272932
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
29282933
@Override
29292934
public CompletableFuture<FSDataInputStream> next(
29302935
final AbstractFileSystem fs,
29312936
final Path p)
29322937
throws IOException {
2933-
return fs.openFileWithOptions(p,
2934-
getMandatoryKeys(),
2935-
getOptions(),
2936-
getBufferSize());
2938+
return fs.openFileWithOptions(p, parameters);
29372939
}
29382940
}.resolve(FileContext.this, absF);
29392941
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.hadoop.fs.Options.Rename;
5959
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
6060
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
61+
import org.apache.hadoop.fs.impl.OpenFileParameters;
6162
import org.apache.hadoop.fs.permission.AclEntry;
6263
import org.apache.hadoop.fs.permission.AclStatus;
6364
import org.apache.hadoop.fs.permission.FsAction;
@@ -4443,43 +4444,39 @@ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
44434444
* the action of opening the file should begin.
44444445
*
44454446
* The base implementation performs a blocking
4446-
* call to {@link #open(Path, int)}in this call;
4447+
* call to {@link #open(Path, int)} in this call;
44474448
* the actual outcome is in the returned {@code CompletableFuture}.
44484449
* This avoids having to create some thread pool, while still
44494450
* setting up the expectation that the {@code get()} call
44504451
* is needed to evaluate the result.
44514452
* @param path path to the file
4452-
* @param mandatoryKeys set of options declared as mandatory.
4453-
* @param options options set during the build sequence.
4454-
* @param bufferSize buffer size
4453+
* @param parameters open file parameters from the builder.
44554454
* @return a future which will evaluate to the opened file.
44564455
* @throws IOException failure to resolve the link.
44574456
* @throws IllegalArgumentException unknown mandatory key
44584457
*/
44594458
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
44604459
final Path path,
4461-
final Set<String> mandatoryKeys,
4462-
final Configuration options,
4463-
final int bufferSize) throws IOException {
4464-
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
4460+
final OpenFileParameters parameters) throws IOException {
4461+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
4462+
parameters.getMandatoryKeys(),
44654463
Collections.emptySet(),
44664464
"for " + path);
44674465
return LambdaUtils.eval(
4468-
new CompletableFuture<>(), () -> open(path, bufferSize));
4466+
new CompletableFuture<>(), () ->
4467+
open(path, parameters.getBufferSize()));
44694468
}
44704469

44714470
/**
44724471
* Execute the actual open file operation.
44734472
* The base implementation performs a blocking
4474-
* call to {@link #open(Path, int)}in this call;
4473+
* call to {@link #open(Path, int)} in this call;
44754474
* the actual outcome is in the returned {@code CompletableFuture}.
44764475
* This avoids having to create some thread pool, while still
44774476
* setting up the expectation that the {@code get()} call
44784477
* is needed to evaluate the result.
44794478
* @param pathHandle path to the file
4480-
* @param mandatoryKeys set of options declared as mandatory.
4481-
* @param options options set during the build sequence.
4482-
* @param bufferSize buffer size
4479+
* @param parameters open file parameters from the builder.
44834480
* @return a future which will evaluate to the opened file.
44844481
* @throws IOException failure to resolve the link.
44854482
* @throws IllegalArgumentException unknown mandatory key
@@ -4488,14 +4485,13 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
44884485
*/
44894486
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
44904487
final PathHandle pathHandle,
4491-
final Set<String> mandatoryKeys,
4492-
final Configuration options,
4493-
final int bufferSize) throws IOException {
4494-
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
4488+
final OpenFileParameters parameters) throws IOException {
4489+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
4490+
parameters.getMandatoryKeys(),
44954491
Collections.emptySet(), "");
44964492
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
44974493
try {
4498-
result.complete(open(pathHandle, bufferSize));
4494+
result.complete(open(pathHandle, parameters.getBufferSize()));
44994495
} catch (UnsupportedOperationException tx) {
45004496
// fail fast here
45014497
throw tx;
@@ -4551,12 +4547,17 @@ protected FSDataInputStreamBuilder(
45514547
@Override
45524548
public CompletableFuture<FSDataInputStream> build() throws IOException {
45534549
Optional<Path> optionalPath = getOptionalPath();
4550+
OpenFileParameters parameters = new OpenFileParameters()
4551+
.withMandatoryKeys(getMandatoryKeys())
4552+
.withOptions(getOptions())
4553+
.withBufferSize(getBufferSize())
4554+
.withStatus(super.getStatus()); // explicit to avoid IDE warnings
45544555
if(optionalPath.isPresent()) {
45554556
return getFS().openFileWithOptions(optionalPath.get(),
4556-
getMandatoryKeys(), getOptions(), getBufferSize());
4557+
parameters);
45574558
} else {
45584559
return getFS().openFileWithOptions(getPathHandle(),
4559-
getMandatoryKeys(), getOptions(), getBufferSize());
4560+
parameters);
45604561
}
45614562
}
45624563

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import java.util.EnumSet;
2626
import java.util.List;
2727
import java.util.Map;
28-
import java.util.Set;
2928
import java.util.concurrent.CompletableFuture;
3029

3130
import org.apache.hadoop.classification.InterfaceAudience;
3231
import org.apache.hadoop.classification.InterfaceStability;
3332
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.fs.impl.OpenFileParameters;
3434
import org.apache.hadoop.fs.permission.AclEntry;
3535
import org.apache.hadoop.fs.permission.AclStatus;
3636
import org.apache.hadoop.fs.permission.FsAction;
@@ -710,20 +710,15 @@ public FutureDataInputStreamBuilder openFile(final PathHandle pathHandle)
710710
@Override
711711
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
712712
final Path path,
713-
final Set<String> mandatoryKeys,
714-
final Configuration options,
715-
final int bufferSize) throws IOException {
716-
return fs.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
713+
final OpenFileParameters parameters) throws IOException {
714+
return fs.openFileWithOptions(path, parameters);
717715
}
718716

719717
@Override
720718
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
721719
final PathHandle pathHandle,
722-
final Set<String> mandatoryKeys,
723-
final Configuration options,
724-
final int bufferSize) throws IOException {
725-
return fs.openFileWithOptions(pathHandle, mandatoryKeys, options,
726-
bufferSize);
720+
final OpenFileParameters parameters) throws IOException {
721+
return fs.openFileWithOptions(pathHandle, parameters);
727722
}
728723

729724
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@
2626
import java.util.EnumSet;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.Set;
3029
import java.util.concurrent.CompletableFuture;
3130

3231
import org.apache.hadoop.classification.InterfaceAudience;
3332
import org.apache.hadoop.classification.InterfaceStability;
34-
import org.apache.hadoop.conf.Configuration;
3533
import org.apache.hadoop.fs.FileSystem.Statistics;
34+
import org.apache.hadoop.fs.impl.OpenFileParameters;
3635
import org.apache.hadoop.fs.permission.AclEntry;
3736
import org.apache.hadoop.fs.permission.AclStatus;
3837
import org.apache.hadoop.fs.permission.FsAction;
@@ -440,10 +439,8 @@ public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
440439
@Override
441440
public CompletableFuture<FSDataInputStream> openFileWithOptions(
442441
final Path path,
443-
final Set<String> mandatoryKeys,
444-
final Configuration options,
445-
final int bufferSize) throws IOException {
446-
return myFs.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
442+
final OpenFileParameters parameters) throws IOException {
443+
return myFs.openFileWithOptions(path, parameters);
447444
}
448445

449446
public boolean hasPathCapability(final Path path,

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,15 @@ public interface FutureDataInputStreamBuilder
4747
CompletableFuture<FSDataInputStream> build()
4848
throws IllegalArgumentException, UnsupportedOperationException,
4949
IOException;
50+
51+
/**
52+
* A FileStatus may be provided to the open request.
53+
* It is up to the implementation whether to use this or not.
54+
* @param status status.
55+
* @return the builder.
56+
*/
57+
default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
58+
return this;
59+
}
60+
5061
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@
2626
import org.apache.hadoop.classification.InterfaceStability;
2727
import org.apache.hadoop.fs.FSDataInputStream;
2828
import org.apache.hadoop.fs.FileContext;
29+
import org.apache.hadoop.fs.FileStatus;
2930
import org.apache.hadoop.fs.FileSystem;
3031
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
3132
import org.apache.hadoop.fs.Path;
3233
import org.apache.hadoop.fs.PathHandle;
3334

34-
import static com.google.common.base.Preconditions.checkNotNull;
35+
import static java.util.Objects.requireNonNull;
3536
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
3637
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
3738

@@ -60,6 +61,12 @@ public abstract class FutureDataInputStreamBuilderImpl
6061

6162
private int bufferSize;
6263

64+
/**
65+
* File status passed in through a {@link #withFileStatus(FileStatus)}
66+
* call; null otherwise.
67+
*/
68+
private FileStatus status;
69+
6370
/**
6471
* Construct from a {@link FileContext}.
6572
*
@@ -69,8 +76,8 @@ public abstract class FutureDataInputStreamBuilderImpl
6976
*/
7077
protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc,
7178
@Nonnull Path path) throws IOException {
72-
super(checkNotNull(path));
73-
checkNotNull(fc);
79+
super(requireNonNull(path, "path"));
80+
requireNonNull(fc, "file context");
7481
this.fileSystem = null;
7582
bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
7683
}
@@ -82,8 +89,8 @@ protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc,
8289
*/
8390
protected FutureDataInputStreamBuilderImpl(@Nonnull FileSystem fileSystem,
8491
@Nonnull Path path) {
85-
super(checkNotNull(path));
86-
this.fileSystem = checkNotNull(fileSystem);
92+
super(requireNonNull(path, "path"));
93+
this.fileSystem = requireNonNull(fileSystem, "fileSystem");
8794
initFromFS();
8895
}
8996

@@ -108,7 +115,7 @@ private void initFromFS() {
108115
}
109116

110117
protected FileSystem getFS() {
111-
checkNotNull(fileSystem);
118+
requireNonNull(fileSystem, "fileSystem");
112119
return fileSystem;
113120
}
114121

@@ -138,4 +145,18 @@ public FutureDataInputStreamBuilder builder() {
138145
public FutureDataInputStreamBuilder getThisBuilder() {
139146
return this;
140147
}
148+
149+
@Override
150+
public FutureDataInputStreamBuilder withFileStatus(FileStatus st) {
151+
this.status = requireNonNull(st, "status");
152+
return this;
153+
}
154+
155+
/**
156+
* Get any status set in {@link #withFileStatus(FileStatus)}.
157+
* @return a status value or null.
158+
*/
159+
protected FileStatus getStatus() {
160+
return status;
161+
}
141162
}

0 commit comments

Comments
 (0)