Skip to content

Commit e14c4b6

Browse files
steveloughran authored and
deepakdamri committed
HADOOP-16202. Enhanced openFile(): hadoop-common changes. (apache#2584/1)
This defines standard option and values for the openFile() builder API for opening a file: fs.option.openfile.read.policy A list of the desired read policy, in preferred order. standard values are adaptive, default, random, sequential, vector, whole-file fs.option.openfile.length How long the file is. fs.option.openfile.split.start start of a task's split fs.option.openfile.split.end end of a task's split These can be used by filesystem connectors to optimize their reading of the source file, including but not limited to * skipping existence/length probes when opening a file * choosing a policy for prefetching/caching data The hadoop shell commands which read files all declare "whole-file" and "sequential", as appropriate. Contributed by Steve Loughran. Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
1 parent d890396 commit e14c4b6

File tree

10 files changed

+210
-26
lines changed

10 files changed

+210
-26
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,18 @@
2020

2121
import java.io.IOException;
2222
import java.io.UncheckedIOException;
23-
import java.util.concurrent.ExecutionException;
2423

2524
import com.google.common.base.Preconditions;
2625

2726
import org.apache.hadoop.classification.InterfaceAudience;
2827
import org.apache.hadoop.classification.InterfaceStability;
2928

3029
/**
31-
* A wrapper for an IOException which
32-
* {@link FutureIOSupport#raiseInnerCause(ExecutionException)} knows to
33-
* always extract the exception.
30+
* A wrapper for an IOException.
3431
*
3532
* The constructor signature guarantees the cause will be an IOException,
3633
* and as it checks for a null-argument, non-null.
37-
* @deprecated use the {@code UncheckedIOException}.
34+
* @deprecated use the {@code UncheckedIOException} directly.
3835
*/
3936
@Deprecated
4037
@InterfaceAudience.Private
@@ -52,8 +49,4 @@ public WrappedIOException(final IOException cause) {
5249
super(Preconditions.checkNotNull(cause));
5350
}
5451

55-
@Override
56-
public synchronized IOException getCause() {
57-
return (IOException) super.getCause();
58-
}
5952
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.classification.InterfaceAudience;
3030
import org.apache.hadoop.classification.InterfaceStability;
3131
import org.apache.hadoop.conf.Configuration;
32+
import org.apache.hadoop.fs.FSDataInputStream;
3233
import org.apache.hadoop.fs.FileStatus;
3334
import org.apache.hadoop.fs.FileSystem;
3435
import org.apache.hadoop.fs.LocalFileSystem;
@@ -39,6 +40,12 @@
3940
import org.apache.hadoop.fs.PathNotFoundException;
4041
import org.apache.hadoop.fs.RemoteIterator;
4142

43+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
44+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
45+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
46+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
47+
import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
48+
4249
/**
4350
* Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
4451
* PathData ensures that the returned path string will be the same as the
@@ -611,4 +618,34 @@ public boolean equals(Object o) {
611618
public int hashCode() {
612619
return path.hashCode();
613620
}
621+
622+
623+
/**
624+
* Open a file for sequential IO.
625+
* <p></p>
626+
* This uses FileSystem.openFile() to request sequential IO;
627+
* the file status is also passed in.
628+
* Filesystems may use it to optimize their IO.
629+
* @return an input stream
630+
* @throws IOException failure
631+
*/
632+
protected FSDataInputStream openForSequentialIO()
633+
throws IOException {
634+
return openFile(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
635+
}
636+
637+
/**
638+
* Open a file.
639+
* @param policy fadvise policy.
640+
* @return an input stream
641+
* @throws IOException failure
642+
*/
643+
protected FSDataInputStream openFile(final String policy) throws IOException {
644+
return awaitFuture(fs.openFile(path)
645+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
646+
policy)
647+
.opt(FS_OPTION_OPENFILE_LENGTH,
648+
stat.getLen()) // file length hint for object stores
649+
.build());
650+
}
614651
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
3434

35+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
36+
3537
/**
3638
* Get a listing of all files in that match the file patterns.
3739
*/

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public final class StoreStatisticNames {
103103
/** {@value}. */
104104
public static final String OP_OPEN = "op_open";
105105

106+
/** Call to openFile() {@value}. */
107+
public static final String OP_OPENFILE = "op_openfile";
108+
106109
/** {@value}. */
107110
public static final String OP_REMOVE_ACL = "op_remove_acl";
108111

@@ -283,6 +286,12 @@ public final class StoreStatisticNames {
283286
public static final String ACTION_EXECUTOR_ACQUIRED =
284287
"action_executor_acquired";
285288

289+
/**
290+
* A file was opened: {@value}.
291+
*/
292+
public static final String ACTION_FILE_OPENED
293+
= "action_file_opened";
294+
286295
/**
287296
* An HTTP HEAD request was made: {@value}.
288297
*/

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public final class StreamStatisticNames {
7676
public static final String STREAM_READ_CLOSED = "stream_read_closed";
7777

7878
/**
79-
* Total count of times an attempt to close an input stream was made
79+
* Total count of times an attempt to close an input stream was made.
8080
* Value: {@value}.
8181
*/
8282
public static final String STREAM_READ_CLOSE_OPERATIONS
@@ -118,6 +118,23 @@ public final class StreamStatisticNames {
118118
public static final String STREAM_READ_OPERATIONS_INCOMPLETE
119119
= "stream_read_operations_incomplete";
120120

121+
/**
122+
* count/duration of aborting a remote stream during stream
123+
* IO.
124+
* Value: {@value}.
125+
*/
126+
public static final String STREAM_READ_REMOTE_STREAM_ABORTED
127+
= "stream_read_remote_stream_aborted";
128+
129+
/**
130+
* count/duration of closing a remote stream,
131+
* possibly including draining the stream to recycle
132+
* the HTTP connection.
133+
* Value: {@value}.
134+
*/
135+
public static final String STREAM_READ_REMOTE_STREAM_DRAINED
136+
= "stream_read_remote_stream_drain";
137+
121138
/**
122139
* Count of version mismatches encountered while reading an input stream.
123140
* Value: {@value}.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -484,23 +484,39 @@ public static <B> CallableRaisingIOE<B> trackDurationOfOperation(
484484
// create the tracker outside try-with-resources so
485485
// that failures can be set in the catcher.
486486
DurationTracker tracker = createTracker(factory, statistic);
487-
try {
488-
// exec the input function and return its value
489-
return input.apply();
490-
} catch (IOException | RuntimeException e) {
491-
// input function failed: note it
492-
tracker.failed();
493-
// and rethrow
494-
throw e;
495-
} finally {
496-
// update the tracker.
497-
// this is called after the catch() call would have
498-
// set the failed flag.
499-
tracker.close();
500-
}
487+
return invokeTrackingDuration(tracker, input);
501488
};
502489
}
503490

491+
/**
492+
* Given an IOException raising callable/lambda expression,
493+
* execute it, updating the tracker on success/failure.
494+
* @param tracker duration tracker.
495+
* @param input input callable.
496+
* @param <B> return type.
497+
* @return the result of the invocation
498+
* @throws IOException on failure.
499+
*/
500+
public static <B> B invokeTrackingDuration(
501+
final DurationTracker tracker,
502+
final CallableRaisingIOE<B> input)
503+
throws IOException {
504+
try {
505+
// exec the input function and return its value
506+
return input.apply();
507+
} catch (IOException | RuntimeException e) {
508+
// input function failed: note it
509+
tracker.failed();
510+
// and rethrow
511+
throw e;
512+
} finally {
513+
// update the tracker.
514+
// this is called after the catch() call would have
515+
// set the failed flag.
516+
tracker.close();
517+
}
518+
}
519+
504520
/**
505521
* Given an IOException raising Consumer,
506522
* return a new one which wraps the inner and tracks

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.io.IOException;
2222
import java.io.InterruptedIOException;
2323
import java.io.UncheckedIOException;
24+
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
2426
import java.util.concurrent.CompletionException;
2527
import java.util.concurrent.ExecutionException;
2628
import java.util.concurrent.Future;
@@ -29,6 +31,8 @@
2931

3032
import org.apache.hadoop.classification.InterfaceAudience;
3133
import org.apache.hadoop.classification.InterfaceStability;
34+
import org.apache.hadoop.conf.Configuration;
35+
import org.apache.hadoop.fs.FSBuilder;
3236

3337
/**
3438
* Future IO Helper methods.
@@ -86,6 +90,8 @@ public static <T> T awaitFuture(final Future<T> future)
8690
* extracted and rethrown.
8791
* </p>
8892
* @param future future to evaluate
93+
* @param timeout timeout to wait
94+
* @param unit time unit.
8995
* @param <T> type of the result.
9096
* @return the result, if all went well.
9197
* @throws InterruptedIOException future was interrupted
@@ -185,4 +191,88 @@ public static IOException unwrapInnerException(final Throwable e) {
185191
}
186192
}
187193

194+
/**
195+
* Propagate options to any builder, converting everything with the
196+
* prefix to an option where, if there were 2+ dot-separated elements,
197+
* it is converted to a schema.
198+
* See {@link #propagateOptions(FSBuilder, Configuration, String, boolean)}.
199+
* @param builder builder to modify
200+
* @param conf configuration to read
201+
* @param optionalPrefix prefix for optional settings
202+
* @param mandatoryPrefix prefix for mandatory settings
203+
* @param <T> type of result
204+
* @param <U> type of builder
205+
* @return the builder passed in.
206+
*/
207+
public static <T, U extends FSBuilder<T, U>>
208+
FSBuilder<T, U> propagateOptions(
209+
final FSBuilder<T, U> builder,
210+
final Configuration conf,
211+
final String optionalPrefix,
212+
final String mandatoryPrefix) {
213+
propagateOptions(builder, conf,
214+
optionalPrefix, false);
215+
propagateOptions(builder, conf,
216+
mandatoryPrefix, true);
217+
return builder;
218+
}
219+
220+
/**
221+
* Propagate options to any builder, converting everything with the
222+
* prefix to an option where, if there were 2+ dot-separated elements,
223+
* it is converted to a schema.
224+
* <pre>
225+
* fs.example.s3a.option becomes "s3a.option"
226+
* fs.example.fs.io.policy becomes "fs.io.policy"
227+
* fs.example.something becomes "something"
228+
* </pre>
229+
* @param builder builder to modify
230+
* @param conf configuration to read
231+
* @param prefix prefix to scan/strip
232+
* @param mandatory are the options to be mandatory or optional?
233+
*/
234+
public static void propagateOptions(
235+
final FSBuilder<?, ?> builder,
236+
final Configuration conf,
237+
final String prefix,
238+
final boolean mandatory) {
239+
240+
final String p = prefix.endsWith(".") ? prefix : (prefix + ".");
241+
final Map<String, String> propsWithPrefix = conf.getPropsWithPrefix(p);
242+
for (Map.Entry<String, String> entry : propsWithPrefix.entrySet()) {
243+
// strip the prefix off each entry's key
244+
String key = entry.getKey();
245+
String val = entry.getValue();
246+
if (mandatory) {
247+
builder.must(key, val);
248+
} else {
249+
builder.opt(key, val);
250+
}
251+
}
252+
}
253+
254+
/**
255+
* Evaluate a CallableRaisingIOE in the current thread,
256+
* converting IOEs to RTEs and propagating.
257+
* @param callable callable to invoke
258+
* @param <T> Return type.
259+
* @return the evaluated result.
260+
* @throws UnsupportedOperationException fail fast if unsupported
261+
* @throws IllegalArgumentException invalid argument
262+
*/
263+
public static <T> CompletableFuture<T> eval(
264+
CallableRaisingIOE<T> callable) {
265+
CompletableFuture<T> result = new CompletableFuture<>();
266+
try {
267+
result.complete(callable.apply());
268+
} catch (UnsupportedOperationException | IllegalArgumentException tx) {
269+
// fail fast here
270+
throw tx;
271+
} catch (Throwable tx) {
272+
// fail lazily here to ensure callers expect all File IO operations to
273+
// surface later
274+
result.completeExceptionally(tx);
275+
}
276+
return result;
277+
}
188278
}

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,4 @@ HDFS as these are commonly expected by Hadoop client applications.
3939
2. [Extending the specification and its tests](extending.html)
4040
1. [Uploading a file using Multiple Parts](multipartuploader.html)
4141
1. [IOStatistics](iostatistics.html)
42+
1. [openFile()](openfile.html).

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.hadoop.classification.InterfaceAudience;
3737
import org.apache.hadoop.classification.InterfaceStability;
3838

39+
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
40+
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN;
3941
import static org.assertj.core.api.Assertions.assertThat;
4042

4143
/**
@@ -347,6 +349,24 @@ public static AbstractLongAssert<?> assertThatStatisticMaximum(
347349
verifyStatisticsNotNull(stats).maximums());
348350
}
349351

352+
/**
353+
* Assert that a duration is within a given minimum/maximum range.
354+
* @param stats statistics source
355+
* @param key statistic key without any suffix
356+
* @param min minimum statistic must be equal to or greater than this.
357+
* @param max maximum statistic must be equal to or less than this.
358+
*/
359+
public static void assertDurationRange(
360+
final IOStatistics stats,
361+
final String key,
362+
final long min,
363+
final long max) {
364+
assertThatStatisticMinimum(stats, key + SUFFIX_MIN)
365+
.isGreaterThanOrEqualTo(min);
366+
assertThatStatisticMaximum(stats, key + SUFFIX_MAX)
367+
.isLessThanOrEqualTo(max);
368+
}
369+
350370
/**
351371
* Start an assertion chain on
352372
* a required mean statistic.

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33-
import org.apache.hadoop.fs.impl.FutureIOSupport;
3433
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
3534
import org.apache.hadoop.test.AbstractHadoopTestBase;
3635
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
@@ -276,7 +275,7 @@ public void testCallableIOEFailureDuration() throws Throwable {
276275
*/
277276
@Test
278277
public void testDurationThroughEval() throws Throwable {
279-
CompletableFuture<Object> eval = FutureIOSupport.eval(
278+
CompletableFuture<Object> eval = FutureIO.eval(
280279
trackDurationOfOperation(stats, REQUESTS, () -> {
281280
sleepf(100);
282281
throw new FileNotFoundException("oops");

0 commit comments

Comments
 (0)