Skip to content

Commit 2bac1d4

Browse files
steveloughran authored and deepakdamri committed
HADOOP-16202. Enhanced openFile(): hadoop-common changes. (apache#2584/1)
This defines standard options and values for the openFile() builder API for opening a file:

* fs.option.openfile.read.policy: a list of the desired read policies, in preferred order. Standard values are adaptive, default, random, sequential, vector, whole-file.
* fs.option.openfile.length: how long the file is.
* fs.option.openfile.split.start: start of a task's split.
* fs.option.openfile.split.end: end of a task's split.

These can be used by filesystem connectors to optimize their reading of the source file, including but not limited to:

* skipping existence/length probes when opening a file
* choosing a policy for prefetching/caching data

The hadoop shell commands which read files all declare "whole-file" and "sequential", as appropriate.

Contributed by Steve Loughran.

Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
1 parent 99567ed commit 2bac1d4

36 files changed

+5673
-648
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
2727

28+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
29+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
30+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
31+
2832
/** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
2933
@InterfaceAudience.Public
3034
@InterfaceStability.Stable
@@ -42,7 +46,12 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
4246
public AvroFSInput(final FileContext fc, final Path p) throws IOException {
  // Look up the status once: it supplies the length and is also handed to
  // the openFile() builder below via withFileStatus().
  FileStatus status = fc.getFileStatus(p);
  this.len = status.getLen();
  // Open through the builder API, declaring a sequential read policy.
  // The file must be opened exactly once: a second fc.open(p) whose result
  // is discarded would leave an unclosed stream behind on every
  // construction, so no further open call is made here.
  this.stream = awaitFuture(fc.openFile(p)
      .opt(FS_OPTION_OPENFILE_READ_POLICY,
          FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
      .withFileStatus(status)
      .build());
}
4756

4857
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 138 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,35 @@
2424
import java.io.InputStream;
2525
import java.nio.channels.ClosedChannelException;
2626
import java.util.Arrays;
27+
import java.util.EnumSet;
2728
import java.util.List;
29+
import java.util.concurrent.CompletableFuture;
2830

29-
import com.google.common.base.Preconditions;
31+
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
3032
import org.apache.hadoop.classification.InterfaceAudience;
3133
import org.apache.hadoop.classification.InterfaceStability;
3234
import org.apache.hadoop.conf.Configuration;
35+
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
36+
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
37+
import org.apache.hadoop.fs.impl.OpenFileParameters;
3338
import org.apache.hadoop.fs.permission.AclEntry;
3439
import org.apache.hadoop.fs.permission.FsPermission;
40+
import org.apache.hadoop.fs.statistics.IOStatistics;
41+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
42+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
3543
import org.apache.hadoop.util.DataChecksum;
44+
import org.apache.hadoop.util.LambdaUtils;
3645
import org.apache.hadoop.util.Progressable;
3746

47+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
3848
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
49+
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
3950

4051
/****************************************************************
4152
* Abstract Checksummed FileSystem.
4253
* It provides a basic implementation of a Checksummed FileSystem,
4354
* which creates a checksum file for each raw file.
44-
* It generates & verifies checksums at the client side.
55+
* It generates & verifies checksums at the client side.
4556
*
4657
*****************************************************************/
4758
@InterfaceAudience.Public
@@ -127,7 +138,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
127138
* For open()'s FSInputStream
128139
* It verifies that data matches checksums.
129140
*******************************************************/
130-
private static class ChecksumFSInputChecker extends FSInputChecker {
141+
private static class ChecksumFSInputChecker extends FSInputChecker implements
142+
IOStatisticsSource {
131143
private ChecksumFileSystem fs;
132144
private FSDataInputStream datas;
133145
private FSDataInputStream sums;
@@ -263,6 +275,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
263275
}
264276
return nread;
265277
}
278+
279+
/**
280+
* Get the IO Statistics of the nested stream, falling back to
281+
* null if the stream does not implement the interface
282+
* {@link IOStatisticsSource}.
283+
* @return an IOStatistics instance or null
284+
*/
285+
@Override
286+
public IOStatistics getIOStatistics() {
287+
return IOStatisticsSupport.retrieveIOStatistics(datas);
288+
}
266289
}
267290

268291
private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -367,6 +390,12 @@ public boolean truncate(Path f, long newLength) throws IOException {
367390
+ "by ChecksumFileSystem");
368391
}
369392

393+
@Override
394+
public void concat(final Path f, final Path[] psrcs) throws IOException {
395+
throw new UnsupportedOperationException("Concat is not supported "
396+
+ "by ChecksumFileSystem");
397+
}
398+
370399
/**
371400
* Calculated the length of the checksum file in bytes.
372401
* @param size the length of the data file in bytes
@@ -382,7 +411,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {
382411

383412
/** This class provides an output stream for a checksummed file.
384413
* It generates checksums for data. */
385-
private static class ChecksumFSOutputSummer extends FSOutputSummer {
414+
private static class ChecksumFSOutputSummer extends FSOutputSummer
415+
implements IOStatisticsSource, StreamCapabilities {
386416
private FSDataOutputStream datas;
387417
private FSDataOutputStream sums;
388418
private static final float CHKSUM_AS_FRACTION = 0.01f;
@@ -436,6 +466,31 @@ protected void checkClosed() throws IOException {
436466
throw new ClosedChannelException();
437467
}
438468
}
469+
470+
/**
471+
* Get the IO Statistics of the nested stream, falling back to
472+
* null if the stream does not implement the interface
473+
* {@link IOStatisticsSource}.
474+
* @return an IOStatistics instance or null
475+
*/
476+
@Override
477+
public IOStatistics getIOStatistics() {
478+
return IOStatisticsSupport.retrieveIOStatistics(datas);
479+
}
480+
481+
/**
482+
* Probe the inner stream for a capability.
483+
* Syncable operations are rejected before being passed down.
484+
* @param capability string to query the stream support for.
485+
* @return true if a capability is known to be supported.
486+
*/
487+
@Override
488+
public boolean hasCapability(final String capability) {
489+
if (isProbeForSyncable(capability)) {
490+
return false;
491+
}
492+
return datas.hasCapability(capability);
493+
}
439494
}
440495

441496
@Override
@@ -486,6 +541,32 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
486541
blockSize, progress);
487542
}
488543

544+
@Override
545+
public FSDataOutputStream create(final Path f,
546+
final FsPermission permission,
547+
final EnumSet<CreateFlag> flags,
548+
final int bufferSize,
549+
final short replication,
550+
final long blockSize,
551+
final Progressable progress,
552+
final Options.ChecksumOpt checksumOpt) throws IOException {
553+
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
554+
bufferSize, replication, blockSize, progress);
555+
}
556+
557+
@Override
558+
public FSDataOutputStream createNonRecursive(final Path f,
559+
final FsPermission permission,
560+
final EnumSet<CreateFlag> flags,
561+
final int bufferSize,
562+
final short replication,
563+
final long blockSize,
564+
final Progressable progress) throws IOException {
565+
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
566+
false, bufferSize, replication,
567+
blockSize, progress);
568+
}
569+
489570
abstract class FsOperation {
490571
boolean run(Path p) throws IOException {
491572
boolean status = apply(p);
@@ -783,6 +864,59 @@ public boolean reportChecksumFailure(Path f, FSDataInputStream in,
783864
return false;
784865
}
785866

867+
/**
868+
* This is overridden to ensure that this class's
869+
* {@link #openFileWithOptions}() method is called, and so ultimately
870+
* its {@link #open(Path, int)}.
871+
*
872+
* {@inheritDoc}
873+
*/
874+
@Override
875+
public FutureDataInputStreamBuilder openFile(final Path path)
876+
throws IOException, UnsupportedOperationException {
877+
return ((FutureDataInputStreamBuilderImpl)
878+
createDataInputStreamBuilder(this, path)).getThisBuilder();
879+
}
880+
881+
/**
882+
* Open the file as a blocking call to {@link #open(Path, int)}.
883+
*
884+
* {@inheritDoc}
885+
*/
886+
@Override
887+
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
888+
final Path path,
889+
final OpenFileParameters parameters) throws IOException {
890+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
891+
parameters.getMandatoryKeys(),
892+
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
893+
"for " + path);
894+
return LambdaUtils.eval(
895+
new CompletableFuture<>(),
896+
() -> open(path, parameters.getBufferSize()));
897+
}
898+
899+
/**
900+
* This is overridden to ensure that this class's create() method is
901+
* ultimately called.
902+
*
903+
* {@inheritDoc}
904+
*/
905+
public FSDataOutputStreamBuilder createFile(Path path) {
906+
return createDataOutputStreamBuilder(this, path)
907+
.create().overwrite(true);
908+
}
909+
910+
/**
911+
* This is overridden to ensure that this class's create() method is
912+
* ultimately called.
913+
*
914+
* {@inheritDoc}
915+
*/
916+
public FSDataOutputStreamBuilder appendFile(Path path) {
917+
return createDataOutputStreamBuilder(this, path).append();
918+
}
919+
786920
/**
787921
* Disable those operations which the checksummed FS blocks.
788922
* {@inheritDoc}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
6161
*/
6262
B opt(@Nonnull String key, float value);
6363

64+
/**
65+
* Set optional long parameter for the Builder.
66+
*
67+
* @see #opt(String, String)
68+
*/
69+
B opt(@Nonnull String key, long value);
70+
6471
/**
6572
* Set optional double parameter for the Builder.
6673
*
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
104111
*/
105112
B must(@Nonnull String key, float value);
106113

114+
/**
115+
* Set mandatory long option.
116+
*
117+
* @see #must(String, String)
118+
*/
119+
B must(@Nonnull String key, long value);
120+
107121
/**
108122
* Set mandatory double option.
109123
*

0 commit comments

Comments
 (0)