Skip to content

Commit 8c8f1b0

Browse files
steveloughran authored and deepakdamri committed
HADOOP-15229. Add FileSystem builder-based openFile() API to match createFile();
S3A to implement S3 Select through this API. The new openFile() API is asynchronous, and implemented across FileSystem and FileContext. The MapReduce V2 inputs are moved to this API, and you can actually set must/may options to pass in. This is more useful for setting things like s3a seek policy than for S3 Select, as the existing input format/record readers can't handle S3 Select output where the stream is shorter than the file length, and splitting plain text is suboptimal. Future work is needed there. In the meantime, any/all filesystem connectors are now free to add their own filesystem-specific configuration parameters which can be set in jobs and used to set filesystem input stream options (seek policy, retry, encryption secrets, etc). Contributed by Steve Loughran
1 parent 175aabd commit 8c8f1b0

File tree

71 files changed

+9694
-1025
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+9694
-1025
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
import java.net.URISyntaxException;
2626
import java.util.ArrayList;
2727
import java.util.Collection;
28+
import java.util.Collections;
2829
import java.util.EnumSet;
2930
import java.util.HashMap;
3031
import java.util.List;
3132
import java.util.Map;
3233
import java.util.NoSuchElementException;
34+
import java.util.Set;
3335
import java.util.StringTokenizer;
36+
import java.util.concurrent.CompletableFuture;
3437
import java.util.concurrent.ConcurrentHashMap;
3538

3639
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -41,21 +44,21 @@
4144
import org.apache.hadoop.fs.Options.ChecksumOpt;
4245
import org.apache.hadoop.fs.Options.CreateOpts;
4346
import org.apache.hadoop.fs.Options.Rename;
47+
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
4448
import org.apache.hadoop.fs.permission.AclEntry;
4549
import org.apache.hadoop.fs.permission.AclStatus;
4650
import org.apache.hadoop.fs.permission.FsAction;
4751
import org.apache.hadoop.fs.permission.FsPermission;
4852
import org.apache.hadoop.security.AccessControlException;
4953
import org.apache.hadoop.security.SecurityUtil;
5054
import org.apache.hadoop.security.token.Token;
55+
import org.apache.hadoop.util.LambdaUtils;
5156
import org.apache.hadoop.util.Progressable;
5257

5358
import com.google.common.annotations.VisibleForTesting;
5459
import org.slf4j.Logger;
5560
import org.slf4j.LoggerFactory;
5661

57-
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
58-
5962
/**
6063
* This class provides an interface for implementors of a Hadoop file system
6164
* (analogous to the VFS of Unix). Applications do not access this class;
@@ -68,7 +71,7 @@
6871
*/
6972
@InterfaceAudience.Public
7073
@InterfaceStability.Stable
71-
public abstract class AbstractFileSystem implements PathCapabilities {
74+
public abstract class AbstractFileSystem {
7275
static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystem.class);
7376

7477
/** Recording statistics per file system class. */
@@ -398,8 +401,11 @@ public void checkPath(Path path) {
398401
thatPort = this.getUriDefaultPort();
399402
}
400403
if (thisPort != thatPort) {
401-
throw new InvalidPathException("Wrong FS: " + path + ", expected: "
402-
+ this.getUri());
404+
throw new InvalidPathException("Wrong FS: " + path
405+
+ " and port=" + thatPort
406+
+ ", expected: "
407+
+ this.getUri()
408+
+ " with port=" + thisPort);
403409
}
404410
}
405411

@@ -848,20 +854,6 @@ public abstract FileStatus getFileStatus(final Path f)
848854
throws AccessControlException, FileNotFoundException,
849855
UnresolvedLinkException, IOException;
850856

851-
/**
852-
* Synchronize client metadata state.
853-
* <p>
854-
* In some FileSystem implementations such as HDFS metadata
855-
* synchronization is essential to guarantee consistency of read requests
856-
* particularly in HA setting.
857-
* @throws IOException
858-
* @throws UnsupportedOperationException
859-
*/
860-
public void msync() throws IOException, UnsupportedOperationException {
861-
throw new UnsupportedOperationException(getClass().getCanonicalName() +
862-
" does not support method msync");
863-
}
864-
865857
/**
866858
* The specification of this method matches that of
867859
* {@link FileContext#access(Path, FsAction)}
@@ -1343,16 +1335,31 @@ public boolean equals(Object other) {
13431335
return myUri.equals(((AbstractFileSystem) other).myUri);
13441336
}
13451337

1346-
public boolean hasPathCapability(final Path path,
1347-
final String capability)
1348-
throws IOException {
1349-
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
1350-
case CommonPathCapabilities.FS_SYMLINKS:
1351-
// delegate to the existing supportsSymlinks() call.
1352-
return supportsSymlinks();
1353-
default:
1354-
// the feature is not implemented.
1355-
return false;
1356-
}
1338+
/**
1339+
* Open a file with the given set of options.
1340+
* The base implementation performs a blocking
1341+
* call to {@link #open(Path, int)} in this call;
1342+
* the actual outcome is in the returned {@code CompletableFuture}.
1343+
* This avoids having to create some thread pool, while still
1344+
* setting up the expectation that the {@code get()} call
1345+
* is needed to evaluate the result.
1346+
* @param path path to the file
1347+
* @param mandatoryKeys set of options declared as mandatory.
1348+
* @param options options set during the build sequence.
1349+
* @param bufferSize buffer size
1350+
* @return a future which will evaluate to the opened file.
1351+
* @throws IOException failure to resolve the link.
1352+
* @throws IllegalArgumentException unknown mandatory key
1353+
*/
1354+
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
1355+
Set<String> mandatoryKeys,
1356+
Configuration options,
1357+
int bufferSize) throws IOException {
1358+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
1359+
Collections.emptySet(),
1360+
"for " + path);
1361+
return LambdaUtils.eval(
1362+
new CompletableFuture<>(), () -> open(path, bufferSize));
13571363
}
1364+
13581365
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Arrays;
2525
import java.util.EnumSet;
2626
import java.util.List;
27+
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
2729

2830
import org.apache.hadoop.classification.InterfaceAudience;
2931
import org.apache.hadoop.classification.InterfaceStability;
@@ -262,10 +264,21 @@ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
262264
return Arrays.asList(fsImpl.addDelegationTokens(renewer, null));
263265
}
264266

265-
@Override
266-
public boolean hasPathCapability(final Path path,
267-
final String capability)
268-
throws IOException {
269-
return fsImpl.hasPathCapability(path, capability);
267+
/**
268+
* Open a file by delegating to
269+
* {@link FileSystem#openFileWithOptions(Path, Set, Configuration, int)}.
270+
* @param path path to the file
271+
* @param mandatoryKeys set of options declared as mandatory.
272+
* @param options options set during the build sequence.
273+
* @param bufferSize buffer size
274+
* @return a future which will evaluate to the opened file.
275+
* @throws IOException failure to resolve the link.
276+
* @throws IllegalArgumentException unknown mandatory key
277+
*/
278+
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
279+
Set<String> mandatoryKeys,
280+
Configuration options,
281+
int bufferSize) throws IOException {
282+
return fsImpl.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
270283
}
271284
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs;
20+
21+
import javax.annotation.Nonnull;
22+
import java.io.IOException;
23+
24+
import org.apache.hadoop.classification.InterfaceAudience;
25+
import org.apache.hadoop.classification.InterfaceStability;
26+
27+
/**
28+
* The base interface which various FileSystem and FileContext Builder
29+
* interfaces can extend, and which underlying implementations
30+
* will then implement.
31+
* @param <S> Return type on the {@link #build()} call.
32+
* @param <B> type of builder itself.
33+
*/
34+
@InterfaceAudience.Public
35+
@InterfaceStability.Unstable
36+
public interface FSBuilder<S, B extends FSBuilder<S, B>> {
37+
38+
/**
39+
* Set optional Builder parameter.
40+
*/
41+
B opt(@Nonnull String key, @Nonnull String value);
42+
43+
/**
44+
* Set optional boolean parameter for the Builder.
45+
*
46+
* @see #opt(String, String)
47+
*/
48+
B opt(@Nonnull String key, boolean value);
49+
50+
/**
51+
* Set optional int parameter for the Builder.
52+
*
53+
* @see #opt(String, String)
54+
*/
55+
B opt(@Nonnull String key, int value);
56+
57+
/**
58+
* Set optional float parameter for the Builder.
59+
*
60+
* @see #opt(String, String)
61+
*/
62+
B opt(@Nonnull String key, float value);
63+
64+
/**
65+
* Set optional double parameter for the Builder.
66+
*
67+
* @see #opt(String, String)
68+
*/
69+
B opt(@Nonnull String key, double value);
70+
71+
/**
72+
* Set an array of string values as optional parameter for the Builder.
73+
*
74+
* @see #opt(String, String)
75+
*/
76+
B opt(@Nonnull String key, @Nonnull String... values);
77+
78+
/**
79+
* Set mandatory option to the Builder.
80+
*
81+
* If the option is not supported or unavailable,
82+
* the client should expect {@link #build()} to throw an IllegalArgumentException.
83+
*/
84+
B must(@Nonnull String key, @Nonnull String value);
85+
86+
/**
87+
* Set mandatory boolean option.
88+
*
89+
* @see #must(String, String)
90+
*/
91+
B must(@Nonnull String key, boolean value);
92+
93+
/**
94+
* Set mandatory int option.
95+
*
96+
* @see #must(String, String)
97+
*/
98+
B must(@Nonnull String key, int value);
99+
100+
/**
101+
* Set mandatory float option.
102+
*
103+
* @see #must(String, String)
104+
*/
105+
B must(@Nonnull String key, float value);
106+
107+
/**
108+
* Set mandatory double option.
109+
*
110+
* @see #must(String, String)
111+
*/
112+
B must(@Nonnull String key, double value);
113+
114+
/**
115+
* Set a string array as mandatory option.
116+
*
117+
* @see #must(String, String)
118+
*/
119+
B must(@Nonnull String key, @Nonnull String... values);
120+
121+
/**
122+
* Instantiate the object which was being built.
123+
*
124+
* @throws IllegalArgumentException if the parameters are not valid.
125+
* @throws UnsupportedOperationException if the filesystem does not support
126+
* the specific operation.
127+
* @throws IOException on filesystem IO errors.
128+
*/
129+
S build() throws IllegalArgumentException,
130+
UnsupportedOperationException, IOException;
131+
}

0 commit comments

Comments
 (0)