Skip to content

Commit 0dad2aa

Browse files
committed
HADOOP-19131. changing names; adding avro for a read policy
...and mapping to sequential in s3a Change-Id: Iebcac57a4b4ff3fc6edc82f120a7448635bd61bc
1 parent 827b41c commit 0dad2aa

File tree

3 files changed

+21
-7
lines changed

3 files changed

+21
-7
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,11 @@ private OpenFileOptions() {
598598
public static final String FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE =
599599
"adaptive";
600600

601+
/**
602+
* We are an avro file: {@value}.
603+
*/
604+
public static final String FS_OPTION_OPENFILE_READ_POLICY_AVRO = "avro" ;
605+
601606
/**
602607
* This is a columnar file format.
603608
* Do whatever is needed to optimize for it: {@value}.
@@ -649,12 +654,14 @@ private OpenFileOptions() {
649654
public static final String FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE =
650655
"whole-file";
651656

657+
652658
/**
653659
* All the current read policies as a set.
654660
*/
655661
public static final Set<String> FS_OPTION_OPENFILE_READ_POLICIES =
656662
Collections.unmodifiableSet(Stream.of(
657663
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE,
664+
FS_OPTION_OPENFILE_READ_POLICY_AVRO,
658665
FS_OPTION_OPENFILE_READ_POLICY_DEFAULT,
659666
FS_OPTION_OPENFILE_READ_POLICY_ORC,
660667
FS_OPTION_OPENFILE_READ_POLICY_PARQUET,

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedOperations.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
4545
import org.apache.hadoop.util.functional.FutureIO;
4646

47+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
4748
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
4849
import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
4950
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
@@ -79,7 +80,7 @@ public static boolean hasPathCapability(FileSystem fs, Path path, String capabil
7980
* @param capability capability string
8081
* @return true iff the capability is declared available.
8182
*/
82-
public static boolean hasStreamCapability(Object o, String capability) {
83+
public static boolean streamCapabilities_hasStreamCapability(Object o, String capability) {
8384
if (o instanceof StreamCapabilities) {
8485
return false;
8586
}
@@ -98,11 +99,12 @@ public static boolean hasStreamCapability(Object o, String capability) {
9899
* @throws IOException for any failure.
99100
*/
100101
@InterfaceStability.Stable
101-
public static FSDataInputStream filesystemOpenFile(
102+
public static FSDataInputStream fileSystem_openFile(
102103
final FileSystem fs,
103104
final Path path,
104105
final String policy,
105106
final FileStatus status,
107+
final Long length,
106108
final Map<String, String> options) throws IOException {
107109
final FutureDataInputStreamBuilder builder = fs.openFile(path);
108110
if (policy != null) {
@@ -111,6 +113,9 @@ public static FSDataInputStream filesystemOpenFile(
111113
if (status != null) {
112114
builder.withFileStatus(status);
113115
}
116+
if (length != null) {
117+
builder.opt(FS_OPTION_OPENFILE_LENGTH, Long.toString(length));
118+
}
114119
if (options != null) {
115120
// add all the options map entries
116121
options.forEach(builder::opt);
@@ -128,7 +133,7 @@ public static FSDataInputStream filesystemOpenFile(
128133
* @return a path to the enclosing root
129134
* @throws IOException early checks like failure to resolve path cause IO failures
130135
*/
131-
public static Path filesystemGetEnclosingRoot(FileSystem fs, Path path) throws IOException {
136+
public static Path fileSystem_getEnclosingRoot(FileSystem fs, Path path) throws IOException {
132137
return fs.getEnclosingRoot(path);
133138
}
134139

@@ -316,7 +321,7 @@ public static Serializable iostatisticsContextSnapshot() {
316321
* the interface or, if when invoked, it is raised.
317322
* Note: that is the default behaviour of {@link FSDataInputStream#readFully(long, ByteBuffer)}.
318323
*/
319-
public static void byteBufferPositionedReadableReadFully(
324+
public static void byteBufferPositionedReadable_readFully(
320325
Object in,
321326
long position,
322327
ByteBuffer buf)
@@ -335,18 +340,18 @@ public static void byteBufferPositionedReadableReadFully(
335340
* @return true if the stream implements the interface (including a wrapped stream)
336341
* and that it declares the stream capability.
337342
*/
338-
public static boolean byteBufferPositionedReadableReadFullyAvailable(
343+
public static boolean byteBufferPositionedReadable_readFullyAvailable(
339344
Object in) {
340345
if (!(in instanceof ByteBufferPositionedReadable)) {
341346
return false;
342347
}
343348
if (in instanceof FSDataInputStream) {
344349
// ask the wrapped stream.
345-
return byteBufferPositionedReadableReadFullyAvailable(
350+
return byteBufferPositionedReadable_readFullyAvailable(
346351
((FSDataInputStream) in).getWrappedStream());
347352
}
348353
// now rely on the input stream implementing path capabilities, which
349354
// all the Hadoop FS implementations do.
350-
return hasStreamCapability(in, StreamCapabilities.PREADBYTEBUFFER);
355+
return streamCapabilities_hasStreamCapability(in, StreamCapabilities.PREADBYTEBUFFER);
351356
}
352357
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.classification.InterfaceStability;
2727

2828
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
29+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_AVRO;
2930
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
3031
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
3132
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
@@ -109,6 +110,7 @@ public static S3AInputPolicy getPolicy(
109110
case FS_OPTION_OPENFILE_READ_POLICY_PARQUET:
110111
return Random;
111112

113+
case FS_OPTION_OPENFILE_READ_POLICY_AVRO:
112114
case FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL:
113115
case FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE:
114116
return Sequential;

0 commit comments

Comments
 (0)