diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java index a58bfd9ec8..f06ab9412e 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java @@ -20,6 +20,7 @@ import static parquet.Preconditions.checkNotNull; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.column.ColumnDescriptor; @@ -518,16 +519,16 @@ private void readPage() { this.pageValueCount = page.getValueCount(); this.endOfPageValueCount = readValues + pageValueCount; try { - byte[] bytes = page.getBytes().toByteArray(); - if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); + ByteBuffer byteBuf = page.getBytes().toByteBuffer(); + if (DEBUG) LOG.debug("page size " + page.getBytes().size() + " bytes and " + pageValueCount + " records"); if (DEBUG) LOG.debug("reading repetition levels at 0"); - repetitionLevelColumn.initFromPage(pageValueCount, bytes, 0); + repetitionLevelColumn.initFromPage(pageValueCount, byteBuf, 0); int next = repetitionLevelColumn.getNextOffset(); if (DEBUG) LOG.debug("reading definition levels at " + next); - definitionLevelColumn.initFromPage(pageValueCount, bytes, next); + definitionLevelColumn.initFromPage(pageValueCount, byteBuf, next); next = definitionLevelColumn.getNextOffset(); if (DEBUG) LOG.debug("reading data at " + next); - dataColumn.initFromPage(pageValueCount, bytes, next); + dataColumn.initFromPage(pageValueCount, byteBuf, next); } catch (IOException e) { throw new ParquetDecodingException("could not read page " + page + " in col " + path, e); } diff --git a/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java index f125b2f0ed..6f0d1d8dc6 100644 --- a/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java +++ b/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java @@ -67,13 +67,13 @@ public String toString() { } public void updateStats(Binary min_value, Binary max_value) { - if (min.compareTo(min_value) > 0) { min = min_value; } - if (max.compareTo(max_value) < 0) { max = max_value; } + if (min.compareTo(min_value) > 0) { min = Binary.fromByteArray(min_value.getBytes()); } + if (max.compareTo(max_value) < 0) { max = Binary.fromByteArray(max_value.getBytes()); } } public void initializeStats(Binary min_value, Binary max_value) { - min = min_value; - max = max_value; + min = Binary.fromByteArray(min_value.getBytes()); + max = Binary.fromByteArray(max_value.getBytes()); this.markAsNotEmpty(); } diff --git a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java index ec2c0386a4..3cf5b5cfc6 100644 --- a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java @@ -19,11 +19,12 @@ import parquet.io.ParquetDecodingException; import parquet.io.api.Binary; +import java.nio.ByteBuffer; /** * Base class to implement an encoding for a given column type. * - * A ValuesReader is provided with a page (byte-array) and is responsible + * A ValuesReader is provided with a page (byte-buffer) and is responsible * for deserializing the primitive values stored in that page. 
* * Given that pages are homogeneous (store only a single type), typical subclasses @@ -55,6 +56,11 @@ public abstract class ValuesReader { * * @throws IOException */ + public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException; + + /* + * Compatibility interface for byte[] pages. + */ + public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException; /** diff --git a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java index 7d8513a4fc..74ed80dd1c 100644 --- a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java @@ -18,11 +18,12 @@ import static parquet.bytes.BytesUtils.getWidthFromMaxInt; import static parquet.column.values.bitpacking.BitPacking.createBitPackingReader; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.bytes.BytesUtils; +import parquet.bytes.ByteBufferInputStream; import parquet.column.values.ValuesReader; import parquet.column.values.bitpacking.BitPacking.BitPackingReader; import parquet.io.ParquetDecodingException; @@ -36,7 +37,7 @@ public class BitPackingValuesReader extends ValuesReader { private static final Log LOG = Log.getLog(BitPackingValuesReader.class); - private ByteArrayInputStream in; + private ByteBufferInputStream in; private BitPackingReader bitPackingReader; private final int bitsPerValue; private int nextOffset; @@ -66,15 +67,20 @@ public int readInteger() { * @see parquet.column.values.ValuesReader#initFromPage(long, byte[], int) */ @Override - public void initFromPage(int valueCount, byte[] in, int offset) throws IOException { + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { int effectiveBitLength = valueCount * bitsPerValue; int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits."
); - this.in = new ByteArrayInputStream(in, offset, length); + this.in = new ByteBufferInputStream(in.duplicate(), offset, length); this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount); this.nextOffset = offset + length; } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public int getNextOffset() { return nextOffset; diff --git a/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java index 22a6d92c01..8af93f149c 100644 --- a/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; +import java.nio.ByteBuffer; import parquet.Log; import parquet.bytes.BytesUtils; @@ -31,7 +32,7 @@ public class ByteBitPackingValuesReader extends ValuesReader { private final BytePacker packer; private final int[] decoded = new int[VALUES_AT_A_TIME]; private int decodedPosition = VALUES_AT_A_TIME - 1; - private byte[] encoded; + private ByteBuffer encoded; private int encodedPos; private int nextOffset; @@ -39,16 +40,21 @@ public ByteBitPackingValuesReader(int bound, Packer packer) { this.bitWidth = BytesUtils.getWidthFromMaxInt(bound); this.packer = packer.newBytePacker(bitWidth); } - + @Override public int readInteger() { ++ decodedPosition; if (decodedPosition == decoded.length) { - if (encodedPos + bitWidth > encoded.length) { - packer.unpack8Values(Arrays.copyOfRange(encoded, encodedPos, encodedPos + bitWidth), 0, decoded, 0); + encoded.position(encodedPos); + if (encodedPos + bitWidth > encoded.limit()) { + // unpack8Values needs at least bitWidth bytes to read from, + // so we pad the end of the encoded bytes with zeros.
+ byte[] tempEncode = new byte[bitWidth]; + encoded.get(tempEncode, 0, encoded.limit() - encodedPos); + packer.unpack8Values(ByteBuffer.wrap(tempEncode), 0, decoded, 0); } else { packer.unpack8Values(encoded, encodedPos, decoded, 0); - } + } encodedPos += bitWidth; decodedPosition = 0; } @@ -56,7 +62,7 @@ public int readInteger() { } @Override - public void initFromPage(int valueCount, byte[] page, int offset) + public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException { int effectiveBitLength = valueCount * bitWidth; int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil @@ -67,6 +73,11 @@ public void initFromPage(int valueCount, byte[] page, int offset) this.nextOffset = offset + length; } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public int getNextOffset() { return nextOffset; diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java index a7abd7dbee..6b4bc4b5ca 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java @@ -16,13 +16,14 @@ package parquet.column.values.boundedint; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.io.ParquetDecodingException; class BitReader { private int currentByte = 0; private int currentPosition = 8; - private byte[] buf; + private ByteBuffer buf; private int currentBufferPosition = 0; private static final int[] byteGetValueMask = new int[8]; private static final int[] readMask = new int[32]; @@ -47,7 +48,7 @@ class BitReader { * The array is not copied, so must not be mutated during the course of * reading. */ - public void prepare(byte[] buf, int offset, int length) { + public void prepare(ByteBuffer buf, int offset, int length) { this.buf = buf; this.endBufferPosistion = offset + length; currentByte = 0; @@ -84,7 +85,7 @@ public int readNBitInteger(int bitsPerValue) { private int getNextByte() { if (currentBufferPosition < endBufferPosistion) { - return buf[currentBufferPosition++] & 0xFF; + return buf.get(currentBufferPosition++) & 0xFF; } return 0; } diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java index 9244ee24fb..61928e40ba 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java @@ -18,6 +18,7 @@ import static parquet.Log.DEBUG; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.bytes.BytesUtils; @@ -67,8 +68,8 @@ public int readInteger() { // bytes would have to be serialized). 
This is the flip-side // to BoundedIntColumnWriter.writeData(BytesOutput) @Override - public void initFromPage(int valueCount, byte[] in, int offset) throws IOException { - if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in[offset] + " " + in[offset + 1] + " " + in[offset + 2] + " " + in[offset + 3] + " "); + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { + if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in.get(offset) + " " + in.get(offset + 1) + " " + in.get(offset + 2) + " " + in.get(offset + 3) + " "); int totalBytes = BytesUtils.readIntLittleEndian(in, offset); if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes"); currentValueCt = 0; @@ -78,6 +79,11 @@ public void initFromPage(int valueCount, byte[] in, int offset) throws IOExcepti this.nextOffset = offset + totalBytes + 4; } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public int getNextOffset() { return this.nextOffset; diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java index 9cab44c82a..905eafdc1a 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java @@ -16,6 +16,7 @@ package parquet.column.values.boundedint; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.column.values.ValuesReader; @@ -33,10 +34,15 @@ public int readInteger() { } @Override - public void initFromPage(int valueCount, byte[] in, int offset) throws IOException { + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { this.nextOffset = offset; } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public int getNextOffset() { return nextOffset; diff --git a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java index e79ac6efb9..0acf2ecd52 100644 --- a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java @@ -21,9 +21,11 @@ import parquet.column.values.bitpacking.BytePacker; import parquet.column.values.bitpacking.Packer; import parquet.io.ParquetDecodingException; +import parquet.bytes.ByteBufferInputStream; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; /** * Read values written by {@link DeltaBinaryPackingValuesWriter} @@ -37,7 +39,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { */ private int valuesRead; private int minDeltaInCurrentBlock; - private byte[] page; + private ByteBuffer page; /** * stores the decoded values including the first value which is written to the header */ @@ -47,7 +49,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { * when data is not aligned to mini block, which means padding 0s are in the buffer */ private int valuesBuffered; - private ByteArrayInputStream in; + private 
ByteBufferInputStream in; private int nextOffset; private DeltaBinaryPackingConfig config; private int[] bitWidths; @@ -61,8 +63,8 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { * @throws IOException */ @Override - public void initFromPage(int valueCount, byte[] page, int offset) throws IOException { - in = new ByteArrayInputStream(page, offset, page.length - offset); + public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException { + in = new ByteBufferInputStream(page.duplicate(), offset, page.limit() - offset); this.config = DeltaBinaryPackingConfig.readConfig(in); this.page = page; this.totalValueCount = BytesUtils.readUnsignedVarInt(in); @@ -75,7 +77,12 @@ public void initFromPage(int valueCount, byte[] page, int offset) throws IOExcep while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis loadNewBlockToBuffer(); } - this.nextOffset = page.length - in.available(); + this.nextOffset = page.limit() - in.available(); + } + + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); } @Override @@ -148,7 +155,7 @@ private void unpackMiniBlock(BytePacker packer) { private void unpack8Values(BytePacker packer) { //calculate the pos because the packer api uses array not stream - int pos = page.length - in.available(); + int pos = page.limit() - in.available(); packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered); this.valuesBuffered += 8; //sync the pos in stream diff --git a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java index 902f1a450c..467f49832f 100644 --- a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java @@ -18,6 +18,7 @@ import static parquet.Log.DEBUG; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.column.values.ValuesReader; @@ -34,7 +35,7 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader { private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesReader.class); private ValuesReader lengthReader; - private byte[] in; + private ByteBuffer in; private int offset; public DeltaLengthByteArrayValuesReader() { @@ -42,21 +43,26 @@ public DeltaLengthByteArrayValuesReader() { } @Override - public void initFromPage(int valueCount, byte[] in, int offset) + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); lengthReader.initFromPage(valueCount, in, offset); offset = lengthReader.getNextOffset(); this.in = in; this.offset = offset; } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public Binary readBytes() { int length = lengthReader.readInteger(); int start = offset; offset = start + length; - return Binary.fromByteArray(in, start, length); + return 
Binary.fromByteBuffer(in, start, length); } @Override diff --git a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java index f62a746953..723a31d480 100644 --- a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java +++ b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java @@ -16,6 +16,7 @@ package parquet.column.values.deltastrings; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.column.values.ValuesReader; import parquet.column.values.delta.DeltaBinaryPackingValuesReader; @@ -41,12 +42,17 @@ public DeltaByteArrayReader() { } @Override - public void initFromPage(int valueCount, byte[] page, int offset) + public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException { prefixLengthReader.initFromPage(valueCount, page, offset); int next = prefixLengthReader.getNextOffset(); suffixReader.initFromPage(valueCount, page, next); } + + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } @Override public void skip() { diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java index 5c105ed73d..23f8644eda 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -17,10 +17,12 @@ import static parquet.Log.DEBUG; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; import parquet.Log; +import parquet.bytes.ByteBufferInputStream; import parquet.bytes.BytesUtils; import parquet.column.Dictionary; import parquet.column.values.ValuesReader; @@ -37,7 +39,7 @@ public class DictionaryValuesReader extends ValuesReader { private static final Log LOG = Log.getLog(DictionaryValuesReader.class); - private ByteArrayInputStream in; + private ByteBufferInputStream in; private Dictionary dictionary; @@ -48,12 +50,12 @@ public DictionaryValuesReader(Dictionary dictionary) { } @Override - public void initFromPage(int valueCount, byte[] page, int offset) + public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException { - this.in = new ByteArrayInputStream(page, offset, page.length - offset); - if (page.length - offset > 0) { + this.in = new ByteBufferInputStream(page, offset, page.limit() - offset); + if (page.limit() - offset > 0) { if (DEBUG) - LOG.debug("init from page at offset " + offset + " for length " + (page.length - offset)); + LOG.debug("init from page at offset " + offset + " for length " + (page.limit() - offset)); int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in); if (DEBUG) LOG.debug("bit width " + bitWidth); decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); @@ -66,7 +68,12 @@ public int readInt() throws IOException { }; } } - + + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public int readValueDictionaryId() { try { diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java 
b/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java index 33deab87cc..3e6ec2a1aa 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java @@ -19,6 +19,7 @@ import static parquet.column.Encoding.PLAIN_DICTIONARY; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Preconditions; import parquet.column.Dictionary; @@ -81,17 +82,18 @@ public PlainBinaryDictionary(DictionaryPage dictionaryPage) throws IOException { */ public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryBytes = dictionaryPage.getBytes().toByteBuffer(); binaryDictionaryContent = new Binary[dictionaryPage.getDictionarySize()]; - int offset = 0; + int offset = dictionaryBytes.position(); if (length == null) { // dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes + for (int i = 0; i < binaryDictionaryContent.length; i++) { int len = readIntLittleEndian(dictionaryBytes, offset); // read the length offset += 4; // wrap the content in a binary - binaryDictionaryContent[i] = Binary.fromByteArray(dictionaryBytes, offset, len); + binaryDictionaryContent[i] = Binary.fromByteBuffer(dictionaryBytes, offset, len); // increment to the next value offset += len; } @@ -101,14 +103,14 @@ public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) thro "Invalid byte array length: " + length); for (int i = 0; i < binaryDictionaryContent.length; i++) { // wrap the content in a Binary - binaryDictionaryContent[i] = Binary.fromByteArray( + binaryDictionaryContent[i] = Binary.fromByteBuffer( dictionaryBytes, offset, length); // increment to the next value offset += length; } } } - + @Override public Binary decodeToBinary(int id) { return binaryDictionaryContent[id]; @@ -143,10 +145,10 @@ public static class PlainLongDictionary extends PlainValuesDictionary { */ public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); longDictionaryContent = new long[dictionaryPage.getDictionarySize()]; LongPlainValuesReader longReader = new LongPlainValuesReader(); - longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i = 0; i < longDictionaryContent.length; i++) { longDictionaryContent[i] = longReader.readLong(); } @@ -186,10 +188,10 @@ public static class PlainDoubleDictionary extends PlainValuesDictionary { */ public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()]; DoublePlainValuesReader doubleReader = new DoublePlainValuesReader(); - doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i = 
0; i < doubleDictionaryContent.length; i++) { doubleDictionaryContent[i] = doubleReader.readDouble(); } @@ -229,10 +231,10 @@ public static class PlainIntegerDictionary extends PlainValuesDictionary { */ public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); intDictionaryContent = new int[dictionaryPage.getDictionarySize()]; IntegerPlainValuesReader intReader = new IntegerPlainValuesReader(); - intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i = 0; i < intDictionaryContent.length; i++) { intDictionaryContent[i] = intReader.readInteger(); } @@ -272,10 +274,10 @@ public static class PlainFloatDictionary extends PlainValuesDictionary { */ public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); floatDictionaryContent = new float[dictionaryPage.getDictionarySize()]; FloatPlainValuesReader floatReader = new FloatPlainValuesReader(); - floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i = 0; i < floatDictionaryContent.length; i++) { floatDictionaryContent[i] = floatReader.readFloat(); } diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java index e1d8906337..d884ed8dc8 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java @@ -18,6 +18,7 @@ import static parquet.Log.DEBUG; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.bytes.BytesUtils; @@ -27,7 +28,7 @@ public class BinaryPlainValuesReader extends ValuesReader { private static final Log LOG = Log.getLog(BinaryPlainValuesReader.class); - private byte[] in; + private ByteBuffer in; private int offset; @Override @@ -36,7 +37,7 @@ public Binary readBytes() { int length = BytesUtils.readIntLittleEndian(in, offset); int start = offset + 4; offset = start + length; - return Binary.fromByteArray(in, start, length); + return Binary.fromByteBuffer(in, start, length); } catch (IOException e) { throw new ParquetDecodingException("could not read bytes at offset " + offset, e); } catch (RuntimeException e) { @@ -57,11 +58,15 @@ public void skip() { } @Override - public void initFromPage(int valueCount, byte[] in, int offset) + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); this.in = in; this.offset = offset; } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } } diff --git 
a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java index e349f876a0..96bfb72c68 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -19,6 +19,7 @@ import static parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.column.values.ValuesReader; @@ -59,14 +60,19 @@ public void skip() { * @see parquet.column.values.ValuesReader#initFromPage(byte[], int) */ @Override - public void initFromPage(int valueCount, byte[] in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); this.in.initFromPage(valueCount, in, offset); } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public int getNextOffset() { return this.in.getNextOffset(); } -} \ No newline at end of file +} diff --git a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java index 084da9cdab..6ba6e84846 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java @@ -16,6 +16,7 @@ package parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.column.values.ValuesReader; import parquet.io.ParquetDecodingException; @@ -30,7 +31,7 @@ */ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesReader.class); - private byte[] in; + private ByteBuffer in; private int offset; private int length; @@ -43,7 +44,7 @@ public Binary readBytes() { try { int start = offset; offset = start + length; - return Binary.fromByteArray(in, start, length); + return Binary.fromByteBuffer(in, start, length); } catch (RuntimeException e) { throw new ParquetDecodingException("could not read bytes at offset " + offset, e); } @@ -55,10 +56,15 @@ public void skip() { } @Override - public void initFromPage(int valueCount, byte[] in, int offset) + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); this.in = in; this.offset = offset; } + + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } } diff --git a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java index 27702ad1cd..572512113a 100644 --- 
a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java @@ -17,11 +17,12 @@ import static parquet.Log.DEBUG; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.Log; import parquet.bytes.LittleEndianDataInputStream; +import parquet.bytes.ByteBufferInputStream; import parquet.column.values.ValuesReader; import parquet.io.ParquetDecodingException; @@ -41,9 +42,14 @@ abstract public class PlainValuesReader extends ValuesReader { * @see parquet.column.values.ValuesReader#initFromPage(byte[], int) */ @Override - public void initFromPage(int valueCount, byte[] in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); - this.in = new LittleEndianDataInputStream(new ByteArrayInputStream(in, offset, in.length - offset)); + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); + this.in = new LittleEndianDataInputStream(new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset)); + } + + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); } public static class DoublePlainValuesReader extends PlainValuesReader { @@ -129,4 +135,4 @@ public long readLong() { } } } -} \ No newline at end of file +} diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index 04d3eeb3c8..051ad212ab 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -21,9 +21,12 @@ import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import parquet.Log; import parquet.Preconditions; +import parquet.bytes.ByteBufferInputStream; + import parquet.bytes.BytesUtils; import parquet.column.values.bitpacking.BytePacker; import parquet.column.values.bitpacking.Packer; @@ -41,7 +44,7 @@ private static enum MODE { RLE, PACKED } private final int bitWidth; private final BytePacker packer; - private final ByteArrayInputStream in; + private final ByteBufferInputStream in; private MODE mode; private int currentCount; @@ -51,6 +54,17 @@ private static enum MODE { RLE, PACKED } public RunLengthBitPackingHybridDecoder(int bitWidth, ByteArrayInputStream in) { if (DEBUG) LOG.debug("decoding bitWidth " + bitWidth); + Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); + this.bitWidth = bitWidth; + this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + byte[] buf = new byte[in.available()]; + in.read(buf, 0, in.available()); + this.in = new ByteBufferInputStream(ByteBuffer.wrap(buf)); + } + + public RunLengthBitPackingHybridDecoder(int bitWidth, ByteBufferInputStream in) { + if (DEBUG) LOG.debug("decoding bitWidth " + bitWidth); + Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); @@ -91,14 +105,20 @@ private void 
readNext() throws IOException { currentCount = numGroups * 8; if (DEBUG) LOG.debug("reading " + currentCount + " values BIT PACKED"); currentBuffer = new int[currentCount]; // TODO: reuse a buffer - byte[] bytes = new byte[numGroups * bitWidth]; // At the end of the file RLE data though, there might not be that many bytes left. + ByteBuffer buf = in.toByteBuffer(); int bytesToRead = (int)Math.ceil(currentCount * bitWidth / 8.0); - bytesToRead = Math.min(bytesToRead, in.available()); - new DataInputStream(in).readFully(bytes, 0, bytesToRead); + bytesToRead = Math.min(bytesToRead, buf.remaining()); for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) { - packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); + if (byteIndex + bitWidth <= buf.remaining()) { + packer.unpack8Values(buf, byteIndex, currentBuffer, valueIndex); + } else { + byte[] bytes = new byte[bitWidth]; + buf.position(byteIndex); buf.get(bytes, 0, buf.remaining()); // copy the partial last group; the rest of the array stays zero + packer.unpack8Values(ByteBuffer.wrap(bytes), 0, currentBuffer, valueIndex); + } } + in.skip(bytesToRead); break; default: throw new ParquetDecodingException("not a valid mode " + mode); diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index 4ff05f31a7..cccada5a1b 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -15,9 +15,10 @@ */ package parquet.column.values.rle; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import parquet.bytes.ByteBufferInputStream; import parquet.bytes.BytesUtils; import parquet.column.values.ValuesReader; import parquet.io.ParquetDecodingException; @@ -38,8 +39,8 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) { } @Override - public void initFromPage(int valueCountL, byte[] page, int offset) throws IOException { - ByteArrayInputStream in = new ByteArrayInputStream(page, offset, page.length - offset); + public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException { + ByteBufferInputStream in = new ByteBufferInputStream(page.duplicate(), offset, page.limit() - offset); int length = BytesUtils.readIntLittleEndian(in); decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); @@ -48,6 +49,11 @@ public void initFromPage(int valueCountL, byte[] page, int offset) throws IOExce this.nextOffset = offset + length + 4; } + @Override + public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{ + this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); + } + @Override public int getNextOffset() { return this.nextOffset; diff --git a/parquet-column/src/main/java/parquet/io/api/Binary.java b/parquet-column/src/main/java/parquet/io/api/Binary.java index a297603ee5..2b2e68c1c3 100644 --- a/parquet-column/src/main/java/parquet/io/api/Binary.java +++ b/parquet-column/src/main/java/parquet/io/api/Binary.java @@ -342,6 +342,91 @@ private void readObjectNoData() throws ObjectStreamException { public static Binary fromByteBuffer(final ByteBuffer value) { return new ByteBufferBackedBinary(value); } + + public static Binary fromByteBuffer( + final ByteBuffer value, + final int offset, + final int length) { + return new Binary() { + @Override + public
String toStringUsingUTF8() { + return new String(getBytes(), BytesUtils.UTF8); + } + + @Override + public int length() { + return length; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(getBytes()); + } + + @Override + public byte[] getBytes() { + byte[] bytes = new byte[length]; + + value.mark(); + value.position(offset); + value.get(bytes).reset(); + + return bytes; + } + + @Override + public int hashCode() { + byte[] bytes = getBytes(); + return Binary.hashCode(bytes, 0, bytes.length); + } + + @Override + boolean equals(Binary other) { + if (toByteBuffer().compareTo(other.toByteBuffer()) == 0) { + return true; + } + return false; + } + + @Override + boolean equals(byte[] other, int otherOffset, int otherLength) { + if (toByteBuffer().compareTo(ByteBuffer.wrap(other, otherOffset, otherLength)) == 0) { + return true; + } + return false; + } + + @Override + public int compareTo(Binary other) { + byte[] bytes = getBytes(); + return other.compareTo(bytes, 0, bytes.length); + } + + @Override + int compareTo(byte[] other, int otherOffset, int otherLength) { + byte[] bytes = getBytes(); + return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength); + } + + @Override + public ByteBuffer toByteBuffer() { + ByteBuffer buf; + value.mark(); + value.position(offset); + buf = value.slice(); + buf.limit(length); + value.reset(); + return buf; + } + + @Override + public void writeTo(DataOutput out) throws IOException { + for (int i = offset; i < offset + length; i++) { + out.write(value.get(i)); + } + } + }; + } public static Binary fromString(final String value) { try { diff --git a/parquet-column/src/test/java/parquet/column/values/Utils.java b/parquet-column/src/test/java/parquet/column/values/Utils.java index 0ca0e1769f..9e81aaa41a 100644 --- a/parquet-column/src/test/java/parquet/column/values/Utils.java +++ b/parquet-column/src/test/java/parquet/column/values/Utils.java @@ -16,6 +16,7 @@ package parquet.column.values; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; import parquet.io.api.Binary; @@ -58,7 +59,7 @@ public static void writeData(ValuesWriter writer, String[] strings) public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length) throws IOException { Binary[] bins = new Binary[length]; - reader.initFromPage(length, data, 0); + reader.initFromPage(length, ByteBuffer.wrap(data), 0); for(int i=0; i < length; i++) { bins[i] = reader.readBytes(); } @@ -73,7 +74,7 @@ public static Binary[] readData(ValuesReader reader, byte[] data, int length) public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length) throws IOException { int[] ints = new int[length]; - reader.initFromPage(length, data, offset); + reader.initFromPage(length, ByteBuffer.wrap(data), offset); for(int i=0; i < length; i++) { ints[i] = reader.readInteger(); } diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java index 0a3ccc17fc..50a79b0fe6 100644 --- a/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java +++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java @@ -17,6 +17,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import parquet.column.values.ValuesReader; import 
parquet.column.values.bitpacking.BitPacking.BitPackingWriter; @@ -84,7 +85,7 @@ private static long readNTimes(byte[] bytes, int[] result, ValuesReader r) System.out.print(" no gc <"); for (int k = 0; k < N; k++) { long t2 = System.nanoTime(); - r.initFromPage(result.length, bytes, 0); + r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0); for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); } diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java index 0351db876b..3cd897ad9e 100644 --- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java +++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java @@ -20,6 +20,7 @@ import static parquet.column.values.bitpacking.Packer.BIG_ENDIAN; import java.io.IOException; +import java.nio.ByteBuffer; import org.junit.Test; @@ -169,7 +170,7 @@ private void validateEncodeDecode(int bitLength, int[] vals, String expected) th LOG.debug("bytes: " + TestBitPacking.toString(bytes)); assertEquals(type.toString(), expected, TestBitPacking.toString(bytes)); ValuesReader r = type.getReader(bound); - r.initFromPage(vals.length, bytes, 0); + r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0); int[] result = new int[vals.length]; for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); diff --git a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java index b8ee5fbac7..f3ba0f781b 100644 --- a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java +++ b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Random; @@ -63,7 +64,7 @@ private void compareOutput(int bound, int[] ints, String[] result) throws IOExce byte[] byteArray = bicw.getBytes().toByteArray(); assertEquals(concat(result), toBinaryString(byteArray, 4)); BoundedIntValuesReader bicr = new BoundedIntValuesReader(bound); - bicr.initFromPage(1, byteArray, 0); + bicr.initFromPage(1, ByteBuffer.wrap(byteArray), 0); String expected = ""; String got = ""; for (int i : ints) { @@ -155,7 +156,7 @@ public void testSerDe() throws Exception { idx = 0; int offset = 0; for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) { - bicr.initFromPage(1, input, offset); + bicr.initFromPage(1, ByteBuffer.wrap(input), offset); offset = bicr.getNextOffset(); for (int i = 0; i < valuesPerStripe[stripeNum]; i++) { int number = stream[idx++]; diff --git a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java index 415f5097ff..2e7ba9c47f 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; import org.junit.Before; @@ -151,7 +152,7 @@ public void shouldReturnCorrectOffsetAfterInitialization() throws 
IOException { System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length); //offset should be correct - reader.initFromPage(100, pageContent, contentOffsetInPage); + reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage); int offset= reader.getNextOffset(); assertEquals(valueContent.length + contentOffsetInPage, offset); @@ -184,7 +185,7 @@ public void shouldSkip() throws IOException { } writeData(data); reader = new DeltaBinaryPackingValuesReader(); - reader.initFromPage(100, writer.getBytes().toByteArray(), 0); + reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0); for (int i = 0; i < data.length; i++) { if (i % 3 == 0) { reader.skip(); @@ -240,7 +241,7 @@ private void shouldReadAndWrite(int[] data, int length) throws IOException { + blockFlushed * miniBlockNum //bitWidth of mini blocks + (5.0 * blockFlushed);//min delta for each block assertTrue(estimatedSize >= page.length); - reader.initFromPage(100, page, 0); + reader.initFromPage(100, ByteBuffer.wrap(page), 0); for (int i = 0; i < length; i++) { assertEquals(data[i], reader.readInteger()); diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java index d01a605dc4..6d9e7c1f75 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java @@ -30,6 +30,7 @@ import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; @AxisRange(min = 0, max = 1) @@ -83,7 +84,7 @@ public void readingRLE() throws IOException { } private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException { - reader.initFromPage(data.length, deltaBytes, 0); + reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0); for (int i = 0; i < data.length; i++) { reader.readInteger(); } diff --git a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java index 8cb39b1a30..155d60d095 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java @@ -16,6 +16,7 @@ package parquet.column.values.deltalengthbytearray; import java.io.IOException; +import java.util.Arrays; import org.junit.Test; import org.junit.Assert; @@ -33,12 +34,12 @@ public class TestDeltaLengthByteArray { public void testSerialization () throws IOException { DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); - + Utils.writeData(writer, values); Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); for(int i =0; i< bin.length ; i++) { - Assert.assertEquals(Binary.fromString(values[i]), bin[i]); + Assert.assertEquals(Binary.fromString(values[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8()); } } @@ -52,7 +53,7 @@ public void testRandomStrings() throws IOException { Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); for(int i 
=0; i< bin.length ; i++) { - Assert.assertEquals(Binary.fromString(values[i]), bin[i]); + Assert.assertEquals(Binary.fromString(values[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8()); } } diff --git a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java index c784491bbf..0a266ec7ce 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java @@ -39,7 +39,7 @@ public void testSerialization () throws IOException { Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); for(int i =0; i< bin.length ; i++) { - Assert.assertEquals(Binary.fromString(values[i]), bin[i]); + Assert.assertEquals(Binary.fromString(values[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8()); } } @@ -52,7 +52,7 @@ public void testRandomStrings() throws IOException { Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), randvalues.length); for(int i =0; i< bin.length ; i++) { - Assert.assertEquals(Binary.fromString(randvalues[i]), bin[i]); + Assert.assertEquals(Binary.fromString(randvalues[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8()); } } diff --git a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java index a5d6e1f8da..626e5655c9 100644 --- a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java @@ -87,10 +87,11 @@ public void testBinaryDictionaryFallBack() throws IOException { //Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back ValuesReader reader = new BinaryPlainValuesReader(); - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (long i = 0; i < 100; i++) { - assertEquals(Binary.fromString("str" + i), reader.readBytes()); + assertEquals(Binary.fromString("str" + i).toStringUsingUTF8(), + reader.readBytes().toStringUsingUTF8()); } //simulate cutting the page @@ -175,13 +176,13 @@ public void testLongDictionary() throws IOException { DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (long i = 0; i < COUNT; i++) { long back = cr.readLong(); assertEquals(i % 50, back); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (long i = COUNT2; i > 0; i--) { long back = cr.readLong(); assertEquals(i % 50, back); @@ -199,7 +200,7 @@ private void roundTripLong(DictionaryValuesWriter cw, ValuesReader reader, int } } - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (long i = 0; i < 100; i++) { assertEquals(i, reader.readLong()); @@ -245,13 +246,13 @@ public void testDoubleDictionary() throws IOException { final DictionaryValuesReader cr = initDicReader(cw, DOUBLE); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (double i = 0; i < COUNT; i++) { double back = cr.readDouble(); assertEquals(i % 50, back, 0.0); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + 
cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (double i = COUNT2; i > 0; i--) { double back = cr.readDouble(); assertEquals(i % 50, back, 0.0); @@ -270,7 +271,7 @@ private void roundTripDouble(DictionaryValuesWriter cw, ValuesReader reader, in } } - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (double i = 0; i < 100; i++) { assertEquals(i, reader.readDouble(), 0.00001); @@ -316,13 +317,13 @@ public void testIntDictionary() throws IOException { DictionaryValuesReader cr = initDicReader(cw, INT32); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (int i = 0; i < COUNT; i++) { int back = cr.readInteger(); assertEquals(i % 50, back); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (int i = COUNT2; i > 0; i--) { int back = cr.readInteger(); assertEquals(i % 50, back); @@ -341,7 +342,7 @@ private void roundTripInt(DictionaryValuesWriter cw, ValuesReader reader, int m } } - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (int i = 0; i < 100; i++) { assertEquals(i, reader.readInteger()); @@ -387,13 +388,13 @@ public void testFloatDictionary() throws IOException { DictionaryValuesReader cr = initDicReader(cw, FLOAT); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (float i = 0; i < COUNT; i++) { float back = cr.readFloat(); assertEquals(i % 50, back, 0.0f); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (float i = COUNT2; i > 0; i--) { float back = cr.readFloat(); assertEquals(i % 50, back, 0.0f); @@ -412,7 +413,7 @@ private void roundTripFloat(DictionaryValuesWriter cw, ValuesReader reader, int } } - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (float i = 0; i < 100; i++) { assertEquals(i, reader.readFloat(), 0.00001); @@ -461,14 +462,14 @@ private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName } private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException { - cr.initFromPage(COUNT, bytes.toByteArray(), 0); + cr.initFromPage(COUNT, bytes.toByteBuffer(), 0); for (int i = 0; i < COUNT; i++) { Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8()); } } private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException { - cr.initFromPage(COUNT, bytes.toByteArray(), 0); + cr.initFromPage(COUNT, bytes.toByteBuffer(), 0); for (int i = 0; i < COUNT; i++) { Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8()); } diff --git a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 2359d8de72..b36a62c892 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -15,9 +15,10 @@ */ package parquet.column.values.rle; -import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.io.InputStream; +import parquet.bytes.ByteBufferInputStream; import org.junit.Test; 
import static org.junit.Assert.assertEquals; @@ -67,8 +68,8 @@ private void doIntegrationTest(int bitWidth) throws Exception { } numValues += 1000; - byte[] encodedBytes = encoder.toBytes().toByteArray(); - ByteArrayInputStream in = new ByteArrayInputStream(encodedBytes); + ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer(); + ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes); RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); diff --git a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 0859cb10d1..f2e27903e2 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -25,6 +26,7 @@ import org.junit.Test; import parquet.bytes.BytesUtils; +import parquet.bytes.ByteBufferInputStream; import parquet.column.values.bitpacking.BytePacker; import parquet.column.values.bitpacking.Packer; @@ -284,7 +286,7 @@ public void testGroupBoundary() throws Exception { // bit width 2. bytes[0] = (1 << 1 )| 1; bytes[1] = (1 << 0) | (2 << 2) | (3 << 4); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); + ByteBufferInputStream stream = new ByteBufferInputStream(ByteBuffer.wrap(bytes)); RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream); assertEquals(decoder.readInt(), 1); assertEquals(decoder.readInt(), 2); @@ -306,7 +308,7 @@ private static List unpack(int bitWidth, int numValues, ByteArrayInputS next8Values[i] = (byte) is.read(); } - packer.unpack8Values(next8Values, 0, unpacked, 0); + packer.unpack8Values(ByteBuffer.wrap(next8Values), 0, unpacked, 0); for (int v = 0; v < 8; v++) { values.add(unpacked[v]); diff --git a/parquet-common/src/main/java/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/parquet/bytes/ByteBufferInputStream.java new file mode 100644 index 0000000000..15a8c8e5b0 --- /dev/null +++ b/parquet-common/src/main/java/parquet/bytes/ByteBufferInputStream.java @@ -0,0 +1,55 @@ +package parquet.bytes; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class ByteBufferInputStream extends InputStream { + + protected ByteBuffer byteBuf; + public ByteBufferInputStream(ByteBuffer buffer) { + this(buffer, buffer.position(), buffer.remaining()); + } + + public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) { + buffer.position(offset); + byteBuf = buffer.slice(); + byteBuf.limit(count); + } + + public ByteBuffer toByteBuffer() { + return byteBuf.slice(); + } + + @Override + public int read() throws IOException { + if (!byteBuf.hasRemaining()) { + return -1; + } + //Workaround for unsigned byte + return byteBuf.get() & 0xFF; + } + + @Override + public int read(byte[] bytes, int offset, int length) throws IOException { + int count = Math.min(byteBuf.remaining(), length); + if (count == 0) return -1; + byteBuf.get(bytes, offset, count); + return count; + } + + @Override + public long skip(long n) { + if (n > byteBuf.remaining()) + n = byteBuf.remaining(); + int pos = byteBuf.position(); + 
byteBuf.position((int)(pos + n)); + return n; + } + + + @Override + public int available() { + return byteBuf.remaining(); + } +} diff --git a/parquet-common/src/main/java/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/parquet/bytes/BytesUtils.java index 108a7ab5f6..53b91c1811 100644 --- a/parquet-common/src/main/java/parquet/bytes/BytesUtils.java +++ b/parquet-common/src/main/java/parquet/bytes/BytesUtils.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import parquet.Log; @@ -43,6 +44,21 @@ public static int getWidthFromMaxInt(int bound) { return 32 - Integer.numberOfLeadingZeros(bound); } + /** + * reads an int in little endian at the given position + * @param in + * @param offset + * @return + * @throws IOException + */ + public static int readIntLittleEndian(ByteBuffer in, int offset) throws IOException { + int ch4 = in.get(offset) & 0xff; + int ch3 = in.get(offset + 1) & 0xff; + int ch2 = in.get(offset + 2) & 0xff; + int ch1 = in.get(offset + 3) & 0xff; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + /** * reads an int in little endian at the given position * @param in diff --git a/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java index fc8abfd51d..4a3ad3ebea 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java +++ b/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java @@ -22,6 +22,9 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.List; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import parquet.Log; @@ -68,6 +71,15 @@ public static BytesInput concat(List inputs) { public static BytesInput from(InputStream in, int bytes) { return new StreamBytesInput(in, bytes); } + + /** + * @param buffer + * @param length number of bytes to read + * @return a BytesInput that will read the given bytes from the ByteBuffer + */ + public static BytesInput from(ByteBuffer buffer, int offset, int length) { + return new ByteBufferBytesInput(buffer, offset, length); + } /** * @@ -161,6 +173,9 @@ public byte[] toByteArray() throws IOException { return baos.getBuf(); } + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.wrap(toByteArray()); + } /** * * @return the size in bytes that would be written @@ -358,5 +373,39 @@ public long size() { } } + + private static class ByteBufferBytesInput extends BytesInput { + + private final ByteBuffer byteBuf; + private final int length; + private final int offset; + + private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) { + this.byteBuf = byteBuf; + this.offset = offset; + this.length = length; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + final WritableByteChannel outputChannel = Channels.newChannel(out); + byteBuf.position(offset); + ByteBuffer tempBuf = byteBuf.slice(); + tempBuf.limit(length); + outputChannel.write(tempBuf); + } + + @Override + public ByteBuffer toByteBuffer() throws IOException { + byteBuf.position(offset); + ByteBuffer buf = byteBuf.slice(); + buf.limit(length); + return buf; + } + @Override + public long size() { + return length; + } + } } diff --git a/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java b/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java index 
ad35d2c773..f8edaf2978 100644 --- a/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java +++ b/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java @@ -15,6 +15,8 @@ */ package parquet.column.values.bitpacking; +import java.nio.ByteBuffer; + /** * Packs and unpacks into bytes * @@ -68,6 +70,9 @@ public final int getBitWidth() { * @param output the output values * @param outPos where to write to in output */ + public abstract void unpack8Values(final ByteBuffer input, final int inPos, final int[] output, final int outPos); + + //Compatible API public abstract void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos); /** @@ -78,6 +83,8 @@ public final int getBitWidth() { * @param output the output values * @param outPos where to write to in output */ - public abstract void unpack32Values(byte[] input, int inPos, int[] output, int outPos); + public abstract void unpack32Values(ByteBuffer input, int inPos, int[] output, int outPos); + //Compatible API + public abstract void unpack32Values(byte[] input, int inPos, int[] output, int outPos); } diff --git a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java index 9d109f4c44..c479033033 100644 --- a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java +++ b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.junit.Assert; import org.junit.Test; @@ -48,7 +49,7 @@ private void packUnpack(BytePacker packer, int[] values, int[] unpacked) { byte[] packed = new byte[packer.getBitWidth() * 4]; packer.pack32Values(values, 0, packed, 0); LOG.debug("packed: " + TestBitPacking.toString(packed)); - packer.unpack32Values(packed, 0, unpacked, 0); + packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0); } private int[] generateValues(int bitWidth) { @@ -138,7 +139,7 @@ public void testPackUnPackAgainstLemire() throws IOException { LOG.debug("Gener. 
out: " + TestBitPacking.toString(packedGenerated)); Assert.assertEquals(pack.name() + " width " + i, TestBitPacking.toString(packedByLemireAsBytes), TestBitPacking.toString(packedGenerated)); - bytePacker.unpack32Values(packedByLemireAsBytes, 0, unpacked, 0); + bytePacker.unpack32Values(ByteBuffer.wrap(packedByLemireAsBytes), 0, unpacked, 0); LOG.debug("Output: " + TestBitPacking.toString(unpacked)); Assert.assertArrayEquals("width " + i, values, unpacked); diff --git a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java index 65fad493fd..4d340b03cc 100644 --- a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java +++ b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java @@ -18,6 +18,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import org.junit.Assert; import org.junit.Test; @@ -61,7 +62,7 @@ private void packUnpack(IntPacker packer, int[] values, int[] unpacked) { private void packUnpack(BytePacker packer, int[] values, int[] unpacked) { byte[] packed = new byte[packer.getBitWidth() * 4]; packer.pack32Values(values, 0, packed, 0); - packer.unpack32Values(packed, 0, unpacked, 0); + packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0); } private int[] generateValues(int bitWidth) { diff --git a/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java b/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java index deb8f0e669..59b808fe26 100644 --- a/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java +++ b/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java @@ -49,6 +49,7 @@ private static void generateScheme(String className, boolean msbFirst, String ba } FileWriter fw = new FileWriter(file); fw.append("package parquet.column.values.bitpacking;\n"); + fw.append("import java.nio.ByteBuffer;\n"); fw.append("\n"); fw.append("/**\n"); if (msbFirst) { @@ -205,6 +206,9 @@ private static void generatePack(FileWriter fw, int bitWidth, int batch, boolean private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst) throws IOException { fw.append(" public final void unpack" + (batch * 8) + "Values(final byte[] in, final int inPos, final int[] out, final int outPos) {\n"); + fw.append(" unpack" + (batch * 8) + "Values(ByteBuffer.wrap(in), inPos, out, outPos);\n" ); + fw.append(" }\n"); + fw.append(" public final void unpack" + (batch * 8) + "Values(final ByteBuffer in, final int inPos, final int[] out, final int outPos) {\n"); if (bitWidth > 0) { int mask = genMask(bitWidth); for (int valueIndex = 0; valueIndex < (batch * 8); ++valueIndex) { @@ -227,7 +231,7 @@ private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boole } else if (shift > 0){ shiftString = "<< " + shift; } - fw.append(" (((((int)in[" + align(byteIndex, 2) + " + inPos]) & 255) " + shiftString + ") & " + mask + ")"); + fw.append(" (((((int)in.get(" + align(byteIndex, 2) + " + inPos)) & 255) " + shiftString + ") & " + mask + ")"); } fw.append(";\n"); } diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java 
b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java index 5bd68698d0..4b63c57cf7 100644 --- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.fs.FSDataInputStream; import parquet.Log; import parquet.common.schema.ColumnPath; import parquet.format.ColumnChunk; @@ -49,6 +50,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.CompressionCodecName; import parquet.hadoop.metadata.ParquetMetadata; +import parquet.hadoop.util.CompatibilityUtil; import parquet.io.ParquetDecodingException; import parquet.schema.GroupType; import parquet.schema.MessageType; @@ -347,7 +349,15 @@ public ParquetMetadata readParquetMetadata(InputStream from) throws IOException if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata)); return parquetMetadata; } - + + public ParquetMetadata readParquetMetadata(FSDataInputStream from) + throws IOException { + FileMetaData fileMetaData = CompatibilityUtil.read(from, new FileMetaData()); + if (Log.DEBUG) LOG.debug(fileMetaData); + ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData); + if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata)); + return parquetMetadata; + } public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException { MessageType messageType = fromParquetSchema(parquetMetadata.getSchema()); List blocks = new ArrayList(); diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java index 1a87493566..f3f5279bf9 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java @@ -31,6 +31,7 @@ import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.util.ReflectionUtils; +import parquet.bytes.ByteBufferInputStream; import parquet.bytes.BytesInput; import parquet.hadoop.metadata.CompressionCodecName; @@ -54,7 +55,7 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx final BytesInput decompressed; if (codec != null) { decompressor.reset(); - InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor); + InputStream is = codec.createInputStream(new ByteBufferInputStream(bytes.toByteBuffer()), decompressor); decompressed = BytesInput.from(is, uncompressedSize); } else { decompressed = bytes; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java index e660c9f9aa..7e05a3b218 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java @@ -15,9 +15,16 @@ */ package parquet.hadoop; +import static parquet.Log.DEBUG; +import static parquet.format.Util.readPageHeader; +import static parquet.bytes.BytesUtils.readIntLittleEndian; +import static parquet.hadoop.ParquetFileWriter.MAGIC; +import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE; + import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; import java.io.SequenceInputStream; import java.util.ArrayList; import java.util.Arrays; @@ -42,6 +49,7 @@ import 
org.apache.hadoop.mapred.Utils; import parquet.Log; +import parquet.bytes.ByteBufferInputStream; import parquet.bytes.BytesInput; import parquet.column.ColumnDescriptor; import parquet.column.page.DictionaryPage; @@ -56,6 +64,7 @@ import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; +import parquet.hadoop.util.CompatibilityUtil; import parquet.hadoop.util.counters.BenchmarkCounter; import parquet.io.ParquetDecodingException; @@ -281,11 +290,22 @@ public static final ParquetMetadata readFooter(Configuration configuration, File if (Log.DEBUG) LOG.debug("reading footer index at " + footerLengthIndex); f.seek(footerLengthIndex); - int footerLength = readIntLittleEndian(f); - byte[] magic = new byte[MAGIC.length]; - f.readFully(magic); - if (!Arrays.equals(MAGIC, magic)) { - throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic)); + final int footerLength = CompatibilityUtil.getInt(f); + final ByteBuffer refMagicBuf = ByteBuffer.wrap(MAGIC); + for (int magicRemaining = MAGIC.length; magicRemaining > 0;) { + final ByteBuffer magicBuf = CompatibilityUtil.getBuf(f, magicRemaining); + refMagicBuf.clear(); + refMagicBuf.position(MAGIC.length - magicRemaining); + refMagicBuf.limit(refMagicBuf.position() + magicBuf.remaining()); + if (!magicBuf.equals(refMagicBuf)) { + final String expMagicStr = refMagicBuf.asCharBuffer().toString(); + final String actMagicStr = magicBuf.asCharBuffer().toString(); + throw new RuntimeException(file.getPath() + " is not a Parquet file. " + + "Expected magic number at tail " + expMagicStr + " but found " + + actMagicStr); + } + magicRemaining -= magicBuf.remaining(); + CompatibilityUtil.releaseBuffer(f, magicBuf); } long footerIndex = footerLengthIndex - footerLength; if (Log.DEBUG) LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex); @@ -380,7 +400,7 @@ public void close() throws IOException { * @author Julien Le Dem * */ - private class Chunk extends ByteArrayInputStream { + private class Chunk extends ByteBufferInputStream { private final ChunkDescriptor descriptor; @@ -390,10 +410,9 @@ private class Chunk extends ByteArrayInputStream { * @param data contains the chunk data at offset * @param offset where the chunk starts in offset */ - public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) { - super(data); + public Chunk(ChunkDescriptor descriptor, ByteBuffer buffer, int offset) { + super(buffer, offset, descriptor.size); this.descriptor = descriptor; - this.pos = offset; } protected PageHeader readPageHeader() throws IOException { @@ -459,7 +478,7 @@ public ColumnChunkPageReader readAllPages() throws IOException { * @return the current position in the chunk */ public int pos() { - return this.pos; + return this.byteBuf.position(); } /** @@ -468,8 +487,9 @@ public int pos() { * @throws IOException */ public BytesInput readAsBytesInput(int size) throws IOException { - final BytesInput r = BytesInput.from(this.buf, this.pos, size); - this.pos += size; + int pos = this.byteBuf.position(); + final BytesInput r = BytesInput.from(this.byteBuf, pos, size); + this.byteBuf.position(pos + size); return r; } @@ -491,14 +511,14 @@ private class WorkaroundChunk extends Chunk { * @param offset where the chunk starts in data * @param f the file stream positioned at the end of this chunk */ - private WorkaroundChunk(ChunkDescriptor 
descriptor, byte[] data, int offset, FSDataInputStream f) { - super(descriptor, data, offset); + private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, FSDataInputStream f) { + super(descriptor, byteBuf, offset); this.f = f; } protected PageHeader readPageHeader() throws IOException { PageHeader pageHeader; - int initialPos = this.pos; + int initialPos = pos(); try { pageHeader = Util.readPageHeader(this); } catch (IOException e) { @@ -507,7 +527,7 @@ protected PageHeader readPageHeader() throws IOException { // to allow reading older files (using dictionary) we need this. // usually 13 to 19 bytes are missing // if the last page is smaller than this, the page header itself is truncated in the buffer. - this.pos = initialPos; // resetting the buffer to the position before we got the error + this.byteBuf.rewind(); // resetting the buffer to the position before we got the error LOG.info("completing the column chunk to read the page header"); pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream. } @@ -515,12 +535,12 @@ protected PageHeader readPageHeader() throws IOException { } public BytesInput readAsBytesInput(int size) throws IOException { - if (pos + size > count) { + if (size > this.byteBuf.remaining()) { // this is to workaround a bug where the compressedLength // of the chunk is missing the size of the header of the dictionary // to allow reading older files (using dictionary) we need this. // usually 13 to 19 bytes are missing - int l1 = count - pos; + int l1 = this.byteBuf.remaining(); int l2 = size - l1; LOG.info("completed the column chunk with " + l2 + " bytes"); return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2))); @@ -596,18 +616,18 @@ public void addChunk(ChunkDescriptor descriptor) { public List readAll(FSDataInputStream f) throws IOException { List result = new ArrayList(chunks.size()); f.seek(offset); - byte[] chunksBytes = new byte[length]; - f.readFully(chunksBytes); + ByteBuffer chunksByteBuffer = CompatibilityUtil.getBuf(f, length); + // report in a counter the data we just scanned BenchmarkCounter.incrementBytesRead(length); int currentChunkOffset = 0; for (int i = 0; i < chunks.size(); i++) { ChunkDescriptor descriptor = chunks.get(i); if (i < chunks.size() - 1) { - result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset)); + result.add(new Chunk(descriptor, chunksByteBuffer, currentChunkOffset)); } else { // because of a bug, the last chunk might be larger than descriptor.size - result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f)); + result.add(new WorkaroundChunk(descriptor, chunksByteBuffer, currentChunkOffset, f)); } currentChunkOffset += descriptor.size; } diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java index f3ef61b198..123ffdbec8 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java @@ -62,7 +62,8 @@ public class ParquetFileWriter { private static final Log LOG = Log.getLog(ParquetFileWriter.class); public static final String PARQUET_METADATA_FILE = "_metadata"; - public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); + public static final String MAGIC_STR = "PAR1"; + public static final byte[] MAGIC = MAGIC_STR.getBytes(Charset.forName("ASCII")); public static final 
int CURRENT_VERSION = 1; private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java index 94970d9581..b78d562d76 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java @@ -21,11 +21,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.*; /** * Snappy compression codec for Parquet. We do not use the default hadoop @@ -34,7 +30,7 @@ * for their file formats (e.g. SequenceFile) but is undesirable for Parquet since * we already have the data page which provides that. */ -public class SnappyCodec implements Configurable, CompressionCodec { +public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec { private Configuration conf; // Hadoop config for how big to make intermediate buffers. private final String BUFFER_SIZE_CONFIG = "io.file.buffer.size"; @@ -59,6 +55,10 @@ public Decompressor createDecompressor() { return new SnappyDecompressor(); } + public DirectDecompressor createDirectDecompressor() { + return new SnappyDecompressor.SnappyDirectDecompressor(); + } + @Override public CompressionInputStream createInputStream(InputStream stream) throws IOException { diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java index 34cd00275c..6a20810149 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java @@ -19,6 +19,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; import org.xerial.snappy.Snappy; import parquet.Preconditions; @@ -144,4 +145,25 @@ public boolean needsDictionary() { public void setDictionary(byte[] b, int off, int len) { // No-op } -} + + public static class SnappyDirectDecompressor extends SnappyDecompressor implements DirectDecompressor { + + public SnappyDirectDecompressor() { + super(); + } + + public synchronized void decompress(ByteBuffer src, ByteBuffer dst) throws java.io.IOException{ + if(!dst.hasRemaining()){ + return; + } + dst.clear(); + //int decompressedSize = Snappy.uncompressedLength(src); + int size = Snappy.uncompress(src, dst); + dst.limit(size); + // We've decompressed the entire input + super.finished = true; + } // decompress + + } // class SnappyDirectDecompressor + +} //class SnappyDecompressor diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/parquet/hadoop/util/CompatibilityUtil.java new file mode 100644 index 0000000000..0c13ffb6b2 --- /dev/null +++ b/parquet-hadoop/src/main/java/parquet/hadoop/util/CompatibilityUtil.java @@ -0,0 +1,389 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.EnumSet; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Constructor; +import parquet.Log; + +import parquet.org.apache.thrift.TBase; +import parquet.org.apache.thrift.TException; +import parquet.format.FileMetaData; +import parquet.org.apache.thrift.protocol.*; +import parquet.org.apache.thrift.transport.TIOStreamTransport; +import parquet.org.apache.thrift.transport.TTransport; +import parquet.org.apache.thrift.transport.TTransportException; + +public class CompatibilityUtil { + private static final boolean useV21; + private static final Log LOG = Log.getLog(CompatibilityUtil.class); + public static final V21FileAPI fileAPI; + private static final int MAX_SIZE = 1 << 20; + private static final Object bufferPool; + + private static class V21FileAPI { + private final Constructor ELASTIC_BYTE_BUFFER_CONSTRUCTOR; + private final Class ElasticByteBufferCls; + private final Class ByteBufferCls; + private final Class ReadOptionCls; + private final Method READ_METHOD; + private final Method RELEASE_BUFFER_METHOD; + private final Method GET_BUFFER_METHOD; + private final Method PUT_BUFFER_METHOD; + private final Class FSDataInputStreamCls; + + private V21FileAPI() throws ClassNotFoundException, NoSuchMethodException, SecurityException { + final String PACKAGE = "org.apache.hadoop"; + ElasticByteBufferCls = Class.forName(PACKAGE + ".io.ElasticByteBufferPool"); + ELASTIC_BYTE_BUFFER_CONSTRUCTOR = ElasticByteBufferCls.getConstructor(); + ByteBufferCls = Class.forName(PACKAGE + ".io.ByteBufferPool"); + FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream"); + ReadOptionCls = (Class)Class.forName(PACKAGE + ".fs.ReadOption"); + READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBufferCls, int.class, EnumSet.class); + RELEASE_BUFFER_METHOD = FSDataInputStreamCls.getMethod("releaseBuffer", ByteBuffer.class); + GET_BUFFER_METHOD = ElasticByteBufferCls.getMethod("getBuffer", boolean.class, int.class); + PUT_BUFFER_METHOD = ElasticByteBufferCls.getMethod("putBuffer", ByteBuffer.class); + } + } + + static { + boolean v21 = true; + try { + Class.forName("org.apache.hadoop.io.ElasticByteBufferPool"); + } catch (ClassNotFoundException cnfe) { + v21 = false; + } + + useV21 = v21; + try { + if (v21) { + fileAPI = new V21FileAPI(); + bufferPool = fileAPI.ELASTIC_BYTE_BUFFER_CONSTRUCTOR.newInstance(); + } else { + fileAPI = null; + bufferPool = null; + } + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Can't find class", e); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("Can't find constructor ", e); + } catch (InstantiationException e) { + throw new IllegalArgumentException("Can't create instance ", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't create instance ", e); + } catch 
(IllegalArgumentException e) { + throw new IllegalArgumentException("Can't create instance ", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't create instance ", e); + } + } + + public static void releaseBuffer(FSDataInputStream f, ByteBuffer buf) { + if (useV21) { + try { + fileAPI.RELEASE_BUFFER_METHOD.invoke(f, buf); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't call method", e); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Can't call method", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't call method", e); + } + } + } + + public static int getInt(FSDataInputStream f) throws IOException { + ByteBuffer int32Buf = getBuf(f, 4).order(ByteOrder.LITTLE_ENDIAN); + if (int32Buf.remaining() == 4) { + final int res = int32Buf.getInt(); + releaseBuffer(f, int32Buf); + return res; + } + ByteBuffer tmpBuf = int32Buf; + int32Buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN); + int32Buf.put(tmpBuf); + releaseBuffer(f, tmpBuf); + while (int32Buf.hasRemaining()) { + tmpBuf = getBuf(f, int32Buf.remaining()); + int32Buf.put(tmpBuf); + releaseBuffer(f, tmpBuf); + } + return int32Buf.getInt(); + } + + public static ByteBuffer getBuf(FSDataInputStream f, int maxSize) + throws IOException { + ByteBuffer res = null; + if (useV21) { + try { + res = (ByteBuffer) fileAPI.READ_METHOD.invoke(f, + fileAPI.ELASTIC_BYTE_BUFFER_CONSTRUCTOR.newInstance(), + maxSize, + EnumSet.of(Enum.valueOf(fileAPI.ReadOptionCls, "SKIP_CHECKSUMS"))); + } catch (Exception e) { + byte[] buf = new byte[maxSize]; + f.read(buf,0, maxSize); + res = ByteBuffer.wrap(buf); + } + } else { + byte[] buf = new byte[maxSize]; + int size = f.read(buf,0, maxSize); + res = ByteBuffer.wrap(buf, 0, size); + } + + if (res == null) { + throw new EOFException("Null ByteBuffer returned"); + } + return res; + } + + // Caller must allocate the buffer + public static ByteBuffer getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException { + Class[] ZCopyArgs = {ByteBuffer.class}; + int res=0; + int l=readBuf.remaining(); + if (useV21) { + try { + res = f.read(readBuf); + }catch (UnsupportedOperationException e) { + byte[] buf = new byte[maxSize]; + res=f.read(buf); + readBuf.put(buf, 0, maxSize); + } + } else { + byte[] buf = new byte[maxSize]; + res=f.read(buf); + readBuf.put(buf, 0, maxSize); + } + + if (res == 0) { + throw new EOFException("Null ByteBuffer returned"); + } + return readBuf; + } + + public static void bbCopy(ByteBuffer dst, ByteBuffer src) { + final int n = Math.min(dst.remaining(), src.remaining()); + for (int i = 0; i < n; i++) { + dst.put(src.get()); + } + } + + public static > T read(FSDataInputStream f, T tbase) + throws IOException { + try { + // Reverting to using TIOStreamTransport instead of the FSDistTransport + // implementation below. FSDistTransport is 4x slower when reading footers. 
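+ // TCompactProtocol deserializes the footer struct straight off the open FSDataInputStream via TIOStreamTransport; no intermediate buffer or extra copy is introduced here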
+ tbase.read(new TCompactProtocol(new TIOStreamTransport(f))); + return tbase; + } catch (TException e) { + throw new IOException("can not read " + tbase.getClass() + ": " + + e.getMessage(), e); + } + } + + private static final class FSDISTransport extends TTransport { + private final FSDataInputStream fsdis; + // ByteBuffer-based API + private ByteBuffer tbuf; + private ByteBuffer slice; + + private FSDISTransport(FSDataInputStream f) { + super(); + fsdis = f; + } + + @Override + public boolean isOpen() { + return true; // TODO + } + + @Override + public boolean peek() { + throw new UnsupportedOperationException(); + } + + @Override + public void open() throws TTransportException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + + @Override + public int read(byte[] bytes, int i, int i2) throws TTransportException { + throw new UnsupportedOperationException("ByteBuffer API to be used"); + } + + @Override + public int readAll(byte[] buf, int off, int len) throws TTransportException { + ByteBuffer tmpBuf = readFully(len); + tmpBuf.get(buf, off, len); + return len; + } + + @Override + public void write(byte[] buf) throws TTransportException { + throw new UnsupportedOperationException("Read-Only implementation"); + } + + @Override + public void write(byte[] bytes, int i, int i2) throws TTransportException { + throw new UnsupportedOperationException("Read-Only implementation"); + } + + @Override + public void flush() throws TTransportException { + throw new UnsupportedOperationException("Read-Only implementation"); + } + + @Override + public byte[] getBuffer() { + if (tbuf == null) { + return null; + } + int pos = tbuf.position(); + tbuf.rewind(); + byte[] buf = new byte[tbuf.remaining()]; + tbuf.get(buf); + tbuf.position(pos); + return buf; + } + + @Override + public int getBufferPosition() { + if (tbuf == null) { + return 0; + } + return tbuf.position(); + } + + @Override + public int getBytesRemainingInBuffer() { + if (tbuf == null) { + return 0; + } + return tbuf.remaining(); + } + + @Override + public void consumeBuffer(int len) { + if (tbuf == null) { + return; + } + int pos = tbuf.position(); + tbuf.position(pos + len); + return; + } + + public byte readByte() throws TTransportException { + try { + for (;;) { + if (tbuf == null) { + tbuf = getBuf(fsdis, MAX_SIZE); + } + if (tbuf.hasRemaining()) { + return tbuf.get(); + } else { + release(tbuf); + } + } + } catch (IOException ioe) { + throw new TTransportException("Hadoop FS", ioe); + } finally { + release(tbuf); + } + } + + public ByteBuffer readFully(int size) throws TTransportException { + try { + ByteBuffer newBuf = null; // crossing boundaries + for (;;) { + if (tbuf == null) { + tbuf = getBuf(fsdis, MAX_SIZE); + } + if (newBuf == null) { + // serve slice from I/O buffer? 
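+ // fast path: if the current I/O buffer already holds the requested bytes, hand out a zero-copy slice and just advance the buffer past it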
+ if (tbuf.remaining() >= size) { + final int lim = tbuf.limit(); + tbuf.limit(tbuf.position() + size); + slice = tbuf.slice(); + tbuf.position(tbuf.limit()); + tbuf.limit(lim); + return slice; + } else { + try { + newBuf = (ByteBuffer)fileAPI.GET_BUFFER_METHOD.invoke(bufferPool, false, size); + } catch (IllegalAccessException e) { + throw new TTransportException("Hadoop FS", e); + } catch (IllegalArgumentException e) { + throw new TTransportException("Hadoop FS", e); + } catch (InvocationTargetException e) { + throw new TTransportException("Hadoop FS", e); + } + newBuf.limit(size).position(0); + } + } + // no zero copy + bbCopy(newBuf, tbuf); + release(tbuf); + if (!newBuf.hasRemaining()) { + newBuf.flip(); + if (newBuf.remaining() != size) { + throw new TTransportException("boom"); + } + return newBuf; + } + } + } catch (IOException ioe) { + throw new TTransportException("Hadoop FS", ioe); + } + } + + public void release(ByteBuffer b) { + if (b == null) { + return; + } else if (b == slice) { + slice = null; + } else if (b == tbuf) { + if (!tbuf.hasRemaining()) { + releaseBuffer(fsdis, tbuf); + tbuf = null; + } + } else { + try { + fileAPI.PUT_BUFFER_METHOD.invoke(bufferPool, b); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't call method", e); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Can't call method", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't call method", e); + } + } + } + } +} diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml index 112330b15a..107f3a9c5d 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml @@ -29,7 +29,7 @@ org.apache.hadoop - hadoop-core + hadoop-client ${hadoop.version} provided diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml index 2baeb54cc4..f6f5f36683 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml @@ -29,7 +29,7 @@ org.apache.hadoop - hadoop-core + hadoop-client ${hadoop.version} provided diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml index fdce1c0b6e..3614db69e1 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml @@ -35,7 +35,7 @@ org.apache.hadoop - hadoop-core + hadoop-client ${hadoop.version} test diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml index bcf7bb4bc4..afdeced3df 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml @@ -15,7 +15,7 @@ org.apache.hadoop - hadoop-core + hadoop-client ${hadoop.version} provided diff --git a/parquet-hive/parquet-hive-storage-handler/pom.xml b/parquet-hive/parquet-hive-storage-handler/pom.xml index 68c39a990c..2deaa08cfa 100644 --- a/parquet-hive/parquet-hive-storage-handler/pom.xml +++ b/parquet-hive/parquet-hive-storage-handler/pom.xml @@ -42,7 +42,7 @@ 
org.apache.hadoop - hadoop-core + hadoop-client ${hadoop.version} provided diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml index 144761ec42..bfb28087e2 100644 --- a/parquet-pig/pom.xml +++ b/parquet-pig/pom.xml @@ -36,7 +36,8 @@ org.apache.pig pig - 0.11.1 + ${pig.version} + ${pig.classifier} provided diff --git a/parquet-test-hadoop2/pom.xml b/parquet-test-hadoop2/pom.xml index 58c8d6f3aa..64dbced06e 100644 --- a/parquet-test-hadoop2/pom.xml +++ b/parquet-test-hadoop2/pom.xml @@ -40,7 +40,7 @@ org.apache.hadoop hadoop-client - 2.0.3-alpha + ${hadoop.version} test diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml index d26d6c112a..ae02c8c023 100644 --- a/parquet-thrift/pom.xml +++ b/parquet-thrift/pom.xml @@ -81,7 +81,8 @@ org.apache.pig pig - 0.11.1 + ${pig.version} + ${pig.classifier} provided diff --git a/pom.xml b/pom.xml index 7b2e59fa65..6eacf657eb 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,9 @@ 1.9.11 org.codehaus.jackson parquet - 1.1.0 + 2.3.0 + 0.11.1 + h2 2.5.3 2.1.0 1.2.17 @@ -198,7 +200,7 @@ check - verify + none enforce
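
A note on the BytesInput additions above: the base-class toByteBuffer() is a compatibility default that wraps toByteArray(), while the new ByteBufferBytesInput serves offset/length views of an existing buffer without copying. A small sketch of that round trip, assuming only the factory and accessors added in this patch:

    import java.nio.ByteBuffer;
    import parquet.bytes.BytesInput;

    public class BytesInputSketch {
      public static void main(String[] args) throws Exception {
        ByteBuffer page = ByteBuffer.wrap(new byte[] {10, 20, 30, 40, 50});
        // view bytes 1..3 of the page as a BytesInput without copying the backing array
        BytesInput in = BytesInput.from(page, 1, 3);
        System.out.println(in.size());                        // 3
        ByteBuffer view = in.toByteBuffer();                  // slice over the same backing array
        System.out.println(view.get(0) + " " + view.get(2));  // 20 40
      }
    }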
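The ByteBuffer overload of BytesUtils.readIntLittleEndian assembles the value from absolute gets, so it neither moves the buffer's position nor depends on the buffer's byte-order setting. A quick sanity sketch:

    import java.nio.ByteBuffer;
    import parquet.bytes.BytesUtils;

    public class LittleEndianSketch {
      public static void main(String[] args) throws Exception {
        // 0x04030201 stored little-endian, least significant byte first
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {0x01, 0x02, 0x03, 0x04});
        System.out.println(Integer.toHexString(BytesUtils.readIntLittleEndian(buf, 0))); // 4030201
      }
    }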
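Finally, the new SnappyDirectDecompressor decompresses buffer to buffer; as far as the underlying xerial Snappy.uncompress(ByteBuffer, ByteBuffer) call is concerned, both buffers generally need to be direct. A hedged sketch of a round trip, compressing with plain Snappy first; the helper class name is illustrative only:

    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import org.xerial.snappy.Snappy;
    import parquet.hadoop.codec.SnappyDecompressor.SnappyDirectDecompressor;

    public class DirectSnappySketch {
      public static void main(String[] args) throws Exception {
        byte[] raw = "hello direct snappy".getBytes(Charset.forName("UTF-8"));
        byte[] compressed = Snappy.compress(raw);

        // direct buffers: source holds the compressed page, destination is sized for the raw data
        ByteBuffer src = ByteBuffer.allocateDirect(compressed.length);
        src.put(compressed).flip();
        ByteBuffer dst = ByteBuffer.allocateDirect(raw.length);

        new SnappyDirectDecompressor().decompress(src, dst);
        byte[] out = new byte[dst.remaining()];
        dst.get(out);
        System.out.println(new String(out, Charset.forName("UTF-8")));
      }
    }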