Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b6f096e
Use ByteBuf-based api to read magic.
gerashegalov Jan 8, 2014
89784cc
Reading file metadata using zero-copy API
gerashegalov Jan 10, 2014
1deb34a
Reading chunk using zero-copy API
dsy88 May 22, 2014
930b93b
Add ByteBufferInputStream and modify Chunk to consume ByteBuffer instead
dsy88 May 29, 2014
918efa7
Read from ByteBuffer instead of ByteArray to avoid unnecessary array …
dsy88 Jun 2, 2014
4a99844
Using Writable Channel to replace write to OutputStream one by one.
dsy88 Jun 17, 2014
a428a29
Merge remote-tracking branch 'origin/master' into ByteBufferRead
dsy88 Jun 20, 2014
4da7130
Add original readIntLittleEndian function to keep compatible with pre…
dsy88 Jun 20, 2014
80a7351
Add a Hadoop compatible layer to abstract away the zero copy API and old
dsy88 Jun 23, 2014
6b7ea00
Move CompatibilityUtil to parquet.hadoop.util.
dsy88 Jun 25, 2014
c1a1637
Implement FSDISTransport in Compatible layer.
dsy88 Jul 3, 2014
fb4cc9c
Make BytePacker consume ByteBuffer directly.
dsy88 Jul 8, 2014
bfcb7d4
disable enforcer to pass build.
dsy88 Jul 2, 2014
bd6b60b
Merge pull request #1 from apache/master
dsy88 Jul 31, 2014
8a3b36f
remove some unnecessary code.
dsy88 Jul 24, 2014
4cd926f
Merge remote-tracking branch 'origin/master' into ByteBufferRead
dsy88 Aug 5, 2014
8f4c9b5
fix a bug in equals in ByteBuffer Binary with offset and length
dsy88 Aug 5, 2014
b139059
enable enforcer check.
dsy88 Aug 15, 2014
df7097f
Merge b1390597eeb9a9d4c4e34fdaa03129b706555349 into 647b8a70f9b7c94ca…
dsy88 Sep 4, 2014
ee060fc
Update Snappy Codec to implement DirectDecompressionCodec interface
parthchandra Jul 8, 2014
ccefe49
Make a copy of Min and Max values for BinaryStatistics so that direct…
parthchandra Jul 23, 2014
f012424
Remove Zero Copy read path while reading footers
parthchandra Aug 28, 2014
706617f
Update modules to build with Hadoop 2 jars
parthchandra Sep 4, 2014
7186133
Disable enforcer to allow build to complete
parthchandra Sep 4, 2014
31ec3f9
Update Binary to make a copy of data for initial statistics.
jacques-n Nov 13, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static parquet.Preconditions.checkNotNull;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.column.ColumnDescriptor;
Expand Down Expand Up @@ -518,16 +519,16 @@ private void readPage() {
this.pageValueCount = page.getValueCount();
this.endOfPageValueCount = readValues + pageValueCount;
try {
byte[] bytes = page.getBytes().toByteArray();
if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
ByteBuffer byteBuf = page.getBytes().toByteBuffer();
if (DEBUG) LOG.debug("page size " + page.getBytes().size() + " bytes and " + pageValueCount + " records");
if (DEBUG) LOG.debug("reading repetition levels at 0");
repetitionLevelColumn.initFromPage(pageValueCount, bytes, 0);
repetitionLevelColumn.initFromPage(pageValueCount, byteBuf, 0);
int next = repetitionLevelColumn.getNextOffset();
if (DEBUG) LOG.debug("reading definition levels at " + next);
definitionLevelColumn.initFromPage(pageValueCount, bytes, next);
definitionLevelColumn.initFromPage(pageValueCount, byteBuf, next);
next = definitionLevelColumn.getNextOffset();
if (DEBUG) LOG.debug("reading data at " + next);
dataColumn.initFromPage(pageValueCount, bytes, next);
dataColumn.initFromPage(pageValueCount, byteBuf, next);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ public String toString() {
}

public void updateStats(Binary min_value, Binary max_value) {
if (min.compareTo(min_value) > 0) { min = min_value; }
if (max.compareTo(max_value) < 0) { max = max_value; }
if (min.compareTo(min_value) > 0) { min = Binary.fromByteArray(min_value.getBytes()); }
if (max.compareTo(max_value) < 0) { max = Binary.fromByteArray(max_value.getBytes()); }
}

public void initializeStats(Binary min_value, Binary max_value) {
min = min_value;
max = max_value;
min = Binary.fromByteArray(min_value.getBytes());
max = Binary.fromByteArray(max_value.getBytes());
this.markAsNotEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import parquet.io.ParquetDecodingException;
import parquet.io.api.Binary;
import java.nio.ByteBuffer;

/**
* Base class to implement an encoding for a given column type.
*
* A ValuesReader is provided with a page (byte-array) and is responsible
* A ValuesReader is provided with a page (byte-buffer) and is responsible
* for deserializing the primitive values stored in that page.
*
* Given that pages are homogeneous (store only a single type), typical subclasses
Expand Down Expand Up @@ -55,6 +56,11 @@ public abstract class ValuesReader {
*
* @throws IOException
*/
public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;

/*
 * Compatible interface: byte-array variant retained for pre-existing callers.
 */
public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static parquet.column.values.bitpacking.BitPacking.createBitPackingReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.bytes.BytesUtils;
import parquet.bytes.ByteBufferInputStream;
import parquet.column.values.ValuesReader;
import parquet.column.values.bitpacking.BitPacking.BitPackingReader;
import parquet.io.ParquetDecodingException;
Expand All @@ -36,7 +37,7 @@
public class BitPackingValuesReader extends ValuesReader {
private static final Log LOG = Log.getLog(BitPackingValuesReader.class);

private ByteArrayInputStream in;
private ByteBufferInputStream in;
private BitPackingReader bitPackingReader;
private final int bitsPerValue;
private int nextOffset;
Expand Down Expand Up @@ -66,15 +67,20 @@ public int readInteger() {
* @see parquet.column.values.ValuesReader#initFromPage(long, byte[], int)
*/
@Override
public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
int effectiveBitLength = valueCount * bitsPerValue;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
this.in = new ByteArrayInputStream(in, offset, length);
this.in = new ByteBufferInputStream(in.duplicate(), offset, length);
this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
this.nextOffset = offset + length;
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.bytes.BytesUtils;
Expand All @@ -31,32 +32,37 @@ public class ByteBitPackingValuesReader extends ValuesReader {
private final BytePacker packer;
private final int[] decoded = new int[VALUES_AT_A_TIME];
private int decodedPosition = VALUES_AT_A_TIME - 1;
private byte[] encoded;
private ByteBuffer encoded;
private int encodedPos;
private int nextOffset;

public ByteBitPackingValuesReader(int bound, Packer packer) {
this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
this.packer = packer.newBytePacker(bitWidth);
}

@Override
public int readInteger() {
++ decodedPosition;
if (decodedPosition == decoded.length) {
if (encodedPos + bitWidth > encoded.length) {
packer.unpack8Values(Arrays.copyOfRange(encoded, encodedPos, encodedPos + bitWidth), 0, decoded, 0);
encoded.position(encodedPos);
if (encodedPos + bitWidth > encoded.limit()) {
// unpack8Values needs at least bitWidth bytes to read from, so
// we pad the end of the encoded bytes with zero bytes.
byte[] tempEncode = new byte[bitWidth];
encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
packer.unpack8Values(ByteBuffer.wrap(tempEncode), 0, decoded, 0);
} else {
packer.unpack8Values(encoded, encodedPos, decoded, 0);
}
}
encodedPos += bitWidth;
decodedPosition = 0;
}
return decoded[decodedPosition];
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset)
public void initFromPage(int valueCount, ByteBuffer page, int offset)
throws IOException {
int effectiveBitLength = valueCount * bitWidth;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
Expand All @@ -67,6 +73,11 @@ public void initFromPage(int valueCount, byte[] page, int offset)
this.nextOffset = offset + length;
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package parquet.column.values.boundedint;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.io.ParquetDecodingException;

class BitReader {
private int currentByte = 0;
private int currentPosition = 8;
private byte[] buf;
private ByteBuffer buf;
private int currentBufferPosition = 0;
private static final int[] byteGetValueMask = new int[8];
private static final int[] readMask = new int[32];
Expand All @@ -47,7 +48,7 @@ class BitReader {
* The array is not copied, so must not be mutated during the course of
* reading.
*/
public void prepare(byte[] buf, int offset, int length) {
public void prepare(ByteBuffer buf, int offset, int length) {
this.buf = buf;
this.endBufferPosistion = offset + length;
currentByte = 0;
Expand Down Expand Up @@ -84,7 +85,7 @@ public int readNBitInteger(int bitsPerValue) {

private int getNextByte() {
if (currentBufferPosition < endBufferPosistion) {
return buf[currentBufferPosition++] & 0xFF;
return buf.get(currentBufferPosition++) & 0xFF;
}
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static parquet.Log.DEBUG;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.bytes.BytesUtils;
Expand Down Expand Up @@ -67,8 +68,8 @@ public int readInteger() {
// bytes would have to be serialized). This is the flip-side
// to BoundedIntColumnWriter.writeData(BytesOutput)
@Override
public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in[offset] + " " + in[offset + 1] + " " + in[offset + 2] + " " + in[offset + 3] + " ");
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in.get(offset) + " " + in.get(offset + 1) + " " + in.get(offset + 2) + " " + in.get(offset + 3) + " ");
int totalBytes = BytesUtils.readIntLittleEndian(in, offset);
if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes");
currentValueCt = 0;
Expand All @@ -78,6 +79,11 @@ public void initFromPage(int valueCount, byte[] in, int offset) throws IOExcepti
this.nextOffset = offset + totalBytes + 4;
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return this.nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package parquet.column.values.boundedint;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.column.values.ValuesReader;

Expand All @@ -33,10 +34,15 @@ public int readInteger() {
}

@Override
public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
this.nextOffset = offset;
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return nextOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import parquet.column.values.bitpacking.BytePacker;
import parquet.column.values.bitpacking.Packer;
import parquet.io.ParquetDecodingException;
import parquet.bytes.ByteBufferInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
* Read values written by {@link DeltaBinaryPackingValuesWriter}
Expand All @@ -37,7 +39,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*/
private int valuesRead;
private int minDeltaInCurrentBlock;
private byte[] page;
private ByteBuffer page;
/**
* stores the decoded values including the first value which is written to the header
*/
Expand All @@ -47,7 +49,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
* when data is not aligned to mini block, which means padding 0s are in the buffer
*/
private int valuesBuffered;
private ByteArrayInputStream in;
private ByteBufferInputStream in;
private int nextOffset;
private DeltaBinaryPackingConfig config;
private int[] bitWidths;
Expand All @@ -61,8 +63,8 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
* @throws IOException
*/
@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
in = new ByteArrayInputStream(page, offset, page.length - offset);
public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
in = new ByteBufferInputStream(page.duplicate(), offset, page.limit() - offset);
this.config = DeltaBinaryPackingConfig.readConfig(in);
this.page = page;
this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
Expand All @@ -75,7 +77,12 @@ public void initFromPage(int valueCount, byte[] page, int offset) throws IOExcep
while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
loadNewBlockToBuffer();
}
this.nextOffset = page.length - in.available();
this.nextOffset = page.limit() - in.available();
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
Expand Down Expand Up @@ -148,7 +155,7 @@ private void unpackMiniBlock(BytePacker packer) {

private void unpack8Values(BytePacker packer) {
//calculate the pos because the packer api uses array not stream
int pos = page.length - in.available();
int pos = page.limit() - in.available();
packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
this.valuesBuffered += 8;
//sync the pos in stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static parquet.Log.DEBUG;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.column.values.ValuesReader;
Expand All @@ -34,29 +35,34 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {

private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesReader.class);
private ValuesReader lengthReader;
private byte[] in;
private ByteBuffer in;
private int offset;

public DeltaLengthByteArrayValuesReader() {
this.lengthReader = new DeltaBinaryPackingValuesReader();
}

@Override
public void initFromPage(int valueCount, byte[] in, int offset)
public void initFromPage(int valueCount, ByteBuffer in, int offset)
throws IOException {
if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
lengthReader.initFromPage(valueCount, in, offset);
offset = lengthReader.getNextOffset();
this.in = in;
this.offset = offset;
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public Binary readBytes() {
int length = lengthReader.readInteger();
int start = offset;
offset = start + length;
return Binary.fromByteArray(in, start, length);
return Binary.fromByteBuffer(in, start, length);
}

@Override
Expand Down
Loading