Closed

102 commits
686d598
Use ByteBuf-based api to read magic.
gerashegalov Jan 8, 2014
2d32f49
Reading file metadata using zero-copy API
gerashegalov Jan 10, 2014
df1ad93
Reading chunk using zero-copy API
dsy88 May 22, 2014
53500d4
Add ByteBufferInputStream and modify Chunk to consume ByteBuffer instead
dsy88 May 29, 2014
36aba13
Read from ByteBuffer instead of ByteArray to avoid unnecessary array …
dsy88 Jun 2, 2014
7ac1df5
Using Writable Channel to replace write to OutputStream one by one.
dsy88 Jun 17, 2014
4f399aa
Add original readIntLittleEndian function to keep compatible with pre…
dsy88 Jun 20, 2014
970fc8b
Add a Hadoop compatible layer to abstract away the zero copy API and old
dsy88 Jun 23, 2014
47b177d
Move CompatibilityUtil to parquet.hadoop.util.
dsy88 Jun 25, 2014
01c2ae5
Implement FSDISTransport in Compatible layer.
dsy88 Jul 3, 2014
a7bcfbb
Make BytePacker consume ByteBuffer directly.
dsy88 Jul 8, 2014
26dc879
disable enforcer to pass build.
dsy88 Jul 2, 2014
016e89c
remove some unnecessary code.
dsy88 Jul 24, 2014
912cbaf
fix a bug in equals in ByteBuffer Binary with offset and length
dsy88 Aug 5, 2014
861e541
enable enforcer check.
dsy88 Aug 15, 2014
8be638a
Address tsdeng's comments
dsy88 Sep 13, 2014
0d22908
merging with master
adeneche Feb 10, 2015
5bc8774
Update Snappy Codec to implement DirectDecompressionCodec interface
parthchandra Jul 8, 2014
7bc2a4d
Make a copy of Min and Max values for BinaryStatistics so that direct…
parthchandra Jul 23, 2014
2c2b183
Remove Zero Copy read path while reading footers
parthchandra Aug 28, 2014
8143174
update pig.version to build with Hadoop 2 jars
adeneche Feb 13, 2015
2187697
Update Binary to make a copy of data for initial statistics.
jacques-n Nov 13, 2014
35b10af
Use ByteBuffers in the Write path. Allow callers to pass in an alloca…
parthchandra Aug 20, 2014
e488924
after merge code cleanup
adeneche Feb 19, 2015
a6389db
Make constructor for PrimitiveType that takes decimalMetadata public.
StevenMPhillips May 12, 2014
98b99ea
Revert readFooter to not use ZeroCopy path.
parthchandra Jul 31, 2014
48cceef
Fix allocation in DictionaryValuesWriter
StevenMPhillips Feb 9, 2015
6943536
fixing bug related to testDictionaryError_419
adeneche Mar 4, 2015
e1df3b9
disabled enforcer and changed version to -drill
adeneche Mar 2, 2015
51cf2f1
cherry pick pull#188
rdblue Mar 2, 2015
c98ec2a
bumped version to 1.6.0rc3-drill-r0.1
adeneche Mar 30, 2015
173aa25
Set max preferred slab size to 16mb
jacques-n May 3, 2015
4a9dd28
update pom version
jacques-n May 3, 2015
9f22bd7
Make CodecFactory pluggable
jacques-n May 4, 2015
9bbc269
Update to 1.6.0rc3-drill-r0.3
jacques-n May 5, 2015
1bfa3a0
Merge branch 'master' into 1.6.0rc3-drill-r0.3-merge
jaltekruse Aug 26, 2015
2b8328b
All of the tests are now passing after the merge.
jaltekruse Aug 31, 2015
864b011
Simplifying how buffer allocators are passed when creating ValuesWrit…
jaltekruse Aug 31, 2015
45cadee
Cleaning up code in Binary after merge.
jaltekruse Aug 31, 2015
ab54c4e
Moving classes out of the old packages.
jaltekruse Sep 1, 2015
1f4f504
WIP - addressing review comments
jaltekruse Sep 3, 2015
7e252f3
WIP - addressing review comments
jaltekruse Sep 3, 2015
23ad48e
WIP - addressing review comments
jaltekruse Sep 3, 2015
829af6f
WIP - getting rid of unnecessary copies in Binary.java
jaltekruse Sep 3, 2015
fddd4af
WIP - removing copies from the ByteBufferBasedBinary equals, compareT…
jaltekruse Sep 4, 2015
d40706b
Get rid of unnecessary calls to Bytebuffer.wrap(byte[]), as an interf…
jaltekruse Sep 4, 2015
35d8386
Move call to getBytes() on dictionaryPages to remove the need to cach…
jaltekruse Sep 4, 2015
705b864
Rename CapacityByteArrayOutputStream to CapacityByteBufferOutputStrea…
jaltekruse Sep 9, 2015
ebae775
Fix issue reading page data into an off-heap ByteBuffer
jaltekruse Sep 12, 2015
1971fc5
Fixes made while debugging drill unit tests
jaltekruse Sep 15, 2015
86317b0
Address review comments, make field in immutable ParquetProperties ob…
jaltekruse Sep 29, 2015
104a1d1
Remove test requiring a hard-coded binary file. This was actually a b…
jaltekruse Sep 29, 2015
fec4242
Address review comments - factoring out code in tests
jaltekruse Sep 30, 2015
6959db7
addressing review comments, avoiding unnecessary copies when creating…
jaltekruse Sep 30, 2015
29cc747
Factor out common code
jaltekruse Sep 30, 2015
8c6e4a9
Addressing review comments, moving code out of generated class into a…
jaltekruse Sep 30, 2015
0098b1c
Remove unused method
jaltekruse Sep 30, 2015
f0e31ec
revert small formatting and renaming changes, TODO make sure these re…
jaltekruse Sep 30, 2015
9dccb94
Add new method to turn BytesInput into an InputStream.
jaltekruse Oct 1, 2015
b1040a8
Remove code used to debug a test that was failing after the initial m…
jaltekruse Oct 1, 2015
e79684e
Review comments - fixing use of ParquetProperties and removing unused…
jaltekruse Oct 1, 2015
9fb65dd
Rename method to get a dictionary page to clarify that the dictionary…
jaltekruse Oct 1, 2015
ad58bbe
Addressing small review comments, unused imports, doc cleanup, etc.
jaltekruse Oct 1, 2015
d4819b4
remove methods now unnecessary as same implementation has been moved …
jaltekruse Oct 1, 2015
fdb689c
Remove unnecessary copy writing a Binary to an OutputStream if it is …
jaltekruse Oct 1, 2015
a793be8
Add closeQuietly method to convert checked IOExceptions from classle…
jaltekruse Oct 1, 2015
2e95915
Addressing minor review comments, comments out code, star import, for…
jaltekruse Oct 1, 2015
4c3195e
Turn back on SemVer
jaltekruse Oct 1, 2015
d5536b6
Restore original name of CapacityByteArrayOutputStream to keep compat…
jaltekruse Oct 2, 2015
f217e6a
Restore old interfaces
jaltekruse Oct 2, 2015
8f66e43
Create utility methods to transform checked exceptions to unchecked w…
jaltekruse Oct 5, 2015
2f1a6c7
Consolidate a little more code
jaltekruse Oct 5, 2015
da1b52a
Moving classes into parquet from Drill.
jaltekruse Oct 6, 2015
0496350
Add unit test for direct codec factory.
jaltekruse Oct 14, 2015
862eb13
Fix usage of old constructor in Thrift module that caused a compilati…
jaltekruse Oct 14, 2015
8ff878a
Addressing review comments
jaltekruse Oct 19, 2015
b7a6457
fix license header
Oct 26, 2015
d332ca7
Add test for UnsignedVarIntBytesInput
Oct 26, 2015
f8e5988
Added javadocs, removed unused code in DirectCodecFactory
Oct 26, 2015
b4266fb
Add license header to new class
Oct 26, 2015
ae58486
Changing argument lists that previously included both an allocator an…
jaltekruse Oct 29, 2015
b8f54c2
Add a unit test for ByteBufferBackedBinary.
jaltekruse Oct 29, 2015
c305984
Adding back code generation for method to take a byte array as well a…
jaltekruse Oct 29, 2015
659230f
Remove second version of the class ByteBufferBytesInput that was nest…
jaltekruse Oct 30, 2015
e7f7f7f
WIP - removing unneeded generics form CodecFactories
jaltekruse Oct 30, 2015
3945674
Switch to using the DirectCodecFactory everywhere, one test is failin…
jaltekruse Oct 30, 2015
5869156
Move fallback classes from HeapCodecFactory to the DirectCodecFactory
jaltekruse Oct 30, 2015
1a47767
Address review comments
jaltekruse Oct 31, 2015
192c717
Fix error message
jaltekruse Nov 1, 2015
df7fd9c
Limit access to classes and methods used for reflection based access …
jaltekruse Nov 1, 2015
40714a4
Move pageSize to the constructor of codecfactory rather than the meth…
jaltekruse Nov 2, 2015
a8d2dc1
Address review comments.
jaltekruse Nov 2, 2015
d6501b1
Thought I had fixed this double deallocation earlier, guess the chang…
jaltekruse Nov 2, 2015
57491a2
Delete older version of test file, all of these tests look to be cove…
jaltekruse Nov 2, 2015
10b5ba3
Remove unneeded TODO
jaltekruse Nov 2, 2015
723701c
Adding isDirect interface to ByteBufferAllocator to add a restriction…
jaltekruse Nov 2, 2015
a44fdba
Fix logging and restrict access to classes inside of CodecFactory.
jaltekruse Nov 2, 2015
bd7aa97
Remove unused imports, one of which has been moved to package private…
jaltekruse Nov 2, 2015
269daef
Make CodecFactory public
jaltekruse Nov 2, 2015
96e19a8
Properly set the byte buffer position when reading out of a filesyste…
jaltekruse Nov 3, 2015
58340d8
Fix CompatibilityUtil, primary issue was a small error in the package…
jaltekruse Nov 4, 2015
56316d0
An exception out of the read method doesn't necessarily mean somethin…
jaltekruse Nov 4, 2015
ColumnWriteStore.java

@@ -58,4 +58,10 @@ public interface ColumnWriteStore {
* @return a formated string representing memory usage per column
*/
abstract public String memUsageString();

+ /**
+  * Close the related output stream and release any resources
+  */
+ abstract public void close();

}
ColumnWriter.java

@@ -83,5 +83,16 @@ public interface ColumnWriter {
*/
void writeNull(int repetitionLevel, int definitionLevel);

+ /**
+  * Close the underlying store. This should be called when there is no
+  * more data to be written.
+  */
+ void close();
+
+ /**
+  * Used to decide when to write a page or row group.
+  * @return the number of bytes of memory used to buffer the current data
+  */
+ long getBufferedSizeInMemory();
}

ParquetProperties.java

@@ -18,6 +18,10 @@
*/
package org.apache.parquet.column;

+ import org.apache.parquet.Preconditions;
+ import org.apache.parquet.bytes.ByteBufferAllocator;
+ import org.apache.parquet.bytes.HeapByteBufferAllocator;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static org.apache.parquet.column.Encoding.PLAIN;
import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
@@ -74,19 +78,27 @@ public static WriterVersion fromString(String name) {
private final int dictionaryPageSizeThreshold;
private final WriterVersion writerVersion;
private final boolean enableDictionary;
+ private final ByteBufferAllocator allocator;

public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
+ this(dictPageSize, writerVersion, enableDict, new HeapByteBufferAllocator());
+ }
+
+ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict, ByteBufferAllocator allocator) {

Member:
let's not multiply public constructors. Just make one private constructor with all fields.
You can make a pattern like this (not a builder, but a similar idea):

public ParquetProperties withAllocator(ByteBufferAllocator allocator) {
  return new ParquetProperties(this.dictionaryPageSizeThreshold, this.writerVersion, this.enableDictionary, allocator);
}

Probably we should have one parameterless public constructor and deprecate the others.

Contributor (author):
If we are going to go with any pattern, I think it would be best to go completely with a builder; this would let us enforce required/optional parameters properly. To do this with the pattern you suggest, we would also need a validateSetup() method that must be called after setting all of the properties. Calling this method could not be enforced by the compiler, which would make it a little brittle going forward.

Member:
let's follow up in a later PR

this.dictionaryPageSizeThreshold = dictPageSize;
this.writerVersion = writerVersion;
this.enableDictionary = enableDict;
+ Preconditions.checkNotNull(allocator, "ByteBufferAllocator");
+ this.allocator = allocator;
}

- public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
+ public ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
if (maxLevel == 0) {
return new DevNullValuesWriter();
} else {
return new RunLengthBitPackingHybridValuesWriter(
- getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
+ getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize, this.allocator
+ );
}
}

@@ -95,15 +107,15 @@ private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, i
case BOOLEAN:
return new BooleanPlainValuesWriter();
case INT96:
- return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
+ return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize, this.allocator);
case FIXED_LEN_BYTE_ARRAY:
- return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
+ return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize, this.allocator);
case BINARY:
case INT32:
case INT64:
case DOUBLE:
case FLOAT:
- return new PlainValuesWriter(initialSizePerCol, pageSize);
+ return new PlainValuesWriter(initialSizePerCol, pageSize, this.allocator);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
@@ -128,19 +140,19 @@ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initi
case BOOLEAN:
throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
case BINARY:
- return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case INT32:
- return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case INT64:
- return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case INT96:
- return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage);
+ return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case DOUBLE:
- return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case FLOAT:
- return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case FIXED_LEN_BYTE_ARRAY:
- return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage);
+ return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage, this.allocator);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
@@ -153,12 +165,12 @@ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePe
case PARQUET_2_0:
switch (path.getType()) {
case BOOLEAN:
- return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
+ return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize, this.allocator);
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
- return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
+ return new DeltaByteArrayWriter(initialSizePerCol, pageSize, this.allocator);
case INT32:
- return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
+ return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize, this.allocator);
case INT96:
case INT64:
case DOUBLE:
@@ -218,23 +230,28 @@ public boolean isEnableDictionary() {
return enableDictionary;
}

+ public ByteBufferAllocator getAllocator() {
+ return allocator;
+ }

public ColumnWriteStore newColumnWriteStore(
MessageType schema,
PageWriteStore pageStore,
- int pageSize) {
+ int pageSize,
+ ByteBufferAllocator allocator) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(
pageStore,
pageSize,
dictionaryPageSizeThreshold,
- enableDictionary, writerVersion);
+ enableDictionary, writerVersion, allocator);
case PARQUET_2_0:
return new ColumnWriteStoreV2(
schema,
pageStore,
pageSize,
- new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
+ new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary, allocator));
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
ColumnReaderImpl.java

@@ -27,6 +27,7 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
+ import java.nio.ByteBuffer;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.Log;
@@ -548,7 +549,7 @@ public Void visit(DataPageV2 dataPageV2) {
});
}

- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
+ private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) {
ValuesReader previousReader = this.dataColumn;

this.currentEncoding = dataEncoding;
@@ -588,8 +589,8 @@ private void readPageV1(DataPageV1 page) {
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
- byte[] bytes = page.getBytes().toByteArray();
- if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+ ByteBuffer bytes = page.getBytes().toByteBuffer();
+ if (DEBUG) LOG.debug("page size " + bytes.remaining() + " bytes and " + pageValueCount + " records");
if (DEBUG) LOG.debug("reading repetition levels at 0");
rlReader.initFromPage(pageValueCount, bytes, 0);
int next = rlReader.getNextOffset();
@@ -608,7 +609,7 @@ private void readPageV2(DataPageV2 page) {
this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
try {
if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
- initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+ initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
@@ -622,7 +623,7 @@ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
return new RLEIntIterator(
new RunLengthBitPackingHybridDecoder(
BytesUtils.getWidthFromMaxInt(maxLevel),
- new ByteArrayInputStream(bytes.toByteArray())));
+ bytes.toInputStream()));
} catch (IOException e) {
throw new ParquetDecodingException("could not read levels in page for col " + path, e);
}
ColumnWriteStoreV1.java

@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.TreeMap;

+ import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
@@ -40,14 +41,16 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
private final int dictionaryPageSizeThreshold;
private final boolean enableDictionary;
private final WriterVersion writerVersion;
+ private final ByteBufferAllocator allocator;

- public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
+ public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion, ByteBufferAllocator allocator) {
super();
this.pageWriteStore = pageWriteStore;
this.pageSizeThreshold = pageSizeThreshold;
this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
this.enableDictionary = enableDictionary;
this.writerVersion = writerVersion;
+ this.allocator = allocator;
}

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
@@ -65,7 +68,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() {

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
- return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
+ return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion, allocator);
}

@Override
@@ -132,4 +135,11 @@ public void endRecord() {
// V1 does not take record boundaries into account
}

+ public void close() {
+ Collection<ColumnWriterV1> values = columns.values();
+ for (ColumnWriterV1 memColumn : values) {
+ memColumn.close();
+ }
+ }

}
ColumnWriteStoreV2.java

@@ -29,6 +29,7 @@
import java.util.Set;
import java.util.TreeMap;

+ import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
@@ -50,6 +51,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
private long rowCount;
private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
private final long thresholdTolerance;
+ private final ByteBufferAllocator allocator;

private int pageSizeThreshold;

@@ -61,6 +63,7 @@ public ColumnWriteStoreV2(
super();
this.pageSizeThreshold = pageSizeThreshold;
this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
+ this.allocator = parquetProps.getAllocator();
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
@@ -127,6 +130,14 @@ public String memUsageString() {
return b.toString();
}

+ @Override
+ public void close() {
+ flush(); // calling flush() here to keep it consistent with the behavior before merging with master
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ memColumn.close();
+ }
+ }

@Override
public void endRecord() {
++ rowCount;
ColumnWriterV1.java

@@ -23,6 +23,7 @@
import java.io.IOException;

import org.apache.parquet.Log;
+ import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
@@ -66,18 +67,19 @@ public ColumnWriterV1(
int pageSizeThreshold,
int dictionaryPageSizeThreshold,
boolean enableDictionary,
- WriterVersion writerVersion) {
+ WriterVersion writerVersion,
+ ByteBufferAllocator allocator) {
this.path = path;
this.pageWriter = pageWriter;
this.pageSizeThreshold = pageSizeThreshold;
// initial check of memory usage. So that we have enough data to make an initial prediction
this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
resetStatistics();

- ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
+ ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary, allocator);

- this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
- this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+ this.repetitionLevelColumn = parquetProps.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+ this.definitionLevelColumn = parquetProps.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);

int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
@@ -239,7 +241,7 @@ public void flush() {
if (valueCount > 0) {
writePage();
}
- final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+ final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
if (dictionaryPage != null) {
if (DEBUG) LOG.debug("write dictionary");
try {
@@ -251,6 +253,16 @@
}
}

+ @Override
+ public void close() {
+ flush();
+ // Close the Values writers.
+ repetitionLevelColumn.close();
+ definitionLevelColumn.close();
+ dataColumn.close();
+ }
+
+ @Override
public long getBufferedSizeInMemory() {
return repetitionLevelColumn.getBufferedSize()
+ definitionLevelColumn.getBufferedSize()