Commits (25)
b6f096e
Use ByteBuf-based api to read magic.
gerashegalov Jan 8, 2014
89784cc
Reading file metadata using zero-copy API
gerashegalov Jan 10, 2014
1deb34a
Reading chunk using zero-copy API
dsy88 May 22, 2014
930b93b
Add ByteBufferInputStream and modify Chunk to consume ByteBuffer instead
dsy88 May 29, 2014
918efa7
Read from ByteBuffer instead of ByteArray to avoid unnecessary array …
dsy88 Jun 2, 2014
4a99844
Using Writable Channel to replace write to OutputStream one by one.
dsy88 Jun 17, 2014
a428a29
Merge remote-tracking branch 'origin/master' into ByteBufferRead
dsy88 Jun 20, 2014
4da7130
Add original readIntLittleEndian function to keep compatible with pre…
dsy88 Jun 20, 2014
80a7351
Add a Hadoop compatible layer to abstract away the zero copy API and old
dsy88 Jun 23, 2014
6b7ea00
Move CompatibilityUtil to parquet.hadoop.util.
dsy88 Jun 25, 2014
c1a1637
Implement FSDISTransport in Compatible layer.
dsy88 Jul 3, 2014
fb4cc9c
Make BytePacker consume ByteBuffer directly.
dsy88 Jul 8, 2014
bfcb7d4
disable enforcer to pass build.
dsy88 Jul 2, 2014
bd6b60b
Merge pull request #1 from apache/master
dsy88 Jul 31, 2014
8a3b36f
remove some unncessary codes.
dsy88 Jul 24, 2014
4cd926f
Merge remote-tracking branch 'origin/master' into ByteBufferRead
dsy88 Aug 5, 2014
8f4c9b5
fix a bug in equals in ByteBuffer Binary with offset and length
dsy88 Aug 5, 2014
b139059
enable enforcer check.
dsy88 Aug 15, 2014
df7097f
Merge b1390597eeb9a9d4c4e34fdaa03129b706555349 into 647b8a70f9b7c94ca…
dsy88 Sep 4, 2014
ee060fc
Update Snappy Codec to implement DirectDecompressionCodec interface
parthchandra Jul 8, 2014
ccefe49
Make a copy of Min and Max values for BinaryStatistics so that direct…
parthchandra Jul 23, 2014
f012424
Remove Zero Copy read path while reading footers
parthchandra Aug 28, 2014
706617f
Update modules to build with Hadoop 2 jars
parthchandra Sep 4, 2014
7186133
Disable enforcer to allow build to complete
parthchandra Sep 4, 2014
de025ed
Use ByteBuffers in the Write path. Allow callers to pass in an alloca…
parthchandra Aug 20, 2014
@@ -33,4 +33,9 @@ public interface ColumnWriteStore {
*/
abstract public void flush();

/**
* Close the underlying output stream and release any resources.
*/
abstract public void close();

}
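
A minimal sketch of the flush-then-close ordering this interface now implies (the helper below is illustrative, not part of this PR):

    import parquet.column.ColumnWriteStore;

    class StoreLifecycle {
      // Flush buffered data first, then release streams and buffers;
      // the store must not be used after close().
      static void finish(ColumnWriteStore store) {
        store.flush();
        store.close();
      }
    }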
6 changes: 6 additions & 0 deletions parquet-column/src/main/java/parquet/column/ColumnWriter.java
@@ -87,6 +87,12 @@ public interface ColumnWriter {
*/
void flush();

/**
* Close the underlying store. This should be called once no more data will
* be written.
*/
void close();

/**
* used to decide when to write a page or row group
* @return the number of bytes of memory used to buffer the current data
50 changes: 29 additions & 21 deletions parquet-column/src/main/java/parquet/column/ParquetProperties.java
@@ -1,6 +1,8 @@
package parquet.column;

import parquet.bytes.ByteBufferAllocator;
import parquet.bytes.BytesUtils;
import parquet.bytes.HeapByteBufferAllocator;
import parquet.column.values.ValuesWriter;
import parquet.column.values.boundedint.DevNullValuesWriter;
import parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
@@ -47,85 +49,91 @@ public static WriterVersion fromString(String name) {
private final int dictionaryPageSizeThreshold;
private final WriterVersion writerVersion;
private final boolean enableDictionary;
private ByteBufferAllocator allocator;

public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
this(dictPageSize, writerVersion, enableDict, new HeapByteBufferAllocator());
}

public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict, ByteBufferAllocator allocator) {
this.dictionaryPageSizeThreshold = dictPageSize;
this.writerVersion = writerVersion;
this.enableDictionary = enableDict;
this.allocator = allocator;
}

public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol) {
public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, ByteBufferAllocator allocator) {
if (maxLevel == 0) {
return new DevNullValuesWriter();
} else {
return new RunLengthBitPackingHybridValuesWriter(
BytesUtils.getWidthFromMaxInt(maxLevel), initialSizePerCol);
BytesUtils.getWidthFromMaxInt(maxLevel), initialSizePerCol, allocator != null ? allocator : new HeapByteBufferAllocator());
}
}

public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) {
switch (path.getType()) {
case BOOLEAN:
if(writerVersion == WriterVersion.PARQUET_1_0) {
return new BooleanPlainValuesWriter();
return new BooleanPlainValuesWriter(this.allocator);
} else if (writerVersion == WriterVersion.PARQUET_2_0) {
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol);
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, this.allocator);
}
break;
case BINARY:
if(enableDictionary) {
return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, this.allocator);
} else {
if (writerVersion == WriterVersion.PARQUET_1_0) {
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, this.allocator);
} else if (writerVersion == WriterVersion.PARQUET_2_0) {
return new DeltaByteArrayWriter(initialSizePerCol);
return new DeltaByteArrayWriter(initialSizePerCol, this.allocator);
}
}
break;
case INT32:
if(enableDictionary) {
return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, this.allocator);
} else {
if(writerVersion == WriterVersion.PARQUET_1_0) {
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, this.allocator);
} else if(writerVersion == WriterVersion.PARQUET_2_0) {
return new DeltaBinaryPackingValuesWriter(initialSizePerCol);
return new DeltaBinaryPackingValuesWriter(initialSizePerCol, this.allocator);
}
}
break;
case INT64:
if(enableDictionary) {
return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, this.allocator);
} else {
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, this.allocator);
}
case INT96:
if (enableDictionary) {
return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, 12);
return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, 12, this.allocator);
} else {
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, this.allocator);
}
case DOUBLE:
if(enableDictionary) {
return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, this.allocator);
} else {
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, this.allocator);
}
case FLOAT:
if(enableDictionary) {
return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, this.allocator);
} else {
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, this.allocator);
}
case FIXED_LEN_BYTE_ARRAY:
if (enableDictionary && (writerVersion == WriterVersion.PARQUET_2_0)) {
return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, path.getTypeLength());
return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, path.getTypeLength(), this.allocator);
} else {
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, this.allocator);
}
default:
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, this.allocator);
}
return null;
}
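
The new four-argument constructor lets callers supply their own allocator, while the three-argument form keeps the old behavior by defaulting to a heap allocator. A short usage sketch (the 64 KiB threshold is an arbitrary example value):

    import parquet.bytes.HeapByteBufferAllocator;
    import parquet.column.ParquetProperties;
    import parquet.column.ParquetProperties.WriterVersion;

    class PropsExample {
      static ParquetProperties exampleProps() {
        // Swap in a direct-buffer allocator here to take the zero-copy path.
        return new ParquetProperties(64 * 1024, WriterVersion.PARQUET_2_0, true,
            new HeapByteBufferAllocator());
      }
    }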
@@ -20,6 +20,7 @@
import static parquet.Preconditions.checkNotNull;

import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.column.ColumnDescriptor;
@@ -518,16 +519,16 @@ private void readPage() {
this.pageValueCount = page.getValueCount();
this.endOfPageValueCount = readValues + pageValueCount;
try {
byte[] bytes = page.getBytes().toByteArray();
if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
ByteBuffer byteBuf = page.getBytes().toByteBuffer();
if (DEBUG) LOG.debug("page size " + page.getBytes().size() + " bytes and " + pageValueCount + " records");
if (DEBUG) LOG.debug("reading repetition levels at 0");
repetitionLevelColumn.initFromPage(pageValueCount, bytes, 0);
repetitionLevelColumn.initFromPage(pageValueCount, byteBuf, 0);
int next = repetitionLevelColumn.getNextOffset();
if (DEBUG) LOG.debug("reading definition levels at " + next);
definitionLevelColumn.initFromPage(pageValueCount, bytes, next);
definitionLevelColumn.initFromPage(pageValueCount, byteBuf, next);
next = definitionLevelColumn.getNextOffset();
if (DEBUG) LOG.debug("reading data at " + next);
dataColumn.initFromPage(pageValueCount, bytes, next);
dataColumn.initFromPage(pageValueCount, byteBuf, next);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
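
The repetition levels, definition levels, and values sit back to back in a single page buffer, and each reader reports where the next section begins. The chaining pattern generalizes to a small helper (a sketch, not code from this PR):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import parquet.column.values.ValuesReader;

    class PageInit {
      // Initialize each reader at the offset where the previous section ended.
      static void initReaders(int valueCount, ByteBuffer page, ValuesReader... readers)
          throws IOException {
        int offset = 0;
        for (ValuesReader r : readers) {
          r.initFromPage(valueCount, page, offset);
          offset = r.getNextOffset();
        }
      }
    }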
@@ -114,6 +114,14 @@ public void flush() {
}
}

@Override
public void close() {
Collection<ColumnWriterImpl> values = columns.values();
for (ColumnWriterImpl memColumn : values) {
memColumn.close();
}
}

public String memUsageString() {
StringBuilder b = new StringBuilder("Store {\n");
Collection<ColumnWriterImpl> values = columns.values();
@@ -68,9 +68,16 @@ public ColumnWriterImpl(
this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
resetStatistics();

ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold,
writerVersion,
enableDictionary,
pageWriter.getAllocator());
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(),
initialSizePerCol,
pageWriter.getAllocator());
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(),
initialSizePerCol,
pageWriter.getAllocator());
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
}

@@ -247,6 +254,15 @@ public void flush() {
}
}

@Override
public void close() {
flush();
// Close the Values writers.
repetitionLevelColumn.close();
definitionLevelColumn.close();
dataColumn.close();
}

@Override
public long getBufferedSizeInMemory() {
return repetitionLevelColumn.getBufferedSize()
19 changes: 19 additions & 0 deletions parquet-column/src/main/java/parquet/column/page/PageWriter.java
@@ -17,6 +17,7 @@

import java.io.IOException;

import parquet.bytes.ByteBufferAllocator;
import parquet.bytes.BytesInput;
import parquet.column.Encoding;
import parquet.column.statistics.Statistics;
@@ -71,4 +72,22 @@ public interface PageWriter {

public abstract String memUsageString(String prefix);

/**
* Gets the associated ByteBuffer allocator. The allocator is passed all the way
* down to the column ValuesWriter(s).
* @return the ByteBufferAllocator used by this page writer
*/
ByteBufferAllocator getAllocator();

/**
* Reset the page writer, releasing or reallocating its resources as needed.
*/
public void reset();


/**
* Close the page writer. Free any resources.
*/
public void close();

}
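
The ByteBufferAllocator interface itself is not part of this hunk; a plausible minimal shape, consistent with the HeapByteBufferAllocator default used above (the method names are assumptions, not confirmed by this diff):

    import java.nio.ByteBuffer;

    interface ByteBufferAllocatorSketch {
      ByteBuffer allocate(int size);   // heap or direct, per implementation
      void release(ByteBuffer buffer); // no-op for heap; frees direct memory otherwise
    }

    class HeapAllocatorSketch implements ByteBufferAllocatorSketch {
      public ByteBuffer allocate(int size) { return ByteBuffer.allocate(size); }
      public void release(ByteBuffer buffer) { /* heap buffers are GC-reclaimed */ }
    }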
@@ -67,8 +67,8 @@ public String toString() {
}

public void updateStats(Binary min_value, Binary max_value) {
if (min.compareTo(min_value) > 0) { min = min_value; }
if (max.compareTo(max_value) < 0) { max = max_value; }
if (min.compareTo(min_value) > 0) { min = Binary.fromByteArray(min_value.getBytes()); }
if (max.compareTo(max_value) < 0) { max = Binary.fromByteArray(max_value.getBytes()); }
}

public void initializeStats(Binary min_value, Binary max_value) {
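
The defensive copy matters because a Binary backed by a direct, reused buffer aliases memory that the next page read overwrites. A self-contained demonstration of the hazard using plain ByteBuffers (not the Binary API):

    import java.nio.ByteBuffer;

    public class AliasingDemo {
      public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocateDirect(2);
        page.put(0, (byte) 'a').put(1, (byte) 'a');
        ByteBuffer min = page.duplicate();          // "statistic" aliasing the page
        page.put(0, (byte) 'z').put(1, (byte) 'z'); // buffer reused for the next page
        // Prints "zz": without the copy, the recorded min/max would silently drift.
        System.out.println("" + (char) min.get(0) + (char) min.get(1));
      }
    }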
@@ -19,11 +19,12 @@

import parquet.io.ParquetDecodingException;
import parquet.io.api.Binary;
import java.nio.ByteBuffer;

/**
* Base class to implement an encoding for a given column type.
*
* A ValuesReader is provided with a page (byte-array) and is responsible
* A ValuesReader is provided with a page (byte-buffer) and is responsible
* for deserializing the primitive values stored in that page.
*
* Given that pages are homogeneous (store only a single type), typical subclasses
@@ -55,6 +56,11 @@ public abstract class ValuesReader {
*
* @throws IOException
*/
public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;

/*
* Compatibility overload retained for callers that still pass byte arrays.
*/
public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException;

/**
@@ -52,6 +52,12 @@ public abstract class ValuesWriter {
*/
public abstract void reset();

/**
* Called to close the values writer. Any output stream is closed and can no longer be used.
* All resources are released.
*/
public abstract void close();

/**
* @return the dictionary page or null if not dictionary based
*/
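
reset() recycles the writer between pages while close() retires it for good. A sketch of the intended sequence, assuming a concrete writer (the constructor shape matches the PlainValuesWriter(initialSize, allocator) changes elsewhere in this diff):

    import parquet.bytes.BytesInput;
    import parquet.bytes.HeapByteBufferAllocator;
    import parquet.column.values.plain.PlainValuesWriter;

    class WriterLifecycle {
      static void pageLifecycle() {
        PlainValuesWriter w = new PlainValuesWriter(64 * 1024, new HeapByteBufferAllocator());
        w.writeInteger(42);
        BytesInput page = w.getBytes(); // snapshot the encoded page bytes
        w.reset();                      // recycle buffers for the next page
        w.close();                      // release everything; the writer is done
      }
    }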
@@ -18,11 +18,12 @@
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static parquet.column.values.bitpacking.BitPacking.createBitPackingReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import parquet.Log;
import parquet.bytes.BytesUtils;
import parquet.bytes.ByteBufferInputStream;
import parquet.column.values.ValuesReader;
import parquet.column.values.bitpacking.BitPacking.BitPackingReader;
import parquet.io.ParquetDecodingException;
@@ -36,7 +37,7 @@
public class BitPackingValuesReader extends ValuesReader {
private static final Log LOG = Log.getLog(BitPackingValuesReader.class);

private ByteArrayInputStream in;
private ByteBufferInputStream in;
private BitPackingReader bitPackingReader;
private final int bitsPerValue;
private int nextOffset;
@@ -66,15 +67,20 @@ public int readInteger() {
* @see parquet.column.values.ValuesReader#initFromPage(long, byte[], int)
*/
@Override
public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
int effectiveBitLength = valueCount * bitsPerValue;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
this.in = new ByteArrayInputStream(in, offset, length);
this.in = new ByteBufferInputStream(in.duplicate(), offset, length);
this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
this.nextOffset = offset + length;
}

@Override
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

@Override
public int getNextOffset() {
return nextOffset;
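
ByteBufferInputStream (added earlier in this PR, per commit 930b93b) wraps a buffer window as a stream so the InputStream-based BitPackingReader keeps working. Its implementation is not in this hunk; a minimal sketch of the (buffer, offset, length) semantics the call above relies on:

    import java.io.InputStream;
    import java.nio.ByteBuffer;

    class ByteBufferInputStreamSketch extends InputStream {
      private final ByteBuffer buf;
      private final int end;

      ByteBufferInputStreamSketch(ByteBuffer buffer, int offset, int count) {
        this.buf = buffer.duplicate(); // leave the caller's position untouched
        this.buf.position(offset);
        this.end = offset + count;
      }

      @Override
      public int read() {
        return buf.position() < end ? (buf.get() & 0xFF) : -1; // unsigned byte or EOF
      }
    }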
@@ -21,6 +21,7 @@

import java.io.IOException;

import parquet.bytes.ByteBufferAllocator;
import parquet.bytes.BytesInput;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.Encoding;
@@ -39,13 +40,15 @@ public class BitPackingValuesWriter extends ValuesWriter {
private CapacityByteArrayOutputStream out;
private BitPackingWriter bitPackingWriter;
private int bitsPerValue;
private ByteBufferAllocator allocator;

/**
* @param bound the maximum value stored by this column
*/
public BitPackingValuesWriter(int bound, int initialCapacity) {
public BitPackingValuesWriter(int bound, int initialCapacity, ByteBufferAllocator allocator) {
this.bitsPerValue = getWidthFromMaxInt(bound);
this.out = new CapacityByteArrayOutputStream(initialCapacity);
this.allocator = allocator;
this.out = new CapacityByteArrayOutputStream(initialCapacity, this.allocator);
init();
}

@@ -99,6 +102,11 @@ public void reset() {
init();
}

@Override
public void close() {
out.close();
}

/**
* {@inheritDoc}
* @see parquet.column.values.ValuesWriter#getAllocatedSize()