Commit abaa695

PARQUET-400: Fix review items.
This commit:
* Makes SeekableInputStream implementations package private
* Adds tests for the H2 stream wrapper
* Adds javadoc to SeekableInputStream
* Simplifies the wrap method by checking the underlying stream
1 parent 5dc50a5 commit abaa695

File tree: 7 files changed, +592 -131 lines

parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java

Lines changed: 66 additions & 0 deletions
@@ -19,22 +19,88 @@
 
 package org.apache.parquet.io;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+/**
+ * {@code SeekableInputStream} is an interface with the methods needed by
+ * Parquet to read data from a file or Hadoop data stream.
+ */
 public abstract class SeekableInputStream extends InputStream {
 
+  /**
+   * Return the current position in the InputStream.
+   *
+   * @return current position in bytes from the start of the stream
+   * @throws IOException If the underlying stream throws IOException
+   */
   public abstract long getPos() throws IOException;
 
+  /**
+   * Seek to a new position in the InputStream.
+   *
+   * @param newPos the new position to seek to
+   * @throws IOException If the underlying stream throws IOException
+   */
   public abstract void seek(long newPos) throws IOException;
 
+  /**
+   * Read a byte array of data, from position 0 to the end of the array.
+   * <p>
+   * This method is equivalent to {@code read(bytes, 0, bytes.length)}.
+   * <p>
+   * This method will block until len bytes are available to copy into the
+   * array, or will throw {@link EOFException} if the stream ends before the
+   * array is full.
+   *
+   * @param bytes a byte array to fill with data from the stream
+   * @throws IOException If the underlying stream throws IOException
+   * @throws EOFException If the stream has fewer bytes left than are needed to
+   *                      fill the array, {@code bytes.length}
+   */
   public abstract void readFully(byte[] bytes) throws IOException;
 
+  /**
+   * Read {@code len} bytes of data into an array, at position {@code start}.
+   * <p>
+   * This method will block until len bytes are available to copy into the
+   * array, or will throw {@link EOFException} if the stream ends before the
+   * array is full.
+   *
+   * @param bytes a byte array to fill with data from the stream
+   * @throws IOException If the underlying stream throws IOException
+   * @throws EOFException If the stream has fewer than {@code len} bytes left
+   */
   public abstract void readFully(byte[] bytes, int start, int len) throws IOException;
 
+  /**
+   * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+   * <p>
+   * This method will copy available bytes into the buffer, reading at most
+   * {@code buf.remaining()} bytes. The number of bytes actually copied is
+   * returned by the method, or -1 is returned to signal that the end of the
+   * underlying stream has been reached.
+   *
+   * @param buf a byte buffer to fill with data from the stream
+   * @return the number of bytes read or -1 if the stream ended
+   * @throws IOException If the underlying stream throws IOException
+   */
   public abstract int read(ByteBuffer buf) throws IOException;
 
+  /**
+   * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+   * <p>
+   * This method will block until {@code buf.remaining()} bytes are available
+   * to copy into the buffer, or will throw {@link EOFException} if the stream
+   * ends before the buffer is full.
+   *
+   * @param buf a byte buffer to fill with data from the stream
+   * @throws IOException If the underlying stream throws IOException
+   * @throws EOFException If the stream has fewer bytes left than are needed to
+   *                      fill the buffer, {@code buf.remaining()}
+   */
   public abstract void readFully(ByteBuffer buf) throws IOException;
 
 }
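For orientation, a minimal usage sketch of the API documented above (not part of this commit). It assumes a Hadoop FileSystem and Path are at hand and uses HadoopStreams.wrap from this change; the class name, variable names, and the 8-byte tail read are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;

public class SeekableInputStreamSketch {  // hypothetical example class
  public static void main(String[] args) throws IOException {
    Path path = new Path(args[0]);
    FileSystem fs = path.getFileSystem(new Configuration());
    FSDataInputStream rawStream = fs.open(path);

    // wrap() picks the Hadoop 1 or Hadoop 2 implementation (see HadoopStreams below)
    SeekableInputStream in = HadoopStreams.wrap(rawStream);

    // readFully(byte[]) blocks until the array is full or throws EOFException
    byte[] magic = new byte[4];
    in.readFully(magic);

    // seek() plus readFully(ByteBuffer) fills the buffer completely
    // (assumes the file is at least 8 bytes long)
    long fileLength = fs.getFileStatus(path).getLen();
    in.seek(fileLength - 8);
    ByteBuffer tail = ByteBuffer.allocate(8);
    in.readFully(tail);

    System.out.println("position after reads: " + in.getPos());
    rawStream.close();
  }
}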

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java

Lines changed: 5 additions & 1 deletion
@@ -25,7 +25,11 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class H1SeekableInputStream extends SeekableInputStream {
+/**
+ * SeekableInputStream implementation that implements read(ByteBuffer) for
+ * Hadoop 1 FSDataInputStream.
+ */
+class H1SeekableInputStream extends SeekableInputStream {
 
   private final int COPY_BUFFER_SIZE = 8192;
   private final byte[] temp = new byte[COPY_BUFFER_SIZE];
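The COPY_BUFFER_SIZE and temp fields above imply a copy-buffer strategy for Hadoop 1 streams, which expose no read(ByteBuffer). The method bodies are outside this hunk, so the following is only a hedged sketch of that general approach, not the committed implementation.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

class CopyBufferReadSketch {  // hypothetical helper, for illustration only
  // Reads at most temp.length bytes from a plain InputStream and copies them
  // into the ByteBuffer, returning the count or -1 at end of stream.
  static int readIntoBuffer(InputStream stream, ByteBuffer buf, byte[] temp) throws IOException {
    int bytesToRead = Math.min(buf.remaining(), temp.length);
    int bytesRead = stream.read(temp, 0, bytesToRead);
    if (bytesRead > 0) {
      buf.put(temp, 0, bytesRead);
    }
    return bytesRead;
  }
}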

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java

Lines changed: 10 additions & 2 deletions
@@ -27,7 +27,11 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class H2SeekableInputStream extends SeekableInputStream {
+/**
+ * SeekableInputStream implementation for FSDataInputStream that implements
+ * ByteBufferReadable in Hadoop 2.
+ */
+class H2SeekableInputStream extends SeekableInputStream {
 
   private final FSDataInputStream stream;
 
@@ -74,11 +78,15 @@ public int read(ByteBuffer buf) throws IOException {
 
   @Override
   public void readFully(ByteBuffer buf) throws IOException {
+    readFully(stream, buf);
+  }
+
+  public static void readFully(FSDataInputStream stream, ByteBuffer buf) throws IOException {
     // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read
     // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we
     // have to loop to ensure we read them.
     while (buf.hasRemaining()) {
-      int readCount = read(buf);
+      int readCount = stream.read(buf);
       if (readCount == -1) {
         // this is probably a bug in the ParquetReader. We shouldn't have called readFully with a buffer
         // that has more remaining than the amount of data in the stream.
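A sketch (not from the commit) of how the extracted static readFully(FSDataInputStream, ByteBuffer) helper could be exercised with the H2MockInputStream test double added later in this change. The JUnit 4 test class name and the expectation that the buffer ends up holding MockInputStream.TEST_ARRAY are assumptions; the sketch must live in org.apache.parquet.hadoop.util because the wrapper classes are now package private.

package org.apache.parquet.hadoop.util;

import org.apache.hadoop.fs.FSDataInputStream;
import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;

public class TestH2ReadFullySketch {  // hypothetical test class name
  @Test
  public void readFullyLoopsOverShortReads() throws Exception {
    // the mock caps consecutive reads at 2, 3, and 3 bytes before reading freely
    FSDataInputStream hadoopStream =
        new FSDataInputStream(new H2MockInputStream(2, 3, 3));
    ByteBuffer buf = ByteBuffer.allocate(10);

    // loops over the short reads until the buffer is completely filled
    H2SeekableInputStream.readFully(hadoopStream, buf);

    Assert.assertEquals("Should fill the buffer despite short reads",
        0, buf.remaining());
    Assert.assertArrayEquals(MockInputStream.TEST_ARRAY, buf.array());
  }
}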

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java

Lines changed: 13 additions & 29 deletions
@@ -23,28 +23,29 @@
 import org.apache.parquet.Log;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.SeekableInputStream;
-import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.nio.ByteBuffer;
 
+/**
+ * Convenience methods to get Parquet abstractions for Hadoop data streams.
+ */
 public class HadoopStreams {
 
   private static final Log LOG = Log.getLog(SeekableInputStream.class);
 
-  private static final Class<?> byteBufferReadableClass;
-
-  private static final Constructor<SeekableInputStream> h2SeekableConstructor;
-
-  static {
-    byteBufferReadableClass = getReadableClass();
-    h2SeekableConstructor = getH2SeekableConstructor();
-  }
+  private static final Class<?> byteBufferReadableClass = getReadableClass();
+  private static final Constructor<SeekableInputStream> h2SeekableConstructor = getH2SeekableConstructor();
 
+  /**
+   * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
+   * implementation for Parquet readers.
+   *
+   * @param stream a Hadoop FSDataInputStream
+   * @return a SeekableInputStream
+   */
   public static SeekableInputStream wrap(FSDataInputStream stream) {
     if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
-        byteBufferReadableClass.isInstance(stream) &&
-        supportsByteBufferReads(stream)) {
+        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
       try {
         return h2SeekableConstructor.newInstance(stream);
       } catch (InstantiationException e) {
@@ -62,23 +63,6 @@ public static SeekableInputStream wrap(FSDataInputStream stream) {
     }
   }
 
-  private static final ByteBuffer ZERO_LEN_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
-
-  private static boolean supportsByteBufferReads(FSDataInputStream stream) {
-    // FSDataInputStream implements ByteBufferReadable, but may throw
-    // UnsupportedOperationException if it isn't actually supported. This tests
-    // whether the method throws by trying to read a 0-length buffer, which has
-    // no effect on the stream when it is supported.
-    try {
-      stream.read(ZERO_LEN_BYTE_BUFFER);
-      return true;
-    } catch (UnsupportedOperationException e) {
-      return false;
-    } catch (IOException e) {
-      return false;
-    }
-  }
-
   private static Class<?> getReadableClass() {
     try {
       return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
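To illustrate the simplified dispatch, a hedged JUnit 4-style sketch of wrap() selecting the Hadoop 2 wrapper only when the underlying stream implements ByteBufferReadable. It assumes a Hadoop 2 classpath and reuses the mock streams added below in this commit; the test class name is hypothetical.

package org.apache.parquet.hadoop.util;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.io.SeekableInputStream;
import org.junit.Assert;
import org.junit.Test;

public class TestWrapDispatchSketch {  // hypothetical test class name
  @Test
  public void wrapChecksTheUnderlyingStream() throws Exception {
    // H2MockInputStream implements ByteBufferReadable, MockInputStream does not,
    // so wrap() should select the Hadoop 2 and Hadoop 1 wrappers respectively.
    SeekableInputStream h2 =
        HadoopStreams.wrap(new FSDataInputStream(new H2MockInputStream(10)));
    SeekableInputStream h1 =
        HadoopStreams.wrap(new FSDataInputStream(new MockInputStream()));

    Assert.assertTrue(h2 instanceof H2SeekableInputStream);
    Assert.assertTrue(h1 instanceof H1SeekableInputStream);
  }
}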
H2MockInputStream.java (new file)

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class H2MockInputStream extends MockInputStream implements ByteBufferReadable {
+  public H2MockInputStream(int... actualReadLengths) {
+    super(actualReadLengths);
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    // this is inefficient, but simple for correctness tests of
+    // readFully(ByteBuffer)
+    byte[] temp = new byte[buf.remaining()];
+    int bytesRead = read(temp, 0, temp.length);
+    if (bytesRead > 0) {
+      buf.put(temp, 0, bytesRead);
+    }
+    return bytesRead;
+  }
+}
MockInputStream.java (new file)

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+class MockInputStream extends ByteArrayInputStream
+    implements Seekable, PositionedReadable {
+  static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+  private int[] lengths;
+  private int current = 0;
+  MockInputStream(int... actualReadLengths) {
+    super(TEST_ARRAY);
+    this.lengths = actualReadLengths;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) {
+    if (current < lengths.length) {
+      if (len <= lengths[current]) {
+        // when len == lengths[current], the next read will be 0 bytes
+        int bytesRead = super.read(b, off, len);
+        lengths[current] -= bytesRead;
+        return bytesRead;
+      } else {
+        int bytesRead = super.read(b, off, lengths[current]);
+        current += 1;
+        return bytesRead;
+      }
+    } else {
+      return super.read(b, off, len);
+    }
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    seek(position);
+    return read(buffer, offset, length);
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    throw new UnsupportedOperationException("Not actually supported.");
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    throw new UnsupportedOperationException("Not actually supported.");
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    this.pos = (int) pos;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return this.pos;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    seek(targetPos);
+    return true;
+  }
+}
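A short sketch (not part of the commit) of how actualReadLengths shape consecutive reads from MockInputStream; the test class name is hypothetical and the expected values follow from TEST_ARRAY and the read(byte[], int, int) logic above.

package org.apache.parquet.hadoop.util;

import org.junit.Assert;
import org.junit.Test;

public class TestMockReadLengthsSketch {  // hypothetical test class name
  @Test
  public void readLengthsCapConsecutiveReads() {
    // with actualReadLengths = {2, 3}, the first read returns at most 2 bytes,
    // the second at most 3, and later reads are unrestricted
    MockInputStream stream = new MockInputStream(2, 3);
    byte[] bytes = new byte[10];

    Assert.assertEquals(2, stream.read(bytes, 0, 10));
    Assert.assertEquals(3, stream.read(bytes, 2, 8));
    Assert.assertEquals(5, stream.read(bytes, 5, 5));
    Assert.assertArrayEquals(MockInputStream.TEST_ARRAY, bytes);
  }
}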
