Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b6f096e
Use ByteBuf-based api to read magic.
gerashegalov Jan 8, 2014
89784cc
Reading file metadata using zero-copy API
gerashegalov Jan 10, 2014
1deb34a
Reading chunk using zero-copy API
dsy88 May 22, 2014
930b93b
Add ByteBufferInputStream and modify Chunk to consume ByteBuffer instead
dsy88 May 29, 2014
918efa7
Read from ByteBuffer instead of ByteArray to avoid unnecessary array …
dsy88 Jun 2, 2014
4a99844
Using Writable Channel to replace write to OutputStream one by one.
dsy88 Jun 17, 2014
a428a29
Merge remote-tracking branch 'origin/master' into ByteBufferRead
dsy88 Jun 20, 2014
4da7130
Add original readIntLittleEndian function to keep compatible with pre…
dsy88 Jun 20, 2014
80a7351
Add a Hadoop compatible layer to abstract away the zero copy API and old
dsy88 Jun 23, 2014
6b7ea00
Move CompatibilityUtil to parquet.hadoop.util.
dsy88 Jun 25, 2014
c1a1637
Implement FSDISTransport in Compatible layer.
dsy88 Jul 3, 2014
fb4cc9c
Make BytePacker consume ByteBuffer directly.
dsy88 Jul 8, 2014
bfcb7d4
disable enforcer to pass build.
dsy88 Jul 2, 2014
bd6b60b
Merge pull request #1 from apache/master
dsy88 Jul 31, 2014
8a3b36f
remove some unnecessary code.
dsy88 Jul 24, 2014
4cd926f
Merge remote-tracking branch 'origin/master' into ByteBufferRead
dsy88 Aug 5, 2014
8f4c9b5
fix a bug in equals in ByteBuffer Binary with offset and length
dsy88 Aug 5, 2014
b139059
enable enforcer check.
dsy88 Aug 15, 2014
8e04e79
Address tsdeng's comments
dsy88 Sep 13, 2014
42276c1
Merge pull request #2 from apache/master
dsy88 Sep 13, 2014
00b7c7c
Merge remote-tracking branch 'origin/master' into ByteBufferRead
dsy88 Sep 13, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static parquet.Preconditions.checkNotNull;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.column.ColumnDescriptor;
Expand Down Expand Up @@ -518,16 +519,16 @@ private void readPage() {
this.pageValueCount = page.getValueCount();
this.endOfPageValueCount = readValues + pageValueCount;
try {
byte[] bytes = page.getBytes().toByteArray();
if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
ByteBuffer byteBuf = page.getBytes().toByteBuffer();
if (DEBUG) LOG.debug("page size " + page.getBytes().size() + " bytes and " + pageValueCount + " records");
if (DEBUG) LOG.debug("reading repetition levels at 0");
repetitionLevelColumn.initFromPage(pageValueCount, bytes, 0);
repetitionLevelColumn.initFromPage(pageValueCount, byteBuf, 0);
int next = repetitionLevelColumn.getNextOffset();
if (DEBUG) LOG.debug("reading definition levels at " + next);
definitionLevelColumn.initFromPage(pageValueCount, bytes, next);
definitionLevelColumn.initFromPage(pageValueCount, byteBuf, next);
next = definitionLevelColumn.getNextOffset();
if (DEBUG) LOG.debug("reading data at " + next);
dataColumn.initFromPage(pageValueCount, bytes, next);
dataColumn.initFromPage(pageValueCount, byteBuf, next);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import parquet.io.ParquetDecodingException;
import parquet.io.api.Binary;
import java.nio.ByteBuffer;

/**
* Base class to implement an encoding for a given column type.
*
* A ValuesReader is provided with a page (byte-array) and is responsible
* A ValuesReader is provided with a page (byte-buffer) and is responsible
* for deserializing the primitive values stored in that page.
*
* Given that pages are homogeneous (store only a single type), typical subclasses
Expand Down Expand Up @@ -55,6 +56,11 @@ public abstract class ValuesReader {
*
* @throws IOException
*/
public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;

/**
* Backward-compatible interface: accepts a byte[] page.
*/
public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static parquet.column.values.bitpacking.BitPacking.createBitPackingReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.bytes.BytesUtils;
import parquet.bytes.ByteBufferInputStream;
import parquet.column.values.ValuesReader;
import parquet.column.values.bitpacking.BitPacking.BitPackingReader;
import parquet.io.ParquetDecodingException;
Expand All @@ -36,7 +37,7 @@
public class BitPackingValuesReader extends ValuesReader {
private static final Log LOG = Log.getLog(BitPackingValuesReader.class);

private ByteArrayInputStream in;
private ByteBufferInputStream in;
private BitPackingReader bitPackingReader;
private final int bitsPerValue;
private int nextOffset;
Expand Down Expand Up @@ -66,15 +67,20 @@ public int readInteger() {
* @see parquet.column.values.ValuesReader#initFromPage(int, java.nio.ByteBuffer, int)
*/
@Override
public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
int effectiveBitLength = valueCount * bitsPerValue;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
this.in = new ByteArrayInputStream(in, offset, length);
this.in = new ByteBufferInputStream(in.duplicate(), offset, length);
this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
this.nextOffset = offset + length;
}

/**
 * Backward-compatible overload: wraps the byte[] page in a ByteBuffer and
 * delegates to the ByteBuffer-based implementation.
 */
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.bytes.BytesUtils;
Expand All @@ -31,32 +32,37 @@ public class ByteBitPackingValuesReader extends ValuesReader {
private final BytePacker packer;
private final int[] decoded = new int[VALUES_AT_A_TIME];
private int decodedPosition = VALUES_AT_A_TIME - 1;
private byte[] encoded;
private ByteBuffer encoded;
private int encodedPos;
private int nextOffset;

/**
 * Creates a reader for values packed with the bit width required to
 * represent {@code bound}.
 *
 * @param bound  the maximum value that may appear in the column
 * @param packer factory for the byte packer of the computed width
 */
public ByteBitPackingValuesReader(int bound, Packer packer) {
  int width = BytesUtils.getWidthFromMaxInt(bound);
  this.bitWidth = width;
  this.packer = packer.newBytePacker(width);
}

@Override
public int readInteger() {
++ decodedPosition;
if (decodedPosition == decoded.length) {
if (encodedPos + bitWidth > encoded.length) {
packer.unpack8Values(Arrays.copyOfRange(encoded, encodedPos, encodedPos + bitWidth), 0, decoded, 0);
encoded.position(encodedPos);
if (encodedPos + bitWidth > encoded.limit()) {
// unpack8Values needs at least bitWidth bytes to read from,
// We have to fill in 0 byte at the end of encoded bytes.
byte[] tempEncode = new byte[bitWidth];
encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a tmp byte array, can we call slice and use position and limit to create a sub view of the ByteBuffer directly?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The temp byte array is used to fill 0 at the end of bytebuffer. I don't think using the previous byte memory could do it.

packer.unpack8Values(ByteBuffer.wrap(tempEncode), 0, decoded, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment in the code explaining the special case as the BB api makes the intent less clear.

} else {
packer.unpack8Values(encoded, encodedPos, decoded, 0);
}
}
encodedPos += bitWidth;
decodedPosition = 0;
}
return decoded[decodedPosition];
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset)
public void initFromPage(int valueCount, ByteBuffer page, int offset)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once this is all working we should provide an api compatibility layer in case other libraries are using those.
For example here:

public void initFromPage(int valueCount, byte[] page, int offset) {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

throws IOException {
int effectiveBitLength = valueCount * bitWidth;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
Expand All @@ -67,6 +73,11 @@ public void initFromPage(int valueCount, byte[] page, int offset)
this.nextOffset = offset + length;
}

/**
 * Backward-compatible overload: wraps the byte[] page in a ByteBuffer and
 * delegates to the ByteBuffer-based implementation.
 */
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package parquet.column.values.boundedint;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.io.ParquetDecodingException;

class BitReader {
private int currentByte = 0;
private int currentPosition = 8;
private byte[] buf;
private ByteBuffer buf;
private int currentBufferPosition = 0;
private static final int[] byteGetValueMask = new int[8];
private static final int[] readMask = new int[32];
Expand All @@ -47,7 +48,7 @@ class BitReader {
* The array is not copied, so must not be mutated during the course of
* reading.
*/
public void prepare(byte[] buf, int offset, int length) {
public void prepare(ByteBuffer buf, int offset, int length) {
this.buf = buf;
this.endBufferPosistion = offset + length;
currentByte = 0;
Expand Down Expand Up @@ -84,7 +85,7 @@ public int readNBitInteger(int bitsPerValue) {

/**
 * Returns the next byte as an unsigned int, or 0 once the end of the
 * prepared window is reached (the decoder treats missing bytes as zero
 * padding).
 */
private int getNextByte() {
  if (currentBufferPosition < endBufferPosistion) {
    // Absolute get(): does not advance the buffer's own position.
    return buf.get(currentBufferPosition++) & 0xFF;
  }
  return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static parquet.Log.DEBUG;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.bytes.BytesUtils;
Expand Down Expand Up @@ -67,8 +68,8 @@ public int readInteger() {
// bytes would have to be serialized). This is the flip-side
// to BoundedIntColumnWriter.writeData(BytesOutput)
@Override
public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in[offset] + " " + in[offset + 1] + " " + in[offset + 2] + " " + in[offset + 3] + " ");
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in.get(offset) + " " + in.get(offset + 1) + " " + in.get(offset + 2) + " " + in.get(offset + 3) + " ");
int totalBytes = BytesUtils.readIntLittleEndian(in, offset);
if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes");
currentValueCt = 0;
Expand All @@ -78,6 +79,11 @@ public void initFromPage(int valueCount, byte[] in, int offset) throws IOExcepti
this.nextOffset = offset + totalBytes + 4;
}

/**
 * Backward-compatible overload: wraps the byte[] page in a ByteBuffer and
 * delegates to the ByteBuffer-based implementation.
 */
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return this.nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package parquet.column.values.boundedint;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.column.values.ValuesReader;

Expand All @@ -33,10 +34,15 @@ public int readInteger() {
}

@Override
public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
this.nextOffset = offset;
}

/**
 * Backward-compatible overload: wraps the byte[] page in a ByteBuffer and
 * delegates to the ByteBuffer-based implementation.
 */
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import parquet.column.values.bitpacking.BytePacker;
import parquet.column.values.bitpacking.Packer;
import parquet.io.ParquetDecodingException;
import parquet.bytes.ByteBufferInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
* Read values written by {@link DeltaBinaryPackingValuesWriter}
Expand All @@ -37,7 +39,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*/
private int valuesRead;
private int minDeltaInCurrentBlock;
private byte[] page;
private ByteBuffer page;
/**
* stores the decoded values including the first value which is written to the header
*/
Expand All @@ -47,7 +49,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
* when data is not aligned to mini block, which means padding 0s are in the buffer
*/
private int valuesBuffered;
private ByteArrayInputStream in;
private ByteBufferInputStream in;
private int nextOffset;
private DeltaBinaryPackingConfig config;
private int[] bitWidths;
Expand All @@ -61,8 +63,8 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
* @throws IOException
*/
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
in = new ByteArrayInputStream(page, offset, page.length - offset);
public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
in = new ByteBufferInputStream(page.duplicate(), offset, page.limit() - offset);
this.config = DeltaBinaryPackingConfig.readConfig(in);
this.page = page;
this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
Expand All @@ -75,7 +77,12 @@ public void initFromPage(int valueCount, byte[] page, int offset) throws IOExcep
while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
loadNewBlockToBuffer();
}
this.nextOffset = page.length - in.available();
this.nextOffset = page.limit() - in.available();
}

/**
 * Backward-compatible overload: wraps the byte[] page in a ByteBuffer and
 * delegates to the ByteBuffer-based implementation.
 */
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
Expand Down Expand Up @@ -148,7 +155,7 @@ private void unpackMiniBlock(BytePacker packer) {

private void unpack8Values(BytePacker packer) {
//calculate the pos because the packer api uses array not stream
int pos = page.length - in.available();
int pos = page.limit() - in.available();
packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
this.valuesBuffered += 8;
//sync the pos in stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static parquet.Log.DEBUG;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.column.values.ValuesReader;
Expand All @@ -34,29 +35,34 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {

private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesReader.class);
private ValuesReader lengthReader;
private byte[] in;
private ByteBuffer in;
private int offset;

public DeltaLengthByteArrayValuesReader() {
this.lengthReader = new DeltaBinaryPackingValuesReader();
}

@Override
public void initFromPage(int valueCount, byte[] in, int offset)
public void initFromPage(int valueCount, ByteBuffer in, int offset)
throws IOException {
if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
lengthReader.initFromPage(valueCount, in, offset);
offset = lengthReader.getNextOffset();
this.in = in;
this.offset = offset;
}

/**
 * Backward-compatible overload: wraps the byte[] page in a ByteBuffer and
 * delegates to the ByteBuffer-based implementation.
 */
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

/**
 * Reads the next binary value: its length from the length reader, its bytes
 * from the page buffer at the current cursor.
 *
 * @return a Binary backed by the page's ByteBuffer (zero-copy; no byte[]
 *         is materialized)
 */
@Override
public Binary readBytes() {
  int length = lengthReader.readInteger();
  int start = offset;
  // Advance the cursor past this value's bytes.
  offset = start + length;
  return Binary.fromByteBuffer(in, start, length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package parquet.column.values.deltastrings;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.column.values.ValuesReader;
import parquet.column.values.delta.DeltaBinaryPackingValuesReader;
Expand All @@ -41,12 +42,17 @@ public DeltaByteArrayReader() {
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset)
public void initFromPage(int valueCount, ByteBuffer page, int offset)
throws IOException {
prefixLengthReader.initFromPage(valueCount, page, offset);
int next = prefixLengthReader.getNextOffset();
suffixReader.initFromPage(valueCount, page, next);
}

/**
 * Backward-compatible overload: wraps the byte[] page in a ByteBuffer and
 * delegates to the ByteBuffer-based implementation.
 */
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
  this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public void skip() {
Expand Down
Loading