Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ public class ColumnIOCreatorVisitor implements TypeVisitor {
private final MessageType requestedSchema;
private int currentRequestedIndex;
private Type currentRequestedType;
private boolean strictTypeChecking;

/**
 * Creates a visitor with strict type checking enabled (the historical default).
 * @param validating whether values are validated against the schema while reading/writing
 * @param requestedSchema the projection schema the caller wants to read/write
 */
public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
this(validating, requestedSchema, true);
}

/**
 * Creates a visitor.
 * @param validating whether values are validated against the schema while reading/writing
 * @param requestedSchema the projection schema the caller wants to read/write
 * @param strictTypeChecking if true, a requested primitive type must exactly match the
 *        file's primitive type name; if false, mismatched primitive names are tolerated
 */
public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
this.validating = validating;
this.requestedSchema = requestedSchema;
this.strictTypeChecking = strictTypeChecking;
}

@Override
Expand Down Expand Up @@ -86,7 +92,8 @@ private void visitChildren(GroupColumnIO newIO, GroupType groupType, GroupType r

@Override
public void visit(PrimitiveType primitiveType) {
if (!currentRequestedType.isPrimitive() || currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName()) {
if (!currentRequestedType.isPrimitive() ||
(this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) {
incompatibleSchema(primitiveType, currentRequestedType);
}
PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size());
Expand Down Expand Up @@ -127,7 +134,17 @@ public ColumnIOFactory(boolean validating) {
* @return the corresponding serializing/deserializing structure
*/
public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) {
  // Delegate to the 3-arg overload; strict type checking is the historical default.
  return getColumnIO(requestedSchema, fileSchema, true);
}

/**
 * @param requestedSchema the requestedSchema we want to read/write
 * @param fileSchema the file schema (when reading it can be different from the requested schema)
 * @param strict should file type and requested primitive types match
 * @return the corresponding serializing/deserializing structure
 */
public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
  ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
  fileSchema.accept(visitor);
  return visitor.getColumnIO();
}
Expand Down
17 changes: 16 additions & 1 deletion parquet-column/src/main/java/parquet/schema/GroupType.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ protected <T> List<T> convertChildren(List<GroupType> path, TypeConverter<T> con

@Override
protected Type union(Type toMerge) {
// Delegate to the strict variant: by default, merged primitive types must match.
return union(toMerge, true);
}

@Override
protected Type union(Type toMerge, boolean strict) {
if (toMerge.isPrimitive()) {
throw new IncompatibleSchemaModificationException("can not merge primitive type " + toMerge + " into group type " + this);
}
Expand All @@ -305,6 +310,16 @@ protected Type union(Type toMerge) {
* @return the merged list
*/
List<Type> mergeFields(GroupType toMerge) {
// Delegate to the strict variant: by default, merged primitive types must match.
return mergeFields(toMerge, true);
}

/**
* produces the list of fields resulting from merging toMerge into the fields of this
* @param toMerge the group containing the fields to merge
* @param strict should schema primitive types match
* @return the merged list
*/
List<Type> mergeFields(GroupType toMerge, boolean strict) {
List<Type> newFields = new ArrayList<Type>();
// merge existing fields
for (Type type : this.getFields()) {
Expand All @@ -314,7 +329,7 @@ List<Type> mergeFields(GroupType toMerge) {
if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
}
merged = type.union(fieldToMerge);
merged = type.union(fieldToMerge, strict);
} else {
merged = type;
}
Expand Down
6 changes: 5 additions & 1 deletion parquet-column/src/main/java/parquet/schema/MessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ public boolean containsPath(String[] path) {
}

/**
 * Merges toMerge into this schema with strict type checking (the historical default).
 * @param toMerge the schema to merge into this one
 * @return a new MessageType with this schema's name and the merged fields
 */
public MessageType union(MessageType toMerge) {
  return union(toMerge, true);
}

/**
 * Merges toMerge into this schema.
 * @param toMerge the schema to merge into this one
 * @param strict should schema primitive types match
 * @return a new MessageType with this schema's name and the merged fields
 */
public MessageType union(MessageType toMerge, boolean strict) {
  return new MessageType(this.getName(), mergeFields(toMerge, strict));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ abstract public void addValueToPrimitiveConverter(
private final PrimitiveTypeName primitive;
private final int length;
private final DecimalMetadata decimalMeta;

/**
* @param repetition OPTIONAL, REPEATED, REQUIRED
* @param primitive STRING, INT64, ...
Expand Down Expand Up @@ -486,7 +486,12 @@ protected boolean containsPath(String[] path, int depth) {

@Override
protected Type union(Type toMerge) {
if (!toMerge.isPrimitive() || !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName())) {
return union(toMerge, true);
}

@Override
protected Type union(Type toMerge, boolean strict) {
if (!toMerge.isPrimitive() || (strict && !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName()))) {
throw new IncompatibleSchemaModificationException("can not merge type " + toMerge + " into " + this);
}
Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
Expand Down
7 changes: 7 additions & 0 deletions parquet-column/src/main/java/parquet/schema/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ public boolean equals(Object other) {
* @return the union result of merging toMerge into this
*/
protected abstract Type union(Type toMerge);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should put the default implementation here:

protected Type union(Type toMerge) {
    return union(toMerge, true);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to put the default implementation in the abstract class, but the
maven enforcer plugin wouldn't allow me to do it. I assume removing the
abstract is considered an interface change.

On Fri, Jun 20, 2014 at 9:21 PM, Julien Le Dem notifications@github.com
wrote:

In parquet-column/src/main/java/parquet/schema/Type.java:

@@ -195,6 +195,13 @@ public boolean equals(Object other) {
* @return the union result of merging toMerge into this
*/
protected abstract Type union(Type toMerge);

We should put the default implementation here:

protected Type union(Type toMerge) {
return union(toMerge, true);
}


Reply to this email directly or view it on GitHub
https://github.com/apache/incubator-parquet-mr/pull/3/files#r14048282.


/**
* @param toMerge the type to merge into this one
* @param strict should schema primitive types match
* @return the union result of merging toMerge into this
*/
protected abstract Type union(Type toMerge, boolean strict);

/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static java.lang.String.format;
import static parquet.Log.DEBUG;
import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;

class InternalParquetRecordReader<T> {
private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
Expand All @@ -57,6 +58,7 @@ class InternalParquetRecordReader<T> {
private ParquetFileReader reader;
private parquet.io.RecordReader<T> recordReader;
private UnboundRecordFilter recordFilter;
private boolean strictTypeChecking;

private long totalTimeSpentReadingBytes;
private long totalTimeSpentProcessingRecords;
Expand Down Expand Up @@ -106,7 +108,7 @@ private void checkRead() throws IOException {
BenchmarkCounter.incrementTime(timeSpentReading);
LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the ParquetInputFormat already has read this setting from the conf.
I'd rather keep this centralized there and passed as a constructor argument to the RecordReader.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this one a bit and found that using the constructor results in
two different property reads either way.

Currently, ParquetInputFormat and InternalParquetRecordReader read the
property, but if I take it out of InternalParquetRecordReader and change it
to a parameter, then ParquetReader will need to read the property as well
since it instantiates the InternalParquetRecordReader without being called
by the ParquetInputFormat.

I'm not sure there's a clean way to do this.

-Dan

On Fri, Jun 20, 2014 at 9:26 PM, Julien Le Dem notifications@github.com
wrote:

In
parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java:

@@ -106,7 +108,7 @@ private void checkRead() throws IOException {
BenchmarkCounter.incrementTime(timeSpentReading);
LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);

  •  MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
    
  •  MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
    

the ParquetInputFormat already has read this setting from the conf.
I'd rather keep this centralized there and passed as a constructor
argument to the RecordReader.


Reply to this email directly or view it on GitHub
https://github.com/apache/incubator-parquet-mr/pull/3/files#r14048288.

recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
totalCountLoadedSoFar += pages.getRowCount();
Expand Down Expand Up @@ -142,7 +144,7 @@ public void initialize(MessageType requestedSchema, MessageType fileSchema,
this.recordConverter = readSupport.prepareForRead(
configuration, extraMetadata, fileSchema,
new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));

this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
List<ColumnDescriptor> columns = requestedSchema.getColumns();
reader = new ParquetFileReader(configuration, file, blocks, columns);
for (BlockMetaData block : blocks) {
Expand Down
32 changes: 28 additions & 4 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class ParquetFileWriter {
private long currentChunkFirstDataPage;
private long currentChunkDictionaryPageOffset;
private long currentChunkValueCount;

private Statistics currentStatistics;

/**
Expand Down Expand Up @@ -439,11 +439,16 @@ public long getPos() throws IOException {
* @param footers the list files footers to merge
* @return the global meta data for all the footers
*/

static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
  // Strict schema merging is the historical default.
  return getGlobalMetaData(footers, true);
}

/**
 * Merges the file metadata of all the given footers into one global view.
 * @param footers the list of file footers to merge
 * @param strict should schema primitive types match when merging schemas
 * @return the global meta data for all the footers
 */
static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
  GlobalMetaData fileMetaData = null;
  for (Footer footer : footers) {
    ParquetMetadata currentMetadata = footer.getParquetMetadata();
    fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
  }
  return fileMetaData;
}
Expand All @@ -457,6 +462,13 @@ static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
static GlobalMetaData mergeInto(
FileMetaData toMerge,
GlobalMetaData mergedMetadata) {
// Delegate to the strict variant: by default, merged primitive types must match.
return mergeInto(toMerge, mergedMetadata, true);
}

static GlobalMetaData mergeInto(
FileMetaData toMerge,
GlobalMetaData mergedMetadata,
boolean strict) {
MessageType schema = null;
Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
Set<String> createdBy = new HashSet<String>();
Expand All @@ -467,7 +479,7 @@ static GlobalMetaData mergeInto(
}
if ((schema == null && toMerge.getSchema() != null)
|| (schema != null && !schema.equals(toMerge.getSchema()))) {
schema = mergeInto(toMerge.getSchema(), schema);
schema = mergeInto(toMerge.getSchema(), schema, strict);
}
for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
Set<String> values = newKeyValues.get(entry.getKey());
Expand All @@ -491,10 +503,22 @@ static GlobalMetaData mergeInto(
* @return the resulting schema
*/
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
  // Strict schema merging is the historical default.
  return mergeInto(toMerge, mergedSchema, true);
}

/**
 * will return the result of merging toMerge into mergedSchema
 * @param toMerge the schema to merge into mergedSchema
 * @param mergedSchema the schema to append the fields to (may be null, in which case
 *        toMerge is returned as-is)
 * @param strict should schema primitive types match
 * @return the resulting schema
 */
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
  if (mergedSchema == null) {
    return toMerge;
  }
  return mergedSchema.union(toMerge, strict);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
* key to configure the filter
*/
public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";

/**
* key to configure type checking for conflicting schemas (default: true)
*/
public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";

private Class<?> readSupportClass;
private List<Footer> footers;
Expand Down Expand Up @@ -358,7 +363,7 @@ public List<ParquetInputSplit> getSplits(Configuration configuration, List<Foote
throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
}
List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
ReadContext readContext = getReadSupport(configuration).init(new InitContext(
configuration,
globalMetaData.getKeyValueMetaData(),
Expand Down
Loading