2 changes: 1 addition & 1 deletion parquet-avro/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0</version>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
2 changes: 1 addition & 1 deletion parquet-cascading/pom.xml
@@ -77,7 +77,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
2 changes: 1 addition & 1 deletion parquet-cascading3/pom.xml
@@ -88,7 +88,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
12 changes: 12 additions & 0 deletions parquet-column/pom.xml
@@ -83,6 +83,18 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
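The ${guava.version} and ${mockito.version} references in the pom hunks above assume matching properties in the parent pom, which is not shown in this section. A minimal sketch of that declaration, reusing the versions the hardcoded entries previously pinned (treat it as illustrative):

<!-- parent pom.xml, sketch only: properties assumed to back the ${...} references above -->
<properties>
  <guava.version>11.0</guava.version>
  <mockito.version>1.9.5</mockito.version>
</properties>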
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -24,30 +24,15 @@
import org.apache.parquet.bytes.HeapByteBufferAllocator;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static org.apache.parquet.column.Encoding.PLAIN;
import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
import org.apache.parquet.column.values.plain.PlainValuesWriter;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.schema.MessageType;

/**
@@ -66,6 +51,8 @@ public class ParquetProperties {
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

private static final int MIN_SLAB_SIZE = 64;

public enum WriterVersion {
@@ -89,6 +76,7 @@ public static WriterVersion fromString(String name) {
}
}

private final int initialSlabSize;
private final int pageSizeThreshold;
private final int dictionaryPageSizeThreshold;
private final WriterVersion writerVersion;
@@ -97,21 +85,23 @@ public static WriterVersion fromString(String name) {
private final int maxRowCountForPageSizeCheck;
private final boolean estimateNextSizeCheck;
private final ByteBufferAllocator allocator;

private final int initialSlabSize;
private final ValuesWriterFactory valuesWriterFactory;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,

Review comment: What's the story for configurable properties here? If one writes a custom ValuesWriterFactory, it seems totally reasonable that they would have other non-default settings to configure. I'm not sure how this is supported in Hadoop (if at all), but I would imagine something like fetching all settings under parquet.writer.writerProperties.*.

Author: Not sure I understand your question completely, but if you take a look at my last commit, 503958a, you can see that there is a way to configure ValuesWriterFactories. To do so, you write your custom ValuesWriterFactory (similar to what I've done in the unit tests) and make it extend ConfigurableFactory. When you do so, the Hadoop Config is passed in for you to read and use. I did mull over reading everything under parquet.writer.writerProperties.*, but felt this was a cleaner approach.

Reviewer: Ahh, yeah, I totally missed ConfigurableFactory, that works perfectly.

Contributor: ConfigurableFactory is now a factory that is Configurable, right?

int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator) {
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
this.dictionaryPageSizeThreshold = dictPageSize;
this.writerVersion = writerVersion;
this.enableDictionary = enableDict;
this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck;
this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = estimateNextSizeCheck;
this.allocator = allocator;

this.valuesWriterFactory = writerFactory;
}
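A sketch of the ConfigurableFactory idea discussed in the thread above. The actual ConfigurableFactory comes from commit 503958a and is not part of this diff, so this models it, per the last comment in the thread, as a ValuesWriterFactory that also implements Hadoop's Configurable; the class name and the config key are purely illustrative, and the sketch just delegates writer creation to the stock factory:

// Hypothetical sketch only: models the ConfigurableFactory idea as a ValuesWriterFactory
// that is also a Hadoop Configurable. The config key is illustrative, not an existing setting.
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;

public class TunableValuesWriterFactory implements ValuesWriterFactory, Configurable {

  // Fall back to the stock factory for actual writer creation in this sketch.
  private final ValuesWriterFactory delegate = new DefaultValuesWriterFactory();
  private Configuration conf;
  private boolean forceFallbackEncoding;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Read custom knobs from the Hadoop Configuration handed to the factory.
    this.forceFallbackEncoding = conf.getBoolean("parquet.writer.force-fallback-encoding", false);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void initialize(ParquetProperties properties) {
    delegate.initialize(properties);
  }

  @Override
  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
    // A real implementation would consult forceFallbackEncoding here; the sketch just delegates.
    return delegate.newValuesWriter(descriptor);
  }
}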

public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -144,128 +134,18 @@ private RunLengthBitPackingHybridEncoder newLevelEncoder(int maxLevel) {
getWidthFromMaxInt(maxLevel), MIN_SLAB_SIZE, pageSizeThreshold, allocator);
}

private ValuesWriter plainWriter(ColumnDescriptor path) {
switch (path.getType()) {
case BOOLEAN:
return new BooleanPlainValuesWriter();
case INT96:
return new FixedLenByteArrayPlainValuesWriter(12, initialSlabSize, pageSizeThreshold, allocator);
case FIXED_LEN_BYTE_ARRAY:
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSlabSize, pageSizeThreshold, allocator);
case BINARY:
case INT32:
case INT64:
case DOUBLE:
case FLOAT:
return new PlainValuesWriter(initialSlabSize, pageSizeThreshold, allocator);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
}

@SuppressWarnings("deprecation")
private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path) {
Encoding encodingForDataPage;
Encoding encodingForDictionaryPage;
switch(writerVersion) {
case PARQUET_1_0:
encodingForDataPage = PLAIN_DICTIONARY;
encodingForDictionaryPage = PLAIN_DICTIONARY;
break;
case PARQUET_2_0:
encodingForDataPage = RLE_DICTIONARY;
encodingForDictionaryPage = PLAIN;
break;
default:
throw new IllegalArgumentException("Unknown version: " + writerVersion);
}
switch (path.getType()) {
case BOOLEAN:
throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
case BINARY:
return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case INT32:
return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case INT64:
return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case INT96:
return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case DOUBLE:
return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case FLOAT:
return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
case FIXED_LEN_BYTE_ARRAY:
return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage, this.allocator);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
}

private ValuesWriter writerToFallbackTo(ColumnDescriptor path) {
switch(writerVersion) {
case PARQUET_1_0:
return plainWriter(path);
case PARQUET_2_0:
switch (path.getType()) {
case BOOLEAN:
return new RunLengthBitPackingHybridValuesWriter(1, initialSlabSize, pageSizeThreshold, allocator);
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new DeltaByteArrayWriter(initialSlabSize, pageSizeThreshold, allocator);
case INT32:
return new DeltaBinaryPackingValuesWriterForInteger(initialSlabSize, pageSizeThreshold, allocator);
case INT64:
return new DeltaBinaryPackingValuesWriterForLong(initialSlabSize, pageSizeThreshold, allocator);
case INT96:
case DOUBLE:
case FLOAT:
return plainWriter(path);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
default:
throw new IllegalArgumentException("Unknown version: " + writerVersion);
}
}

private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path);
if (enableDictionary) {
return FallbackValuesWriter.of(
dictionaryWriter(path),
writerToFallBackTo);
} else {
return writerToFallBackTo;
}
}

public ValuesWriter newValuesWriter(ColumnDescriptor path) {
switch (path.getType()) {
case BOOLEAN: // no dictionary encoding for boolean
return writerToFallbackTo(path);
case FIXED_LEN_BYTE_ARRAY:
// dictionary encoding for that type was not enabled in PARQUET 1.0
if (writerVersion == WriterVersion.PARQUET_2_0) {
return dictWriterWithFallBack(path);
} else {
return writerToFallbackTo(path);
}
case BINARY:
case INT32:
case INT64:
case INT96:
case DOUBLE:
case FLOAT:
return dictWriterWithFallBack(path);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
return valuesWriterFactory.newValuesWriter(path);
}
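newValuesWriter now delegates entirely to the configured factory, so the encoding-selection logic that used to live in the removed methods above can be supplied from outside. A minimal sketch of such a factory, assuming the ValuesWriterFactory interface declares exactly the two methods this diff calls (initialize(ParquetProperties) from Builder.build() below, and newValuesWriter(ColumnDescriptor) here); the class name is hypothetical and getAllocator() is assumed to exist on ParquetProperties alongside the getters shown in this diff:

// Sketch only: a factory that always produces plain writers, mirroring the removed plainWriter() above.
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
import org.apache.parquet.column.values.plain.PlainValuesWriter;

public class PlainOnlyValuesWriterFactory implements ValuesWriterFactory {

  private ParquetProperties properties;

  @Override
  public void initialize(ParquetProperties parquetProperties) {
    // Called from ParquetProperties.Builder#build() once the properties object exists.
    this.properties = parquetProperties;
  }

  @Override
  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
    int slab = properties.getInitialSlabSize();
    int page = properties.getPageSizeThreshold();
    switch (descriptor.getType()) {
      case BOOLEAN:
        return new BooleanPlainValuesWriter();
      case INT96:
        return new FixedLenByteArrayPlainValuesWriter(12, slab, page, properties.getAllocator());
      case FIXED_LEN_BYTE_ARRAY:
        return new FixedLenByteArrayPlainValuesWriter(descriptor.getTypeLength(), slab, page, properties.getAllocator());
      default:
        // BINARY, INT32, INT64, DOUBLE, FLOAT
        return new PlainValuesWriter(slab, page, properties.getAllocator());
    }
  }
}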

public int getPageSizeThreshold() {
return pageSizeThreshold;
}

public int getInitialSlabSize() {
return initialSlabSize;
}

public int getDictionaryPageSizeThreshold() {
return dictionaryPageSizeThreshold;
}
@@ -302,6 +182,10 @@ public int getMaxRowCountForPageSizeCheck() {
return maxRowCountForPageSizeCheck;
}

public ValuesWriterFactory getValuesWriterFactory() {
return valuesWriterFactory;
}

public boolean estimateNextSizeCheck() {
return estimateNextSizeCheck;
}
@@ -323,6 +207,7 @@ public static class Builder {
private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;

private Builder() {
}
@@ -411,10 +296,25 @@ public Builder withAllocator(ByteBufferAllocator allocator) {
return this;
}

public Builder withValuesWriterFactory(ValuesWriterFactory factory) {
Preconditions.checkNotNull(factory, "ValuesWriterFactory");
this.valuesWriterFactory = factory;
return this;
}

public ParquetProperties build() {
return new ParquetProperties(writerVersion, pageSize, dictPageSize,
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator);
estimateNextSizeCheck, allocator, valuesWriterFactory);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
// properties to the object.
valuesWriterFactory.initialize(properties);

return properties;
}

}
}
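For completeness, a usage sketch of the new Builder hook. This assumes ParquetProperties exposes a static builder() accessor (the Builder constructor is private, so some accessor of this kind must exist outside this hunk) and reuses the hypothetical PlainOnlyValuesWriterFactory sketched earlier:

// columnDescriptor stands in for a real ColumnDescriptor obtained from the file schema.
ParquetProperties props = ParquetProperties.builder()
    .withValuesWriterFactory(new PlainOnlyValuesWriterFactory())
    .build();                                   // build() calls factory.initialize(props), per the comment above
ValuesWriter writer = props.newValuesWriter(columnDescriptor); // delegates to the custom factory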