Commit 6999acf

HADOOP-16202. Enhanced openFile(): mapreduce and YARN changes. (#2584/2)
These changes ensure that sequential files are opened with the right read policy, and that the split start/end is passed in. As well as offering filesystem clients the opportunity to choose fetch/cache/seek policies, the settings ensure that text files on an S3 bucket whose default policy is "random" will still be processed efficiently.

This commit depends on the associated hadoop-common patch, which must be committed first.

Contributed by Steve Loughran.

Change-Id: Ic6713fd752441cf42ebe8739d05c2293a5db9f94
1 parent 1b4dba9 commit 6999acf
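
The common pattern across all ten files: a blocking FileSystem.open() or FileContext.open() call becomes the openFile() builder from the hadoop-common half of HADOOP-16202, a read policy or split hint is attached, and the returned future is resolved with FutureIO.awaitFuture(). A minimal sketch of that pattern, using only the API surface visible in this diff (the class and helper names below are illustrative, not part of the patch):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

public class OpenFileSketch {

  /** Open a file that will be read through from start to finish. */
  public static FSDataInputStream openWholeFile(FileSystem fs, Path path)
      throws IOException {
    return awaitFuture(                 // block until the async open completes
        fs.openFile(path)               // returns a FutureDataInputStreamBuilder
            .opt(FS_OPTION_OPENFILE_READ_POLICY,  // opt() is a hint; unknown keys are ignored
                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
            .build());                  // CompletableFuture<FSDataInputStream>
  }
}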

File tree

10 files changed, +94 -28 lines changed


hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java

Lines changed: 9 additions & 1 deletion
@@ -35,6 +35,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /**
  * Reads in history events from the JobHistoryFile and sends them out again
  * to be recorded.
@@ -118,7 +122,11 @@ public static FSDataInputStream getPreviousJobHistoryFileStream(
         fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
             jobId, (applicationAttemptId.getAttemptId() - 1)));
     LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
+    in = awaitFuture(
+        fc.openFile(historyFile)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build());
     return in;
   }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java

Lines changed: 10 additions & 3 deletions
@@ -28,7 +28,6 @@
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -41,9 +40,13 @@
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+
 /**
  * Treats keys as offset in file and value as line.
  */
@@ -109,10 +112,14 @@ public LineRecordReader(Configuration job, FileSplit split,
     // open the file and seek to the start of the split
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
+        .opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
     if (isCompressedInput()) {
       decompressor = CodecPool.getDecompressor(codec);
       if (codec instanceof SplittableCompressionCodec) {

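Both LineRecordReader variants now pass the split boundaries to the builder before opening. Against an object store such as S3A this lets the client bound its first range request to the split rather than the whole object; against HDFS the hints are simply ignored. A sketch of the reader-side sequence, condensed from the diffs above (the wrapper class is hypothetical):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

public class SplitAwareOpen {

  /** Open one split of a file, hinting its boundaries to the filesystem. */
  public static FSDataInputStream openSplit(Configuration conf, Path file,
      long splitStart, long splitEnd) throws IOException {
    final FutureDataInputStreamBuilder builder =
        file.getFileSystem(conf).openFile(file);
    // hints only: a store may use them to size its first GET, or ignore them
    builder.opt(FS_OPTION_OPENFILE_SPLIT_START, splitStart)
        .opt(FS_OPTION_OPENFILE_SPLIT_END, splitEnd);
    FSDataInputStream in = awaitFuture(builder.build());
    in.seek(splitStart);              // position at the start of the split
    return in;
  }
}
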
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java

Lines changed: 4 additions & 3 deletions
@@ -28,7 +28,6 @@
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -40,6 +39,8 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.functional.FutureIO;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,10 +95,10 @@ public void initialize(Configuration job, long splitStart, long splitLength,
     // open the file
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
 
     CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
     if (null != codec) {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

Lines changed: 11 additions & 3 deletions
@@ -27,7 +27,6 @@
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -40,9 +39,14 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.functional.FutureIO;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+
 /**
  * Treats keys as offset in file and value as line.
  */
@@ -86,10 +90,14 @@ public void initialize(InputSplit genericSplit,
     // open the file and seek to the start of the split
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
 
     CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
     if (null!=codec) {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java

Lines changed: 3 additions & 3 deletions
@@ -29,7 +29,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -39,6 +38,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.functional.FutureIO;
 
 /**
  * NLineInputFormat which splits N lines of input as one split.
@@ -99,10 +99,10 @@ public static List<FileSplit> getSplitsForFile(FileStatus status,
     try {
       final FutureDataInputStreamBuilder builder =
           fileName.getFileSystem(conf).openFile(fileName);
-      FutureIOSupport.propagateOptions(builder, conf,
+      FutureIO.propagateOptions(builder, conf,
          MRJobConfig.INPUT_FILE_OPTION_PREFIX,
          MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-      FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
+      FSDataInputStream in = FutureIO.awaitFuture(builder.build());
       lr = new LineReader(in, conf);
       Text line = new Text();
       int numLines = 0;

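The record readers keep the existing propagateOptions() call, now on FutureIO rather than the deprecated FutureIOSupport; it copies any job configuration key under MRJobConfig.INPUT_FILE_OPTION_PREFIX or INPUT_FILE_MANDATORY_PREFIX onto the builder as an optional or mandatory open option. Assuming that documented prefix-stripping behaviour (this diff relies on it but does not show it), a job could request a read policy for all its input files with no reader-side code change (hypothetical helper):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;

public class JobLevelOpenFileOptions {

  /** Ask every input-file open in this job to use the "sequential" policy. */
  public static void requestSequentialReads(Configuration jobConf) {
    // propagateOptions() strips INPUT_FILE_OPTION_PREFIX and hands the rest
    // ("fs.option.openfile.read.policy") to builder.opt(); keys under
    // INPUT_FILE_MANDATORY_PREFIX would become must() options instead.
    jobConf.set(
        MRJobConfig.INPUT_FILE_OPTION_PREFIX + FS_OPTION_OPENFILE_READ_POLICY,
        "sequential");
  }
}
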
hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java

Lines changed: 14 additions & 2 deletions
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +42,12 @@
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
 
 /**
  * An input format that reads the first 10 characters of each line as the key
@@ -224,12 +231,17 @@ public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException {
     Path p = ((FileSplit)split).getPath();
     FileSystem fs = p.getFileSystem(context.getConfiguration());
-    in = fs.open(p);
     long start = ((FileSplit)split).getStart();
     // find the offset to start at a record boundary
     offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
-    in.seek(start + offset);
     length = ((FileSplit)split).getLength();
+    final FutureDataInputStreamBuilder builder = fs.openFile(p)
+        .opt(FS_OPTION_OPENFILE_SPLIT_START, start)
+        .opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+    in = FutureIO.awaitFuture(builder.build());
+    in.seek(start + offset);
   }
 
   public void close() throws IOException {

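Worth noting in TeraInputFormat: the seek to the record-aligned offset now happens after the future completes, since the stream only exists once FutureIO.awaitFuture() returns. The offset and length arithmetic is unchanged; the split boundaries and the sequential policy travel with the open.
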
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

Lines changed: 8 additions & 1 deletion
@@ -52,7 +52,10 @@
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * This class extends RetriableCommand to implement the copy of files,
@@ -362,7 +365,11 @@ private static ThrottledInputStream getInputStream(Path path,
       FileSystem fs = path.getFileSystem(conf);
       float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
               DistCpConstants.DEFAULT_BANDWIDTH_MB);
-      FSDataInputStream in = fs.open(path);
+      // open with sequential read, but not whole-file
+      FSDataInputStream in = awaitFuture(fs.openFile(path)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+          .build());
       return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
     }
     catch (IOException e) {

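The distcp change deliberately picks "sequential" rather than "whole-file" (the in-line comment says as much), and states it only as a hint via opt(). Under the FSBuilder contract as I understand it (an assumption; this diff does not exercise it), a mandatory must() option would instead fail the build against a filesystem that does not recognize it. A sketch of the distinction:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

public class HintVersusRequirement {

  public static FSDataInputStream openForCopy(FileSystem fs, Path path)
      throws IOException {
    return awaitFuture(fs.openFile(path)
        // opt(): a hint; filesystems that do not understand the key ignore it
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
        // .must(FS_OPTION_OPENFILE_READ_POLICY, ...) would instead reject
        // the open on stores that cannot honour the option (assumption)
        .build());
  }
}
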
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java

Lines changed: 3 additions & 3 deletions
@@ -26,7 +26,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -35,6 +34,7 @@
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
 import org.apache.hadoop.streaming.StreamUtil;
+import org.apache.hadoop.util.functional.FutureIO;
 
 /**
  * An input format that selects a RecordReader based on a JobConf property. This
@@ -66,10 +66,10 @@ public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
     FileSystem fs = path.getFileSystem(conf);
     // open the file
     final FutureDataInputStreamBuilder builder = fs.openFile(path);
-    FutureIOSupport.propagateOptions(builder, conf,
+    FutureIO.propagateOptions(builder, conf,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
+    FSDataInputStream in = FutureIO.awaitFuture(builder.build());
 
     // Factory dispatch based on available params..
     Class readerClass;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

Lines changed: 15 additions & 2 deletions
@@ -56,6 +56,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -77,6 +78,11 @@
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 @Public
 @Evolving
 public class AggregatedLogFormat {
@@ -576,9 +582,16 @@ public LogReader(Configuration conf, Path remoteAppLogFile)
     try {
       FileContext fileContext =
           FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
-      this.fsDataIStream = fileContext.open(remoteAppLogFile);
+      FileStatus status = fileContext.getFileStatus(remoteAppLogFile);
+      this.fsDataIStream = awaitFuture(
+          fileContext.openFile(remoteAppLogFile)
+              .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                  FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+              .opt(FS_OPTION_OPENFILE_LENGTH,
+                  status.getLen())   // file length hint for object stores
+              .build());
       reader = new TFile.Reader(this.fsDataIStream,
-          fileContext.getFileStatus(remoteAppLogFile).getLen(), conf);
+          status.getLen(), conf);
       this.scanner = reader.createScanner();
     } catch (IOException ioe) {
       close();

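The LogReader change is the one place the patch also saves a metadata round trip: the FileStatus is fetched once, and its length feeds both the TFile.Reader constructor and the FS_OPTION_OPENFILE_LENGTH hint, letting an object store skip its own length probe at open time. The same idea in isolation (hypothetical helper, using FileSystem rather than FileContext):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

public class LengthHintedOpen {

  /** Probe the file metadata once, then reuse it for the open. */
  public static FSDataInputStream openKnownLength(FileSystem fs, Path path)
      throws IOException {
    FileStatus status = fs.getFileStatus(path);       // single metadata probe
    return awaitFuture(fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_LENGTH, status.getLen())  // skip a HEAD at open
        .build());
  }
}
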
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java

Lines changed: 17 additions & 7 deletions
@@ -60,7 +60,11 @@
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
-/**
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
+/**
  * Download a single URL to the local disk.
  *
  */
@@ -285,23 +289,25 @@ private void verifyAndCopy(Path destination)
       }
     }
 
-    downloadAndUnpack(sCopy, destination);
+    downloadAndUnpack(sCopy, sStat, destination);
   }
 
   /**
    * Copy source path to destination with localization rules.
-   * @param source source path to copy. Typically HDFS
+   * @param source source path to copy. Typically HDFS or an object store.
+   * @param sourceStatus status of source
    * @param destination destination path. Typically local filesystem
    * @exception YarnException Any error has occurred
    */
-  private void downloadAndUnpack(Path source, Path destination)
+  private void downloadAndUnpack(Path source,
+      FileStatus sourceStatus, Path destination)
       throws YarnException {
     try {
      FileSystem sourceFileSystem = source.getFileSystem(conf);
      FileSystem destinationFileSystem = destination.getFileSystem(conf);
-      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
+      if (sourceStatus.isDirectory()) {
        FileUtil.copy(
-            sourceFileSystem, source,
+            sourceFileSystem, sourceStatus,
            destinationFileSystem, destination, false,
            true, conf);
      } else {
@@ -329,7 +335,11 @@ private void unpack(Path source, Path destination,
       FileSystem sourceFileSystem,
       FileSystem destinationFileSystem)
       throws IOException, InterruptedException, ExecutionException {
-    try (InputStream inputStream = sourceFileSystem.open(source)) {
+    try (InputStream inputStream = awaitFuture(
+        sourceFileSystem.openFile(source)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build())) {
       File dst = new File(destination.toUri());
       String lowerDst = StringUtils.toLowerCase(dst.getName());
       switch (resource.getType()) {

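The FSDownload change is slightly broader than an open() swap: verifyAndCopy() already holds a FileStatus for the source (sStat), so it is now threaded through downloadAndUnpack() into FileUtil.copy(), removing a redundant getFileStatus() call against the source filesystem, and unpack() opens the resource with the whole-file policy, which matches localization's end-to-end read of each archive.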