Skip to content

Commit c1e52f5

Browse files
committed
HADOOP-19131. Add ByteBufferPositionedReadable
...because parquet could use it reading footers. Change-Id: Ic09116005b76b1f2221a961c923df412718508cc
1 parent 3087358 commit c1e52f5

File tree

2 files changed

+83
-4
lines changed

2 files changed

+83
-4
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,14 @@ public int read(long position, ByteBuffer buf) throws IOException {
262262
"by " + in.getClass().getCanonicalName());
263263
}
264264

265+
/**
266+
* Delegate to the underlying stream.
267+
* @param position position within file
268+
* @param buf the ByteBuffer to receive the results of the read operation.
269+
* @throws IOException on a failure from the nested stream.
270+
* @throws UnsupportedOperationException if the inner stream does not
271+
* support this operation.
272+
*/
265273
@Override
266274
public void readFully(long position, ByteBuffer buf) throws IOException {
267275
if (in instanceof ByteBufferPositionedReadable) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedOperations.java

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,24 @@
1818

1919
package org.apache.hadoop.io.wrappedio;
2020

21+
import java.io.EOFException;
2122
import java.io.IOException;
2223
import java.io.Serializable;
2324
import java.io.UncheckedIOException;
25+
import java.nio.ByteBuffer;
2426
import java.util.HashMap;
2527
import java.util.Map;
2628
import javax.annotation.Nullable;
2729

2830
import org.apache.hadoop.classification.InterfaceAudience;
2931
import org.apache.hadoop.classification.InterfaceStability;
32+
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
3033
import org.apache.hadoop.fs.FSDataInputStream;
3134
import org.apache.hadoop.fs.FileStatus;
3235
import org.apache.hadoop.fs.FileSystem;
3336
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
3437
import org.apache.hadoop.fs.Path;
38+
import org.apache.hadoop.fs.StreamCapabilities;
3539
import org.apache.hadoop.fs.statistics.IOStatistics;
3640
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
3741
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
@@ -60,6 +64,28 @@ public final class WrappedOperations {
6064
private WrappedOperations() {
6165
}
6266

67+
public static boolean hasPathCapability(FileSystem fs, Path path, String capability) {
68+
try {
69+
return fs.hasPathCapability(path, capability);
70+
} catch (IOException e) {
71+
return false;
72+
}
73+
}
74+
75+
/**
76+
* Does an object implement {@link StreamCapabilities} and, if so,
77+
* what is the result of the probe for the capability.
78+
* @param o object to probe
79+
* @param capability capability string
80+
* @return true iff the capability is declared available.
81+
*/
82+
public static boolean hasStreamCapability(Object o, String capability) {
83+
if (o instanceof StreamCapabilities) {
84+
return false;
85+
}
86+
return ((StreamCapabilities) o).hasCapability(capability);
87+
}
88+
6389
/**
6490
* OpenFile assistant, easy reflection-based access to
6591
* {@link FileSystem#openFile(Path)}.
@@ -251,31 +277,76 @@ public static Map<String, Long> iostatisticsSnapshotToMap(@Nullable Object sourc
251277
* This is either a thread-local value or a global empty context.
252278
* @return instance of {@link IOStatisticsContext}
253279
*/
254-
public Object iostatisticsContextGetCurrent() {
280+
public static Object iostatisticsContextGetCurrent() {
255281
return getCurrentIOStatisticsContext();
256282
}
257283

258284
/**
259285
* Static probe to check if the thread-level IO statistics enabled.
260286
* @return true if the thread-level IO statistics are enabled.
261287
*/
262-
public boolean iostatisticsContextEnabled() {
288+
public static boolean iostatisticsContextEnabled() {
263289
return IOStatisticsContext.enabled();
264290
}
265291

266292
/**
267293
* Reset the context's IOStatistics.
268294
*/
269-
public void iostatisticsContextReset() {
295+
public static void iostatisticsContextReset() {
270296
getCurrentIOStatisticsContext().reset();
271297
}
272298

273299
/**
274300
* Take a snapshot of the context IOStatistics.
275301
* @return an instance of {@link IOStatisticsSnapshot}.
276302
*/
277-
public Serializable iostatisticsContextSnapshot() {
303+
public static Serializable iostatisticsContextSnapshot() {
278304
return getCurrentIOStatisticsContext().snapshot();
279305
}
280306

307+
/**
308+
* Delegate to {@link ByteBufferPositionedReadable#read(long, ByteBuffer)}.
309+
* @param in input stream
310+
* @param position position within file
311+
* @param buf the ByteBuffer to receive the results of the read operation.
312+
* @throws IOException if there is some error performing the read
313+
* @throws EOFException the end of the data was reached before
314+
* the read operation completed
315+
* @throws UnsupportedOperationException if the input doesn't implement
316+
* the interface or, if when invoked, it is raised.
317+
* Note: that is the default behaviour of {@link FSDataInputStream#readFully(long, ByteBuffer)}.
318+
*/
319+
public static void byteBufferPositionedReadableReadFully(
320+
Object in,
321+
long position,
322+
ByteBuffer buf)
323+
throws IOException {
324+
if (!(in instanceof ByteBufferPositionedReadable)) {
325+
throw new UnsupportedOperationException("Not a ByteBufferPositionedReadable: " + in);
326+
}
327+
328+
((ByteBufferPositionedReadable) in).readFully(position, buf);
329+
}
330+
331+
/**
332+
* Probe to see if the input stream is an instance of ByteBufferPositionedReadable.
333+
* If the stream is an FSDataInputStream, the wrapped stream is checked.
334+
* @param in input stream
335+
* @return true if the stream implements the interface (including a wrapped stream)
336+
* and that it declares the stream capability.
337+
*/
338+
public static boolean byteBufferPositionedReadableReadFullyAvailable(
339+
Object in) {
340+
if (!(in instanceof ByteBufferPositionedReadable)) {
341+
return false;
342+
}
343+
if (in instanceof FSDataInputStream) {
344+
// ask the wrapped stream.
345+
return byteBufferPositionedReadableReadFullyAvailable(
346+
((FSDataInputStream) in).getWrappedStream());
347+
}
348+
// now rely on the input stream implementing path capabilities, which
349+
// all the Hadoop FS implementations do.
350+
return hasStreamCapability(in, StreamCapabilities.PREADBYTEBUFFER);
351+
}
281352
}

0 commit comments

Comments
 (0)