Skip to content

Commit 9ad5485

Browse files
Daniel Weeks authored and julienledem committed
PARQUET-2: Adding Type Persuasion for Primitive Types
Original from the old repo: Parquet/parquet-mr#410. JIRA: https://issues.apache.org/jira/browse/PARQUET-2. These changes allow primitive types to be requested as different types than what is stored in the file format, using a flag to turn off strict type checking (default is on). Types are cast to the requested type where possible and will suffer precision loss for casting where necessary (e.g. requesting a double as an int). No performance penalty is imposed for using the type defined in the file. A flag exists to control strict type checking. A 6x6 test case is provided to test conversion between the primitive types. Author: Daniel Weeks <dweeks@netflix.com> Closes apache#3 from dcw-netflix/type-persuasion and squashes the following commits: 97f4e9a [Daniel Weeks] Added documentation as suggested by code review 1c3c0c7 [Daniel Weeks] Fixed test with strict checking off f3cb495 [Daniel Weeks] Added type persuasion for primitive types with a flag to control strict type checking for conflicting schemas, which is strict by default.
1 parent 859b6b4 commit 9ad5485

File tree

10 files changed

+305
-23
lines changed

10 files changed

+305
-23
lines changed

parquet-column/src/main/java/parquet/io/ColumnIOFactory.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,16 @@ public class ColumnIOCreatorVisitor implements TypeVisitor {
4343
private final MessageType requestedSchema;
4444
private int currentRequestedIndex;
4545
private Type currentRequestedType;
46+
private boolean strictTypeChecking;
4647

4748
public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
49+
this(validating, requestedSchema, true);
50+
}
51+
52+
public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
4853
this.validating = validating;
4954
this.requestedSchema = requestedSchema;
55+
this.strictTypeChecking = strictTypeChecking;
5056
}
5157

5258
@Override
@@ -86,7 +92,8 @@ private void visitChildren(GroupColumnIO newIO, GroupType groupType, GroupType r
8692

8793
@Override
8894
public void visit(PrimitiveType primitiveType) {
89-
if (!currentRequestedType.isPrimitive() || currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName()) {
95+
if (!currentRequestedType.isPrimitive() ||
96+
(this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) {
9097
incompatibleSchema(primitiveType, currentRequestedType);
9198
}
9299
PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size());
@@ -127,7 +134,17 @@ public ColumnIOFactory(boolean validating) {
127134
* @return the corresponding serializing/deserializing structure
128135
*/
129136
public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) {
130-
ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema);
137+
return getColumnIO(requestedSchema, fileSchema, true);
138+
}
139+
140+
/**
141+
* @param schema the requestedSchema we want to read/write
142+
* @param fileSchema the file schema (when reading it can be different from the requested schema)
143+
* @param strict should file type and requested primitive types match
144+
* @return the corresponding serializing/deserializing structure
145+
*/
146+
public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
147+
ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
131148
fileSchema.accept(visitor);
132149
return visitor.getColumnIO();
133150
}

parquet-column/src/main/java/parquet/schema/GroupType.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,11 @@ protected <T> List<T> convertChildren(List<GroupType> path, TypeConverter<T> con
293293

294294
@Override
295295
protected Type union(Type toMerge) {
296+
return union(toMerge, true);
297+
}
298+
299+
@Override
300+
protected Type union(Type toMerge, boolean strict) {
296301
if (toMerge.isPrimitive()) {
297302
throw new IncompatibleSchemaModificationException("can not merge primitive type " + toMerge + " into group type " + this);
298303
}
@@ -305,6 +310,16 @@ protected Type union(Type toMerge) {
305310
* @return the merged list
306311
*/
307312
List<Type> mergeFields(GroupType toMerge) {
313+
return mergeFields(toMerge, true);
314+
}
315+
316+
/**
317+
* produces the list of fields resulting from merging toMerge into the fields of this
318+
* @param toMerge the group containing the fields to merge
319+
* @param strict should schema primitive types match
320+
* @return the merged list
321+
*/
322+
List<Type> mergeFields(GroupType toMerge, boolean strict) {
308323
List<Type> newFields = new ArrayList<Type>();
309324
// merge existing fields
310325
for (Type type : this.getFields()) {
@@ -314,7 +329,7 @@ List<Type> mergeFields(GroupType toMerge) {
314329
if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
315330
throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
316331
}
317-
merged = type.union(fieldToMerge);
332+
merged = type.union(fieldToMerge, strict);
318333
} else {
319334
merged = type;
320335
}

parquet-column/src/main/java/parquet/schema/MessageType.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@ public boolean containsPath(String[] path) {
137137
}
138138

139139
public MessageType union(MessageType toMerge) {
140-
return new MessageType(this.getName(), mergeFields(toMerge));
140+
return union(toMerge, true);
141+
}
142+
143+
public MessageType union(MessageType toMerge, boolean strict) {
144+
return new MessageType(this.getName(), mergeFields(toMerge, strict));
141145
}
142146

143147
}

parquet-column/src/main/java/parquet/schema/PrimitiveType.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ abstract public void addValueToPrimitiveConverter(
276276
private final PrimitiveTypeName primitive;
277277
private final int length;
278278
private final DecimalMetadata decimalMeta;
279-
279+
280280
/**
281281
* @param repetition OPTIONAL, REPEATED, REQUIRED
282282
* @param primitive STRING, INT64, ...
@@ -486,7 +486,12 @@ protected boolean containsPath(String[] path, int depth) {
486486

487487
@Override
488488
protected Type union(Type toMerge) {
489-
if (!toMerge.isPrimitive() || !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName())) {
489+
return union(toMerge, true);
490+
}
491+
492+
@Override
493+
protected Type union(Type toMerge, boolean strict) {
494+
if (!toMerge.isPrimitive() || (strict && !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName()))) {
490495
throw new IncompatibleSchemaModificationException("can not merge type " + toMerge + " into " + this);
491496
}
492497
Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(

parquet-column/src/main/java/parquet/schema/Type.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,13 @@ public boolean equals(Object other) {
195195
* @return the union result of merging toMerge into this
196196
*/
197197
protected abstract Type union(Type toMerge);
198+
199+
/**
200+
* @param toMerge the type to merge into this one
201+
* @param strict should schema primitive types match
202+
* @return the union result of merging toMerge into this
203+
*/
204+
protected abstract Type union(Type toMerge, boolean strict);
198205

199206
/**
200207
* {@inheritDoc}

parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import static java.lang.String.format;
3939
import static parquet.Log.DEBUG;
40+
import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
4041

4142
class InternalParquetRecordReader<T> {
4243
private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
@@ -57,6 +58,7 @@ class InternalParquetRecordReader<T> {
5758
private ParquetFileReader reader;
5859
private parquet.io.RecordReader<T> recordReader;
5960
private UnboundRecordFilter recordFilter;
61+
private boolean strictTypeChecking;
6062

6163
private long totalTimeSpentReadingBytes;
6264
private long totalTimeSpentProcessingRecords;
@@ -106,7 +108,7 @@ private void checkRead() throws IOException {
106108
BenchmarkCounter.incrementTime(timeSpentReading);
107109
LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
108110
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
109-
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
111+
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
110112
recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
111113
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
112114
totalCountLoadedSoFar += pages.getRowCount();
@@ -142,7 +144,7 @@ public void initialize(MessageType requestedSchema, MessageType fileSchema,
142144
this.recordConverter = readSupport.prepareForRead(
143145
configuration, extraMetadata, fileSchema,
144146
new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
145-
147+
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
146148
List<ColumnDescriptor> columns = requestedSchema.getColumns();
147149
reader = new ParquetFileReader(configuration, file, blocks, columns);
148150
for (BlockMetaData block : blocks) {

parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class ParquetFileWriter {
8383
private long currentChunkFirstDataPage;
8484
private long currentChunkDictionaryPageOffset;
8585
private long currentChunkValueCount;
86-
86+
8787
private Statistics currentStatistics;
8888

8989
/**
@@ -439,11 +439,16 @@ public long getPos() throws IOException {
439439
* @param footers the list files footers to merge
440440
* @return the global meta data for all the footers
441441
*/
442+
442443
static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
444+
return getGlobalMetaData(footers, true);
445+
}
446+
447+
static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
443448
GlobalMetaData fileMetaData = null;
444449
for (Footer footer : footers) {
445450
ParquetMetadata currentMetadata = footer.getParquetMetadata();
446-
fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
451+
fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
447452
}
448453
return fileMetaData;
449454
}
@@ -457,6 +462,13 @@ static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
457462
static GlobalMetaData mergeInto(
458463
FileMetaData toMerge,
459464
GlobalMetaData mergedMetadata) {
465+
return mergeInto(toMerge, mergedMetadata, true);
466+
}
467+
468+
static GlobalMetaData mergeInto(
469+
FileMetaData toMerge,
470+
GlobalMetaData mergedMetadata,
471+
boolean strict) {
460472
MessageType schema = null;
461473
Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
462474
Set<String> createdBy = new HashSet<String>();
@@ -467,7 +479,7 @@ static GlobalMetaData mergeInto(
467479
}
468480
if ((schema == null && toMerge.getSchema() != null)
469481
|| (schema != null && !schema.equals(toMerge.getSchema()))) {
470-
schema = mergeInto(toMerge.getSchema(), schema);
482+
schema = mergeInto(toMerge.getSchema(), schema, strict);
471483
}
472484
for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
473485
Set<String> values = newKeyValues.get(entry.getKey());
@@ -491,10 +503,22 @@ static GlobalMetaData mergeInto(
491503
* @return the resulting schema
492504
*/
493505
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
506+
return mergeInto(toMerge, mergedSchema, true);
507+
}
508+
509+
/**
510+
* will return the result of merging toMerge into mergedSchema
511+
* @param toMerge the schema to merge into mergedSchema
512+
* @param mergedSchema the schema to append the fields to
513+
* @param strict should schema primitive types match
514+
* @return the resulting schema
515+
*/
516+
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
494517
if (mergedSchema == null) {
495518
return toMerge;
496519
}
497-
return mergedSchema.union(toMerge);
520+
521+
return mergedSchema.union(toMerge, strict);
498522
}
499523

500524
}

parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
7777
* key to configure the filter
7878
*/
7979
public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
80+
81+
/**
82+
* key to configure type checking for conflicting schemas (default: true)
83+
*/
84+
public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";
8085

8186
private Class<?> readSupportClass;
8287
private List<Footer> footers;
@@ -358,7 +363,7 @@ public List<ParquetInputSplit> getSplits(Configuration configuration, List<Foote
358363
throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
359364
}
360365
List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
361-
GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
366+
GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
362367
ReadContext readContext = getReadSupport(configuration).init(new InitContext(
363368
configuration,
364369
globalMetaData.getKeyValueMetaData(),

0 commit comments

Comments (0)