diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7ac1706c7b..a6e1969e8d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -62,6 +62,7 @@
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.hadoop.util.CompatibilityReader;
 import org.apache.parquet.hadoop.util.CompatibilityUtil;
 import org.apache.parquet.Log;
@@ -102,6 +103,13 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  // Configures whether we use Hadoop's 2.x read(ByteBuffer) API.
+  // If true, we try to read using the new Hadoop read(ByteBuffer) API. This reads data into the provided
+  // ByteBuffer and allows us to potentially take advantage of the zero-copy read path in Hadoop.
+  // Otherwise, we either read into the ByteBuffer's backing array (if it has one) or allocate a byte array
+  // and copy it into the buffer.
+  public static String PARQUET_HADOOP_BYTEBUFFER_READ = "parquet.read.use.byte.buffer";
+
   private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
 
   /**
@@ -487,6 +495,7 @@ public static ParquetFileReader open(Configuration conf, Path file, ParquetMetad
   private final FileMetaData fileMetaData; // may be null
   private final ByteBufferAllocator allocator;
   private final Configuration conf;
+  private final CompatibilityReader compatibilityReader;
 
   // not final. in some cases, this may be lazily loaded for backward-compat.
   private ParquetMetadata footer;
@@ -529,6 +538,7 @@ public ParquetFileReader(
     // the codec factory to get decompressors
     this.codecFactory = new CodecFactory(configuration, 0);
     this.allocator = new HeapByteBufferAllocator();
+    this.compatibilityReader = CompatibilityUtil.getHadoopReader(conf.getBoolean(PARQUET_HADOOP_BYTEBUFFER_READ, true));
   }
 
   /**
@@ -561,6 +571,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
     // the codec factory to get decompressors
     this.codecFactory = new CodecFactory(conf, 0);
     this.allocator = new HeapByteBufferAllocator();
+    this.compatibilityReader = CompatibilityUtil.getHadoopReader(conf.getBoolean(PARQUET_HADOOP_BYTEBUFFER_READ, true));
   }
 
   /**
@@ -584,6 +595,7 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
     // the codec factory to get decompressors
     this.codecFactory = new CodecFactory(conf, 0);
     this.allocator = new HeapByteBufferAllocator();
+    this.compatibilityReader = CompatibilityUtil.getHadoopReader(conf.getBoolean(PARQUET_HADOOP_BYTEBUFFER_READ, true));
   }
 
   public ParquetMetadata getFooter() {
@@ -950,7 +962,7 @@ protected PageHeader readPageHeader() throws IOException {
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
+        this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
         LOG.info("completing the column chunk to read the page header");
         pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
       }
@@ -1039,8 +1051,11 @@ public void addChunk(ChunkDescriptor descriptor) {
     public List<Chunk> readAll(FSDataInputStream f) throws IOException {
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
+
+      // Allocate the ByteBuffer; the CompatibilityReader decides how to fill it based on what the FS supports.
       ByteBuffer chunksByteBuffer = allocator.allocate(length);
-      CompatibilityUtil.getBuf(f, chunksByteBuffer, length);
+      compatibilityReader.readFully(f, chunksByteBuffer);
+
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
       int currentChunkOffset = 0;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReader.java
new file mode 100644
index 0000000000..e093c53939
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+/**
+ * Allows us to use either the Hadoop 1.x or 2.x read APIs to read data without reflection on the read path.
+ * The Hadoop 2.x implementation of this interface resides in a module with Hadoop 2 dependencies.
+ * Note: classes that implement this interface are instantiated using reflection and must thus have a
+ * default constructor.
+ */
+public interface CompatibilityReader {
+
+  /**
+   * Reads exactly readBuffer.remaining() bytes from the stream into the provided readBuffer.
+   * @return Number of bytes read - should be readBuffer.remaining()
+   * @throws EOFException if readBuffer.remaining() is greater than the number of bytes available to
+   * read on the FSDataInputStream f.
+   */
+  int readFully(FSDataInputStream f, ByteBuffer readBuffer) throws IOException;
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReaderV1.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReaderV1.java
new file mode 100644
index 0000000000..c54f24fe46
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReaderV1.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+/**
+ * Uses the Hadoop 1.x readFully(byte[], ...) APIs to read data.
+ */
+public class CompatibilityReaderV1 implements CompatibilityReader {
+
+  @Override
+  public int readFully(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
+    int res;
+    if (readBuf.hasArray()) {
+      res = readWithExistingArray(f, readBuf);
+    } else {
+      res = readWithNewArray(f, readBuf);
+    }
+
+    return res;
+  }
+
+  private int readWithExistingArray(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
+    int initPos = readBuf.position();
+    int remaining = readBuf.remaining();
+    f.readFully(readBuf.array(), readBuf.arrayOffset() + initPos, remaining); // start at the buffer's current position within its backing array
+    readBuf.position(initPos + remaining);
+    return remaining;
+  }
+
+  private int readWithNewArray(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
+    int remaining = readBuf.remaining();
+    byte[] buf = new byte[remaining];
+    f.readFully(buf);
+    readBuf.put(buf, 0, remaining);
+    return remaining;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
index bacf222a24..330e3d4474 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
@@ -18,97 +18,46 @@
  */
 package org.apache.parquet.hadoop.util;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.ShouldNeverHappenException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import org.apache.parquet.Log;
 
 public class CompatibilityUtil {
-  // Will be set to true if the implementation of FSDataInputSteam supports
-  // the 2.x APIs, in particular reading using a provided ByteBuffer
-  private static boolean useV21;
-  public static final V21FileAPI fileAPI;
+  private static final String READER_V2_CLASS = "org.apache.parquet.hadoop.util.CompatibilityReaderV2";
 
-  private static class V21FileAPI {
-    private final Method PROVIDE_BUF_READ_METHOD;
-    private final Class FSDataInputStreamCls;
+  private static final Log LOG = Log.getLog(CompatibilityUtil.class);
 
-    private V21FileAPI() throws ReflectiveOperationException {
-      final String PACKAGE = "org.apache.hadoop";
-      FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream");
-      PROVIDE_BUF_READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBuffer.class);
+  public static CompatibilityReader getHadoopReader(boolean useV2) {
+    if (!useV2) {
+      return new CompatibilityReaderV1();
     }
-  }
-
-  static {
-    // Test to see if a class from the Hadoop 2.x API is available
-    boolean v21 = true;
-    try {
-      Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
-    } catch (ClassNotFoundException cnfe) {
-      v21 = false;
-    }
-
-    useV21 = v21;
-    try {
-      if (v21) {
-        fileAPI = new V21FileAPI();
-      } else {
-        fileAPI = null;
-      }
-    } catch (ReflectiveOperationException e) {
-      throw new IllegalArgumentException("Error finding appropriate interfaces using reflection.", e);
+    if (!isHadoop2x()) {
+      LOG.info("Hadoop 2.x classes not found, falling back to the 1.x read APIs");
+      return new CompatibilityReaderV1();
     }
+
+    return newV2Reader();
   }
 
-  private static Object invoke(Method method, String errorMsg, Object instance, Object... args) {
+  // Test to see if a class from the Hadoop 2.x API is available.
+  // If it is, we try to instantiate the V2 CompatibilityReader.
+  private static boolean isHadoop2x() {
+    boolean v2 = true;
     try {
-      return method.invoke(instance, args);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException(errorMsg, e);
-    } catch (InvocationTargetException e) {
-      throw new IllegalArgumentException(errorMsg, e);
+      Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
+    } catch (ClassNotFoundException cnfe) {
+      v2 = false;
     }
+    return v2;
   }
 
-  public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
-    int res;
-    if (useV21) {
-      try {
-        res = (Integer) fileAPI.PROVIDE_BUF_READ_METHOD.invoke(f, readBuf);
-      } catch (InvocationTargetException e) {
-        if (e.getCause() instanceof UnsupportedOperationException) {
-          // the FSDataInputStream docs say specifically that implementations
-          // can choose to throw UnsupportedOperationException, so this should
-          // be a reasonable check to make to see if the interface is
-          // present but not implemented and we should be falling back
-          useV21 = false;
-          return getBuf(f, readBuf, maxSize);
-        } else if (e.getCause() instanceof IOException) {
-          throw (IOException) e.getCause();
-        } else {
-          // To handle any cases where a Runtime exception occurs and provide
-          // some additional context information. A stacktrace would just give
-          // a line number, this at least tells them we were using the version
-          // of the read method designed for using a ByteBuffer.
-          throw new IOException("Error reading out of an FSDataInputStream " + "using the Hadoop 2 ByteBuffer based read method.", e.getCause());
-        }
-      } catch (IllegalAccessException e) {
-        // This method is public because it is defined in an interface,
-        // there should be no problems accessing it
-        throw new ShouldNeverHappenException(e);
-      }
-    } else {
-      byte[] buf = new byte[maxSize];
-      res = f.read(buf);
-      readBuf.put(buf, 0, res);
+  private static CompatibilityReader newV2Reader() {
+    try {
+      Class<?> reader = Class.forName(READER_V2_CLASS);
+      return (CompatibilityReader) reader.newInstance();
+    } catch (ReflectiveOperationException e) {
+      LOG.warn("Unable to instantiate Hadoop V2 compatibility reader class: " + READER_V2_CLASS + ", falling back to the 1.x read APIs", e);
+      return new CompatibilityReaderV1();
     }
-    return res;
   }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestCompatibilityReaderV1.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestCompatibilityReaderV1.java
new file mode 100644
index 0000000000..71fdf7a625
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestCompatibilityReaderV1.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class TestCompatibilityReaderV1 {
+
+  private static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+  private static class MockInputStream extends ByteArrayInputStream
+      implements Seekable, PositionedReadable {
+    public MockInputStream(byte[] buf) {
+      super(buf);
+    }
+
+    // empty implementation for unused methods
+    public int read(long position, byte[] buffer, int offset, int length) { return -1; }
+    public void readFully(long position, byte[] buffer, int offset, int length) {}
+    public void readFully(long position, byte[] buffer) {}
+    public void seek(long position) {}
+    public long getPos() { return 0; }
+    public boolean seekToNewSource(long targetPos) { return false; }
+  }
+
+  // confirm the reader version when the flag is false
+  @Test
+  public void testReaderFlagOff() {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(false);
+    Assert.assertEquals("Incorrect CompatibilityReader instantiated", CompatibilityReaderV1.class, reader.getClass());
+  }
+
+  // confirm the reader version when the flag is true but we're on Hadoop 1.x
+  @Test
+  public void testReaderFlagTrueHadoopV1() {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(true);
+    Assert.assertEquals("Incorrect CompatibilityReader instantiated", CompatibilityReaderV1.class, reader.getClass());
+  }
+
+  @Test
+  public void testReadBufWithArray() throws Exception {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(false);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+    FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockInputStream(TEST_ARRAY));
+
+    int readCount = reader.readFully(fsDataInputStream, byteBuffer);
+    Assert.assertEquals("Mismatching number of bytes read", 10, readCount);
+    Assert.assertFalse("Byte buffer not full", byteBuffer.hasRemaining());
+  }
+
+  @Test
+  public void testReadBufWithoutArray() throws Exception {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(false);
+    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(10);
+    FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockInputStream(TEST_ARRAY));
+
+    int readCount = reader.readFully(fsDataInputStream, byteBuffer);
+    Assert.assertEquals("Mismatching number of bytes read", 10, readCount);
+    Assert.assertFalse("Byte buffer not full", byteBuffer.hasRemaining());
+  }
+
+  @Test
+  public void testReadBufWithSmallerBuffer() throws Exception {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(false);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(5);
+    FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockInputStream(TEST_ARRAY));
+
+    int readCount = reader.readFully(fsDataInputStream, byteBuffer);
+    Assert.assertEquals("Mismatching number of bytes read", 5, readCount);
+    Assert.assertFalse("Byte buffer not full", byteBuffer.hasRemaining());
+  }
+
+  @Test(expected = EOFException.class)
+  public void testReadBufWithLargerBuffer() throws Exception {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(false);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(50);
+    FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockInputStream(TEST_ARRAY));
+
+    // this throws an exception because we are trying to read 50 bytes when only 10 are available
+    reader.readFully(fsDataInputStream, byteBuffer);
+  }
+
+}
diff --git a/parquet-hadoop2/pom.xml b/parquet-hadoop2/pom.xml
new file mode 100644
index 0000000000..fbbede17a0
--- /dev/null
+++ b/parquet-hadoop2/pom.xml
@@ -0,0 +1,67 @@
+
+
+    org.apache.parquet
+    parquet
+    ../pom.xml
+    1.8.2-SNAPSHOT
+
+
+  4.0.0
+
+  parquet-hadoop2
+  jar
+  Apache Parquet Hadoop2
+  https://parquet.apache.org
+
+
+
+      org.apache.parquet
+      parquet-column
+      ${project.version}
+
+
+      org.apache.hadoop
+      hadoop-client
+      ${hadoop2.version}
+      provided
+
+
+      org.apache.parquet
+      parquet-hadoop
+      ${project.version}
+
+
+
+
+
+        maven-enforcer-plugin
+
+
+        org.apache.maven.plugins
+        maven-jar-plugin
+
+
+
+
\ No newline at end of file
diff --git a/parquet-hadoop2/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReaderV2.java b/parquet-hadoop2/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReaderV2.java
new file mode 100644
index 0000000000..70214d1920
--- /dev/null
+++ b/parquet-hadoop2/src/main/java/org/apache/parquet/hadoop/util/CompatibilityReaderV2.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+/**
+ * Uses the Hadoop 2.x read(ByteBuffer) API to read data.
+ */
+public class CompatibilityReaderV2 implements CompatibilityReader {
+
+  @Override
+  public int readFully(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
+    int remaining = readBuf.remaining();
+    // unfortunately the Hadoop APIs don't seem to have a 'readFully' equivalent for the ByteBuffer read
+    // calls. The read(ByteBuffer) call might read fewer than readBuf.remaining() bytes. Thus we
+    // have to loop to ensure we read them all.
+    while (readBuf.hasRemaining()) {
+      int readCount = f.read(readBuf);
+      if (readCount == -1) {
+        // this is probably a bug in the calling code. We shouldn't have called readFully with a buffer
+        // that has more remaining than the amount of data left in the stream.
+        throw new EOFException("Reached the end of stream. Still have: " + readBuf.remaining() + " bytes left");
+      }
+    }
+
+    return remaining;
+  }
+}
diff --git a/parquet-hadoop2/src/test/java/org/apache/parquet/hadoop/util/TestCompatibilityReaderV2.java b/parquet-hadoop2/src/test/java/org/apache/parquet/hadoop/util/TestCompatibilityReaderV2.java
new file mode 100644
index 0000000000..4ad02cf6a4
--- /dev/null
+++ b/parquet-hadoop2/src/test/java/org/apache/parquet/hadoop/util/TestCompatibilityReaderV2.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class TestCompatibilityReaderV2 {
+
+  private static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+  private static class MockInputStream extends ByteArrayInputStream
+      implements Seekable, PositionedReadable, ByteBufferReadable {
+    public MockInputStream(byte[] buf) {
+      super(buf);
+    }
+
+    // empty implementation for unused methods
+    public int read(long position, byte[] buffer, int offset, int length) { return -1; }
+    public void readFully(long position, byte[] buffer, int offset, int length) {}
+    public void readFully(long position, byte[] buffer) {}
+    public void seek(long position) {}
+    public long getPos() { return 0; }
+    public boolean seekToNewSource(long targetPos) { return false; }
+
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+      int remaining = buf.remaining();
+      while (buf.hasRemaining()) {
+        int data = read();
+        if (data == -1) {
+          // similar to the pattern used in some Hadoop classes that implement ByteBufferReadable,
+          // we return -1 if we're at EOF
+          return -1;
+        }
+        buf.put((byte) data);
+      }
+
+      return remaining;
+    }
+  }
+
+  // confirm the reader version is V2 when the flag is true and we're on Hadoop 2.x
+  @Test
+  public void testReaderFlagOn() {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(true);
+    Assert.assertEquals("Incorrect CompatibilityReader instantiated", CompatibilityReaderV2.class, reader.getClass());
+  }
+
+  @Test
+  public void testReadBuf() throws Exception {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(true);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+    FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockInputStream(TEST_ARRAY));
+
+    int readCount = reader.readFully(fsDataInputStream, byteBuffer);
+    Assert.assertEquals("Mismatching number of bytes read", 10, readCount);
+    Assert.assertFalse("Byte buffer not full", byteBuffer.hasRemaining());
+  }
+
+  @Test
+  public void testReadBufWithSmallerBuffer() throws Exception {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(true);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(5);
+    FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockInputStream(TEST_ARRAY));
+
+    int readCount = reader.readFully(fsDataInputStream, byteBuffer);
+    Assert.assertEquals("Mismatching number of bytes read", 5, readCount);
+    Assert.assertFalse("Byte buffer not full", byteBuffer.hasRemaining());
+  }
+
+  @Test(expected = EOFException.class)
+  public void testReadBufWithLargerBuffer() throws Exception {
+    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(true);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(50);
+    FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockInputStream(TEST_ARRAY));
+
+    // if we're trying to read 50 bytes when only 10 are available, we end up with an EOFException
+    reader.readFully(fsDataInputStream, byteBuffer);
+  }
+}
diff --git a/pom.xml b/pom.xml
index 512bf3735e..6e5666918e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
     org.codehaus.jackson
     shaded.parquet
     1.1.0
+    2.3.0
     2.5.3
     3.0.3
     2.3.1
@@ -117,6 +118,7 @@
     parquet-hive
     parquet-hive-bundle
     parquet-tools
+    parquet-hadoop2
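
For reviewers, here is a minimal sketch of how the new API is meant to be consumed from caller code, mirroring the readAll() change above. It is only an illustration under stated assumptions: the main() wrapper, the input path, and the chunk length taken from the command line are hypothetical and are not part of this patch.

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.CompatibilityReader;
import org.apache.parquet.hadoop.util.CompatibilityUtil;

public class CompatibilityReaderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // honour the new flag; it defaults to true, i.e. prefer the Hadoop 2 read(ByteBuffer) path
    boolean useByteBuffer = conf.getBoolean("parquet.read.use.byte.buffer", true);
    CompatibilityReader reader = CompatibilityUtil.getHadoopReader(useByteBuffer);

    Path file = new Path(args[0]);          // hypothetical input file
    int length = Integer.parseInt(args[1]); // hypothetical chunk length to read
    FileSystem fs = file.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(file)) {
      ByteBuffer buf = ByteBuffer.allocate(length);
      // readFully() fills buf.remaining() bytes or throws EOFException,
      // regardless of whether the V1 (byte[]) or V2 (ByteBuffer) reader was chosen
      reader.readFully(in, buf);
      buf.flip();
      System.out.println("read " + buf.remaining() + " bytes");
    }
  }
}

The point of routing through CompatibilityUtil.getHadoopReader() is that the V1/V2 decision is made once, outside the hot read loop, instead of via per-call reflection as the old getBuf() did.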