diff --git a/.gitignore b/.gitignore index cd3c0669ca..aa67d3d37a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ dependency-reduced-pom.xml parquet-scrooge/.cache .idea/* target/ +.cache +*~ +mvn_install.log diff --git a/.travis.yml b/.travis.yml index aa9534935b..231405e029 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ before_install: - sudo apt-get install build-essential - mkdir protobuf_install - pushd protobuf_install - - wget http://protobuf.googlecode.com/files/protobuf-2.5.0.tar.gz + - wget https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.gz - tar xzf protobuf-2.5.0.tar.gz - cd protobuf-2.5.0 - ./configure @@ -24,8 +24,8 @@ before_install: - cd .. env: - - HADOOP_PROFILE=default - - HADOOP_PROFILE=hadoop-2 + - HADOOP_PROFILE=hadoop-1 TEST_CODECS=uncompressed + - HADOOP_PROFILE=default TEST_CODECS=gzip,snappy install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false) script: mvn test -P $HADOOP_PROFILE diff --git a/LICENSE b/LICENSE index b7591484bd..b0065815a5 100644 --- a/LICENSE +++ b/LICENSE @@ -178,6 +178,14 @@ -------------------------------------------------------------------------------- +This product includes code from Apache Avro. + +Copyright: 2014 The Apache Software Foundation. +Home page: https://avro.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This project includes code from Daniel Lemire's JavaFastPFOR project. The "Lemire" bit packing source code produced by parquet-generator is derived from the JavaFastPFOR project. diff --git a/NOTICE b/NOTICE index c6e3bf2525..a9b6c56de7 100644 --- a/NOTICE +++ b/NOTICE @@ -43,3 +43,14 @@ with the following copyright notice: See the License for the specific language governing permissions and limitations under the License. +-------------------------------------------------------------------------------- + +This product includes code from Apache Avro, which includes the following in +its NOTICE file: + + Apache Avro + Copyright 2010-2015 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). 
+ diff --git a/README.md b/README.md index 2d9a50a124..f084f50751 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ Parquet-MR uses Maven to build and depends on both the thrift and protoc compile To build and install the protobuf compiler, run: ``` -wget http://protobuf.googlecode.com/files/protobuf-2.5.0.tar.gz +wget https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.gz tar xzf protobuf-2.5.0.tar.gz cd protobuf-2.5.0 ./configure @@ -62,7 +62,7 @@ sudo make install Once protobuf and thrift are available in your path, you can build the project by running: ``` -mvn clean install +LC_ALL=C mvn clean install ``` ## Features @@ -111,8 +111,8 @@ Avro conversion is implemented via the [parquet-avro](https://github.com/apache/ * the ParquetInputFormat can be provided a ReadSupport to materialize your own objects by implementing a RecordMaterializer See the APIs: -* [Record conversion API](https://github.com/apache/parquet-mr/tree/master/parquet-column/src/main/java/parquet/io/api) -* [Hadoop API](https://github.com/apache/parquet-mr/tree/master/parquet-hadoop/src/main/java/parquet/hadoop/api) +* [Record conversion API](https://github.com/apache/parquet-mr/tree/master/parquet-column/src/main/java/org/apache/parquet/io/api) +* [Hadoop API](https://github.com/apache/parquet-mr/tree/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api) ## Apache Pig integration A [Loader](https://github.com/apache/parquet-mr/blob/master/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java) and a [Storer](https://github.com/apache/parquet-mr/blob/master/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetStorer.java) are provided to read and write Parquet files with Apache Pig @@ -202,16 +202,8 @@ Thank you for getting involved! ## Authors and contributors -* Julien Le Dem [@J_](http://twitter.com/J_) -* Tom White -* Mickaël Lacour -* Remy Pecqueur -* Avi Bryant -* Dmitriy Ryaboy [@squarecog](https://twitter.com/squarecog) -* Jonathan Coveney -* Brock Noland -* Tianshuo Deng -* and many others -- see the [Contributor report]( https://github.com/apache/parquet-mr/contributors) +* [Contributors](https://github.com/apache/parquet-mr/graphs/contributors) +* [Committers](dev/COMMITTERS.md) ## Code of Conduct diff --git a/dev/COMMITTERS.md b/dev/COMMITTERS.md index efb2762b4e..78cfdcd81f 100644 --- a/dev/COMMITTERS.md +++ b/dev/COMMITTERS.md @@ -17,30 +17,41 @@ ~ under the License.
--> -# Committers (in aplhabetical order): +# Committers (in alphabetical order): -| Name | Apache Id | github id | JIRA id | -|--------------------|------------|----------------|-------------| -| Aniket Mokashi | aniket486 | aniket486 | | -| Brock Noland | brock | brockn | | -| Cheng Lian | lian | liancheng | lian cheng | -| Chris Aniszczyk | caniszczyk | | | -| Dmitriy Ryaboy | dvryaboy | dvryaboy | | -| Jake Farrell | jfarrell | | | -| Jonathan Coveney | jcoveney | jcoveney | | -| Julien Le Dem | julien | julienledem | julienledem | -| Lukas Nalezenec | lukas | lukasnalezenec | | -| Marcel Kornacker | marcel | | | -| Mickael Lacour | mlacour | mickaellcr | | -| Nong Li | nong | nongli | | -| Remy Pecqueur | rpecqueur | Lordshinjo | | -| Ryan Blue | blue | rdblue | | -| Sergio Pena | spena | spena | spena | -| Tianshuo Deng | tianshuo | tsdeng | | -| Tom White | tomwhite | tomwhite | | -| Wesley Graham Peck | wesleypeck | wesleypeck | | +The official list of committers can be found here: [Apache Parquet Committers and PMC](http://people.apache.org/committers-by-project.html#parquet) -Reviewing guidelines: +Below is more information about each committer (in alphabetical order). If this information becomes out of date, please send a PR to update! + +| Name | Apache Id | github id | JIRA id | +|------------------------|-----------------|---------------------|----------------| +| Alex Levenson | alexlevenson | @isnotinvain | alexlevenson | +| Aniket Mokashi | aniket486 | @aniket486 | | +| Brock Noland | brock | @brockn | | +| Cheng Lian | lian | @liancheng | liancheng | +| Chris Aniszczyk | caniszczyk | @caniszczyk | | +| Chris Mattmann | mattmann | @chrismattmann | | +| Daniel C. Weeks | dweeks | @danielcweeks | | +| Dmitriy Ryaboy | dvryaboy | @dvryaboy | | +| Jake Farrell | jfarrell | | | +| Jonathan Coveney | jcoveney | @jcoveney | | +| Julien Le Dem | julien | @julienledem | julienledem | +| Lukas Nalezenec | lukas | @lukasnalezenec | | +| Marcel Kornacker | marcel | @mkornacker | | +| Mickael Lacour | mlacour | @mickaellcr | | +| Nong Li | nong | @nongli | | +| Remy Pecqueur | rpecqueur | @Lordshinjo | | +| Roman Shaposhnik | rvs | @rvs | | +| Ryan Blue | blue | @rdblue | | +| Sergio Pena | spena | @spena | spena | +| Tianshuo Deng | tianshuo | @tsdeng | | +| Todd Lipcon | todd | @toddlipcon | | +| Tom White | tomwhite | @tomwhite | | +| Wes McKinney | wesm | @wesm | | +| Wesley Graham Peck | wesleypeck | @wesleypeck | | + + +# Reviewing guidelines: Committers have the responsibility to give constructive and timely feedback on the pull requests. Anybody can give feedback on a pull request but only committers can merge it. 
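Note: the parquet-avro changes below (AvroWriteSupport, AvroRecordConverter, ParentValueContainer, AvroSchemaConverter) add support for Avro logical types through Avro's Conversion API. A minimal usage sketch follows, assuming Avro 1.8's Conversions.DecimalConversion and the AvroParquetWriter/AvroParquetReader builders that appear in AvroTestUtil later in this patch; the class name, file path, and field names are made up for illustration and are not part of the patch:
```
// Hypothetical usage sketch (not part of this patch): round-trip a decimal
// logical type with a conversion registered on the data model.
import java.math.BigDecimal;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class DecimalRoundTrip {
  public static void main(String[] args) throws Exception {
    // Avro schema: one bytes field annotated with decimal(9,2)
    Schema amountType = LogicalTypes.decimal(9, 2)
        .addToSchema(Schema.create(Schema.Type.BYTES));
    Schema schema = SchemaBuilder.record("Invoice").fields()
        .name("amount").type(amountType).noDefault()
        .endRecord();

    // Register the decimal conversion so BigDecimal is used instead of ByteBuffer
    GenericData model = new GenericData();
    model.addLogicalTypeConversion(new Conversions.DecimalConversion());

    Path file = new Path("/tmp/invoice.parquet");

    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(file)
        .withDataModel(model)
        .withSchema(schema)
        .build();
    try {
      GenericRecord record = new GenericData.Record(schema);
      record.put("amount", new BigDecimal("19.99"));
      writer.write(record);   // write support converts BigDecimal to bytes
    } finally {
      writer.close();
    }

    ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file)
        .withDataModel(model)
        .build();
    try {
      GenericRecord copy = reader.read();
      System.out.println(copy.get("amount"));  // BigDecimal 19.99, via the conversion
    } finally {
      reader.close();
    }
  }
}
```
With the conversion registered on the data model, the new AvroWriteSupport code path looks up the conversion by the value's class and converts the BigDecimal to bytes before writing, and AvroRecordConverter wraps the field's ParentValueContainer in a conversion container on read, so the field comes back as a BigDecimal rather than a raw ByteBuffer.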
diff --git a/dev/merge_parquet_pr.py b/dev/merge_parquet_pr.py index 621fcde807..b1896e2e17 100755 --- a/dev/merge_parquet_pr.py +++ b/dev/merge_parquet_pr.py @@ -81,9 +81,9 @@ def fail(msg): def run_cmd(cmd): try: if isinstance(cmd, list): - return subprocess.check_output(cmd, stderr=subprocess.STDOUT) + return subprocess.check_output(cmd) else: - return subprocess.check_output(cmd.split(" "), stderr = subprocess.STDOUT) + return subprocess.check_output(cmd.split(" ")) except subprocess.CalledProcessError as e: # this avoids hiding the stdout / stderr of failed processes print 'Command failed: %s' % cmd diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 94343438df..109cc3875d 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -32,10 +32,6 @@ Apache Parquet Avro https://parquet.apache.org - - 1.7.6 - - org.apache.parquet @@ -71,7 +67,7 @@ com.google.guava guava - 11.0 + ${guava.version} test @@ -87,6 +83,13 @@ ${slf4j.version} test + + org.apache.parquet + parquet-hadoop + ${project.version} + test-jar + test + diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java index 06c66d6925..48eab4d0c9 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java @@ -21,6 +21,8 @@ import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.Map; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; @@ -111,6 +113,11 @@ public void add(Object value) { @SuppressWarnings("unchecked") private static Class getDatumClass(GenericData model, Schema schema) { + if (model.getConversionFor(schema.getLogicalType()) != null) { + // use generic classes to pass data to conversions + return null; + } + if (model instanceof SpecificData) { return (Class) ((SpecificData) model).getClass(schema); } @@ -133,7 +140,16 @@ private Schema.Field getAvroField(String parquetFieldName) { } private static Converter newConverter(Schema schema, Type type, - GenericData model, ParentValueContainer parent) { + GenericData model, ParentValueContainer setter) { + + LogicalType logicalType = schema.getLogicalType(); + // the expected type is always null because it is determined by the parent + // datum class, which never helps for generic. when logical types are added + // to specific, this should pass the expected type here. 
+ Conversion conversion = model.getConversionFor(logicalType); + ParentValueContainer parent = ParentValueContainer + .getConversionContainer(setter, conversion, schema); + if (schema.getType().equals(Schema.Type.BOOLEAN)) { return new AvroConverters.FieldBooleanConverter(parent); } else if (schema.getType().equals(Schema.Type.INT)) { diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index e73e8af262..7d55bf5834 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -110,9 +110,9 @@ public RecordMaterializer prepareForRead( MessageType parquetSchema = readContext.getRequestedSchema(); Schema avroSchema; - if (readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) { + if (metadata.get(AVRO_READ_SCHEMA_METADATA_KEY) != null) { // use the Avro read schema provided by the user - avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY)); + avroSchema = new Schema.Parser().parse(metadata.get(AVRO_READ_SCHEMA_METADATA_KEY)); } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) { // use the Avro schema from the file metadata if present avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY)); diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 61d7d8ef55..c0d6dc2a25 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -29,13 +29,24 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.LinkedHashMap; +import org.apache.avro.AvroTypeException; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; import org.apache.avro.generic.GenericData; +import org.apache.avro.reflect.AvroIgnore; +import org.apache.avro.reflect.AvroName; +import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.Stringable; import org.apache.avro.specific.SpecificData; @@ -43,6 +54,7 @@ import org.apache.parquet.Preconditions; import org.apache.parquet.avro.AvroConverters.FieldStringConverter; import org.apache.parquet.avro.AvroConverters.FieldStringableConverter; +import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; @@ -50,6 +62,8 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; +import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility; import static org.apache.parquet.schema.Type.Repetition.REPEATED; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; @@ -67,7 +81,8 @@ class 
AvroRecordConverter extends AvroConverters.AvroGroupConverter { private static final String JAVA_CLASS_PROP = "java-class"; private static final String JAVA_KEY_CLASS_PROP = "java-key-class"; - protected T currentRecord; + protected T currentRecord = null; + private ParentValueContainer rootContainer = null; private final Converter[] converters; private final Schema avroSchema; @@ -78,6 +93,15 @@ class AvroRecordConverter extends AvroConverters.AvroGroupConverter { public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema, GenericData baseModel) { this(null, parquetSchema, avroSchema, baseModel); + LogicalType logicalType = avroSchema.getLogicalType(); + Conversion conversion = baseModel.getConversionFor(logicalType); + this.rootContainer = ParentValueContainer.getConversionContainer(new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + AvroRecordConverter.this.currentRecord = (T) value; + } + }, conversion, avroSchema); } public AvroRecordConverter(ParentValueContainer parent, @@ -99,6 +123,8 @@ public AvroRecordConverter(ParentValueContainer parent, recordClass = getDatumClass(avroSchema, model); } + Map> fields = getFieldsByName(recordClass, false); + int parquetFieldIndex = 0; for (Type parquetField: parquetSchema.getFields()) { final Schema.Field avroField = getAvroField(parquetField.getName()); @@ -110,8 +136,10 @@ public void add(Object value) { AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value); } }; + + Class fieldClass = fields.get(avroField.name()); converters[parquetFieldIndex] = newConverter( - nonNullSchema, parquetField, this.model, container); + nonNullSchema, parquetField, this.model, fieldClass, container); // @Stringable doesn't affect the reflected schema; must be enforced here if (recordClass != null && @@ -145,6 +173,43 @@ public void add(Object value) { } } + // this was taken from Avro's ReflectData + private static Map> getFieldsByName(Class recordClass, + boolean excludeJava) { + Map> fields = new LinkedHashMap>(); + + if (recordClass != null) { + Class current = recordClass; + do { + if (excludeJava && current.getPackage() != null + && current.getPackage().getName().startsWith("java.")) { + break; // skip java built-in classes + } + for (Field field : current.getDeclaredFields()) { + if (field.isAnnotationPresent(AvroIgnore.class) || + isTransientOrStatic(field)) { + continue; + } + AvroName altName = field.getAnnotation(AvroName.class); + Class existing = fields.put( + altName != null ? 
altName.value() : field.getName(), + field.getType()); + if (existing != null) { + throw new AvroTypeException( + current + " contains two fields named: " + field.getName()); + } + } + current = current.getSuperclass(); + } while (current != null); + } + + return fields; + } + + private static boolean isTransientOrStatic(Field field) { + return (field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) != 0; + } + private Schema.Field getAvroField(String parquetFieldName) { Schema.Field avroField = avroSchema.getField(parquetFieldName); if (avroField != null) { @@ -162,12 +227,28 @@ private Schema.Field getAvroField(String parquetFieldName) { parquetFieldName)); } + private static Converter newConverter( + Schema schema, Type type, GenericData model, ParentValueContainer setter) { + return newConverter(schema, type, model, null, setter); + } + private static Converter newConverter(Schema schema, Type type, - GenericData model, ParentValueContainer parent) { + GenericData model, Class knownClass, ParentValueContainer setter) { + LogicalType logicalType = schema.getLogicalType(); + Conversion conversion; + if (knownClass != null) { + conversion = model.getConversionByClass(knownClass, logicalType); + } else { + conversion = model.getConversionFor(logicalType); + } + + ParentValueContainer parent = ParentValueContainer + .getConversionContainer(setter, conversion, schema); + if (schema.getType().equals(Schema.Type.BOOLEAN)) { return new AvroConverters.FieldBooleanConverter(parent); } else if (schema.getType().equals(Schema.Type.INT)) { - Class datumClass = getDatumClass(schema, model); + Class datumClass = getDatumClass(conversion, knownClass, schema, model); if (datumClass == null) { return new AvroConverters.FieldIntegerConverter(parent); } else if (datumClass == byte.class || datumClass == Byte.class) { @@ -185,7 +266,7 @@ private static Converter newConverter(Schema schema, Type type, } else if (schema.getType().equals(Schema.Type.DOUBLE)) { return new AvroConverters.FieldDoubleConverter(parent); } else if (schema.getType().equals(Schema.Type.BYTES)) { - Class datumClass = getDatumClass(schema, model); + Class datumClass = getDatumClass(conversion, knownClass, schema, model); if (datumClass == null) { return new AvroConverters.FieldByteBufferConverter(parent); } else if (datumClass.isArray() && datumClass.getComponentType() == byte.class) { @@ -199,7 +280,7 @@ private static Converter newConverter(Schema schema, Type type, } else if (schema.getType().equals(Schema.Type.ENUM)) { return new AvroConverters.FieldEnumConverter(parent, schema, model); } else if (schema.getType().equals(Schema.Type.ARRAY)) { - Class datumClass = getDatumClass(schema, model); + Class datumClass = getDatumClass(conversion, knownClass, schema, model); if (datumClass != null && datumClass.isArray()) { return new AvroArrayConverter( parent, type.asGroupType(), schema, model, datumClass); @@ -263,8 +344,24 @@ private static Class getStringableClass(Schema schema, GenericData model) { } } - @SuppressWarnings("unchecked") private static Class getDatumClass(Schema schema, GenericData model) { + return getDatumClass(null, null, schema, model); + } + + @SuppressWarnings("unchecked") + private static Class getDatumClass(Conversion conversion, + Class knownClass, + Schema schema, GenericData model) { + if (conversion != null) { + // use generic classes to pass data to conversions + return null; + } + + // known class can be set when using reflect + if (knownClass != null) { + return knownClass; + } + if (model 
instanceof SpecificData) { // this works for reflect as well return ((SpecificData) model).getClass(schema); @@ -312,6 +409,9 @@ public void end() { fillInDefaults(); if (parent != null) { parent.add(currentRecord); + } else { + // this applies any converters needed for the root value + rootContainer.add(currentRecord); } } @@ -500,10 +600,10 @@ public AvroArrayConverter(ParentValueContainer parent, GroupType type, // matching it against the element schema. if (isElementType(repeatedType, elementSchema)) { // the element type is the repeated type (and required) - converter = newConverter(elementSchema, repeatedType, model, setter); + converter = newConverter(elementSchema, repeatedType, model, elementClass, setter); } else { // the element is wrapped in a synthetic group and may be optional - converter = new PrimitiveElementConverter( + converter = new ArrayElementConverter( repeatedType.asGroupType(), elementSchema, model, setter); } } @@ -641,20 +741,20 @@ public void add(Object value) { * } * */ - final class PrimitiveElementConverter extends GroupConverter { + final class ArrayElementConverter extends GroupConverter { private boolean isSet; private final Converter elementConverter; - public PrimitiveElementConverter(GroupType repeatedType, - Schema elementSchema, GenericData model, - final ParentValueContainer setter) { + public ArrayElementConverter(GroupType repeatedType, + Schema elementSchema, GenericData model, + final ParentValueContainer setter) { Type elementType = repeatedType.getType(0); Preconditions.checkArgument( !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED), "Cannot convert list of optional elements to primitive array"); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); this.elementConverter = newConverter( - nonNullElementSchema, elementType, model, new ParentValueContainer() { + nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() { @Override public void add(Object value) { isSet = true; @@ -732,6 +832,14 @@ public void end() { } } + // Converter used to test whether a requested schema is a 2-level schema. + // This is used to convert the file's type assuming that the file uses + // 2-level lists and the result is checked to see if it matches the requested + // element type. This should always convert assuming 2-level lists because + // 2-level and 3-level can't be mixed. + private static final AvroSchemaConverter CONVERTER = + new AvroSchemaConverter(true); + /** * Returns whether the given type is the element type of a list or is a * synthetic group with one field that is the element type. This is @@ -753,13 +861,12 @@ static boolean isElementType(Type repeatedType, Schema elementSchema) { // synthetic wrapper. Must be a group with one optional or required field return true; } else if (elementSchema != null && - elementSchema.getType() == Schema.Type.RECORD && - elementSchema.getFields().size() == 1 && - elementSchema.getFields().get(0).name().equals( - repeatedType.asGroupType().getFieldName(0))) { - // The repeated type must be the element type because it matches the - // structure of the Avro element's schema. 
- return true; + elementSchema.getType() == Schema.Type.RECORD) { + Schema schemaFromRepeated = CONVERTER.convert(repeatedType.asGroupType()); + if (checkReaderWriterCompatibility(elementSchema, schemaFromRepeated) + .getType() == COMPATIBLE) { + return true; + } } return false; } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 6cfa8d1196..70b6525f60 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -18,20 +18,26 @@ */ package org.apache.parquet.avro; -import java.util.*; - +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.codehaus.jackson.node.NullNode; import org.apache.parquet.schema.ConversionPatterns; +import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.avro.JsonProperties.NULL_VALUE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT; import static org.apache.parquet.schema.OriginalType.*; @@ -58,6 +64,17 @@ public AvroSchemaConverter() { this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; } + /** + * Constructor used by {@link AvroRecordConverter#isElementType}, which always + * uses the 2-level list conversion. 
+ * + * @param assumeRepeatedIsListElement whether to assume 2-level lists + */ + AvroSchemaConverter(boolean assumeRepeatedIsListElement) { + this.assumeRepeatedIsListElement = assumeRepeatedIsListElement; + this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; + } + public AvroSchemaConverter(Configuration conf) { this.assumeRepeatedIsListElement = conf.getBoolean( ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT); @@ -113,26 +130,28 @@ private Type convertField(String fieldName, Schema schema) { return convertField(fieldName, schema, Type.Repetition.REQUIRED); } + @SuppressWarnings("deprecation") private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) { + Types.PrimitiveBuilder builder; Schema.Type type = schema.getType(); if (type.equals(Schema.Type.BOOLEAN)) { - return primitive(fieldName, BOOLEAN, repetition); + builder = Types.primitive(BOOLEAN, repetition); } else if (type.equals(Schema.Type.INT)) { - return primitive(fieldName, INT32, repetition); + builder = Types.primitive(INT32, repetition); } else if (type.equals(Schema.Type.LONG)) { - return primitive(fieldName, INT64, repetition); + builder = Types.primitive(INT64, repetition); } else if (type.equals(Schema.Type.FLOAT)) { - return primitive(fieldName, FLOAT, repetition); + builder = Types.primitive(FLOAT, repetition); } else if (type.equals(Schema.Type.DOUBLE)) { - return primitive(fieldName, DOUBLE, repetition); + builder = Types.primitive(DOUBLE, repetition); } else if (type.equals(Schema.Type.BYTES)) { - return primitive(fieldName, BINARY, repetition); + builder = Types.primitive(BINARY, repetition); } else if (type.equals(Schema.Type.STRING)) { - return primitive(fieldName, BINARY, repetition, UTF8); + builder = Types.primitive(BINARY, repetition).as(UTF8); } else if (type.equals(Schema.Type.RECORD)) { return new GroupType(repetition, fieldName, convertFields(schema.getFields())); } else if (type.equals(Schema.Type.ENUM)) { - return primitive(fieldName, BINARY, repetition, ENUM); + builder = Types.primitive(BINARY, repetition).as(ENUM); } else if (type.equals(Schema.Type.ARRAY)) { if (writeOldListStructure) { return ConversionPatterns.listType(repetition, fieldName, @@ -146,16 +165,36 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet // avro map key type is always string return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType); } else if (type.equals(Schema.Type.FIXED)) { - return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition, - schema.getFixedSize(), null); + builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .length(schema.getFixedSize()); } else if (type.equals(Schema.Type.UNION)) { return convertUnion(fieldName, schema, repetition); + } else { + throw new UnsupportedOperationException("Cannot convert Avro type " + type); } - throw new UnsupportedOperationException("Cannot convert Avro type " + type); + + // schema translation can only be done for known logical types because this + // creates an equivalence + LogicalType logicalType = schema.getLogicalType(); + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Decimal) { + builder = builder.as(DECIMAL) + .precision(((LogicalTypes.Decimal) logicalType).getPrecision()) + .scale(((LogicalTypes.Decimal) logicalType).getScale()); + + } else { + OriginalType annotation = convertLogicalType(logicalType); + if (annotation != null) { + builder.as(annotation); + } + } + } + + return builder.named(fieldName); } private Type 
convertUnion(String fieldName, Schema schema, Type.Repetition repetition) { - List nonNullSchemas = new ArrayList(schema.getTypes().size()); + List nonNullSchemas = new ArrayList(schema.getTypes().size()); for (Schema childSchema : schema.getTypes()) { if (childSchema.getType().equals(Schema.Type.NULL)) { if (Type.Repetition.REQUIRED == repetition) { @@ -175,7 +214,7 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet return convertField(fieldName, nonNullSchemas.get(0), repetition); default: // complex union type - List unionTypes = new ArrayList(nonNullSchemas.size()); + List unionTypes = new ArrayList(nonNullSchemas.size()); int index = 0; for (Schema childSchema : nonNullSchemas) { unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL)); @@ -188,25 +227,11 @@ private Type convertField(Schema.Field field) { return convertField(field.name(), field.schema()); } - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition, - int typeLength, OriginalType originalType) { - return new PrimitiveType(repetition, primitive, typeLength, name, - originalType); - } - - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition, - OriginalType originalType) { - return new PrimitiveType(repetition, primitive, name, originalType); - } - - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) { - return new PrimitiveType(repetition, primitive, name, null); + public Schema convert(MessageType parquetSchema) { + return convertFields(parquetSchema.getName(), parquetSchema.getFields()); } - public Schema convert(MessageType parquetSchema) { + Schema convert(GroupType parquetSchema) { return convertFields(parquetSchema.getName(), parquetSchema.getFields()); } @@ -217,10 +242,11 @@ private Schema convertFields(String name, List parquetFields) { if (parquetType.isRepetition(REPEATED)) { throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. 
Type: " + parquetType); } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) { - fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null, - NullNode.getInstance())); + fields.add(new Schema.Field( + parquetType.getName(), optional(fieldSchema), null, NULL_VALUE)); } else { // REQUIRED - fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null)); + fields.add(new Schema.Field( + parquetType.getName(), fieldSchema, null, (Object) null)); } } Schema schema = Schema.createRecord(name, null, null, false); @@ -230,10 +256,11 @@ private Schema convertFields(String name, List parquetFields) { private Schema convertField(final Type parquetType) { if (parquetType.isPrimitive()) { + final PrimitiveType asPrimitive = parquetType.asPrimitiveType(); final PrimitiveTypeName parquetPrimitiveTypeName = - parquetType.asPrimitiveType().getPrimitiveTypeName(); - final OriginalType originalType = parquetType.getOriginalType(); - return parquetPrimitiveTypeName.convert( + asPrimitive.getPrimitiveTypeName(); + final OriginalType annotation = parquetType.getOriginalType(); + Schema schema = parquetPrimitiveTypeName.convert( new PrimitiveType.PrimitiveTypeNameConverter() { @Override public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { @@ -266,13 +293,24 @@ public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) { } @Override public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) { - if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) { + if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) { return Schema.create(Schema.Type.STRING); } else { return Schema.create(Schema.Type.BYTES); } } }); + + LogicalType logicalType = convertOriginalType( + annotation, asPrimitive.getDecimalMetadata()); + if (logicalType != null && (annotation != DECIMAL || + parquetPrimitiveTypeName == BINARY || + parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) { + schema = logicalType.addToSchema(schema); + } + + return schema; + } else { GroupType parquetGroupType = parquetType.asGroupType(); OriginalType originalType = parquetGroupType.getOriginalType(); @@ -335,6 +373,46 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) { } } + private OriginalType convertLogicalType(LogicalType logicalType) { + if (logicalType == null) { + return null; + } else if (logicalType instanceof LogicalTypes.Decimal) { + return OriginalType.DECIMAL; + } else if (logicalType instanceof LogicalTypes.Date) { + return OriginalType.DATE; + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + return OriginalType.TIME_MILLIS; + } else if (logicalType instanceof LogicalTypes.TimeMicros) { + return OriginalType.TIME_MICROS; + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + return OriginalType.TIMESTAMP_MILLIS; + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + return OriginalType.TIMESTAMP_MICROS; + } + return null; + } + + private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) { + if (annotation == null) { + return null; + } + switch (annotation) { + case DECIMAL: + return LogicalTypes.decimal(meta.getPrecision(), meta.getScale()); + case DATE: + return LogicalTypes.date(); + case TIME_MILLIS: + return LogicalTypes.timeMillis(); + case TIME_MICROS: + return LogicalTypes.timeMicros(); + case TIMESTAMP_MILLIS: + return LogicalTypes.timestampMillis(); + case TIMESTAMP_MICROS: + return LogicalTypes.timestampMicros(); + } + return null; + 
} + /** * Implements the rules for interpreting existing data from the logical type * spec for the LIST annotation. This is used to produce the expected schema. diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index c75bb032f6..460565bb01 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericFixed; @@ -69,6 +71,8 @@ public static void setAvroDataSupplier( private RecordConsumer recordConsumer; private MessageType rootSchema; private Schema rootAvroSchema; + private LogicalType rootLogicalType; + private Conversion rootConversion; private GenericData model; private ListWriter listWriter; @@ -82,6 +86,7 @@ public AvroWriteSupport() { public AvroWriteSupport(MessageType schema, Schema avroSchema) { this.rootSchema = schema; this.rootAvroSchema = avroSchema; + this.rootLogicalType = rootAvroSchema.getLogicalType(); this.model = null; } @@ -89,6 +94,7 @@ public AvroWriteSupport(MessageType schema, Schema avroSchema, GenericData model) { this.rootSchema = schema; this.rootAvroSchema = avroSchema; + this.rootLogicalType = rootAvroSchema.getLogicalType(); this.model = model; } @@ -136,16 +142,25 @@ public void prepareForWrite(RecordConsumer recordConsumer) { // overloaded version for backward compatibility @SuppressWarnings("unchecked") public void write(IndexedRecord record) { - recordConsumer.startMessage(); - writeRecordFields(rootSchema, rootAvroSchema, record); - recordConsumer.endMessage(); + write((T) record); } @Override public void write(T record) { - recordConsumer.startMessage(); - writeRecordFields(rootSchema, rootAvroSchema, record); - recordConsumer.endMessage(); + if (rootLogicalType != null) { + Conversion conversion = model.getConversionByClass( + record.getClass(), rootLogicalType); + + recordConsumer.startMessage(); + writeRecordFields(rootSchema, rootAvroSchema, + convert(rootAvroSchema, rootLogicalType, conversion, record)); + recordConsumer.endMessage(); + + } else { + recordConsumer.startMessage(); + writeRecordFields(rootSchema, rootAvroSchema, record); + recordConsumer.endMessage(); + } } private void writeRecord(GroupType schema, Schema avroSchema, @@ -226,6 +241,8 @@ private void writeUnion(GroupType parquetSchema, Schema avroSchema, } } + // TODO: what if the value is null? + // Sparsely populated method of encoding unions, each member has its own // set of columns. String memberName = "member" + parquetIndex; @@ -237,44 +254,108 @@ private void writeUnion(GroupType parquetSchema, Schema avroSchema, recordConsumer.endGroup(); } - @SuppressWarnings("unchecked") + /** + * Calls an appropriate write method based on the value. + * Value MUST not be null. 
+ * + * @param type the Parquet type + * @param avroSchema the Avro schema + * @param value a non-null value to write + */ private void writeValue(Type type, Schema avroSchema, Object value) { Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema); - Schema.Type avroType = nonNullAvroSchema.getType(); - if (avroType.equals(Schema.Type.BOOLEAN)) { - recordConsumer.addBoolean((Boolean) value); - } else if (avroType.equals(Schema.Type.INT)) { - if (value instanceof Character) { - recordConsumer.addInteger((Character) value); - } else { - recordConsumer.addInteger(((Number) value).intValue()); - } - } else if (avroType.equals(Schema.Type.LONG)) { - recordConsumer.addLong(((Number) value).longValue()); - } else if (avroType.equals(Schema.Type.FLOAT)) { - recordConsumer.addFloat(((Number) value).floatValue()); - } else if (avroType.equals(Schema.Type.DOUBLE)) { - recordConsumer.addDouble(((Number) value).doubleValue()); - } else if (avroType.equals(Schema.Type.BYTES)) { - if (value instanceof byte[]) { - recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value)); - } else { - recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value)); - } - } else if (avroType.equals(Schema.Type.STRING)) { - recordConsumer.addBinary(fromAvroString(value)); - } else if (avroType.equals(Schema.Type.RECORD)) { - writeRecord(type.asGroupType(), nonNullAvroSchema, value); - } else if (avroType.equals(Schema.Type.ENUM)) { - recordConsumer.addBinary(Binary.fromString(value.toString())); - } else if (avroType.equals(Schema.Type.ARRAY)) { - listWriter.writeList(type.asGroupType(), nonNullAvroSchema, value); - } else if (avroType.equals(Schema.Type.MAP)) { - writeMap(type.asGroupType(), nonNullAvroSchema, (Map) value); - } else if (avroType.equals(Schema.Type.UNION)) { - writeUnion(type.asGroupType(), nonNullAvroSchema, value); - } else if (avroType.equals(Schema.Type.FIXED)) { - recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes())); + LogicalType logicalType = nonNullAvroSchema.getLogicalType(); + if (logicalType != null) { + Conversion conversion = model.getConversionByClass( + value.getClass(), logicalType); + writeValueWithoutConversion(type, nonNullAvroSchema, + convert(nonNullAvroSchema, logicalType, conversion, value)); + } else { + writeValueWithoutConversion(type, nonNullAvroSchema, value); + } + } + + private Object convert(Schema schema, LogicalType logicalType, + Conversion conversion, Object datum) { + if (conversion == null) { + return datum; + } + Class fromClass = conversion.getConvertedType(); + switch (schema.getType()) { + case RECORD: return conversion.toRecord(fromClass.cast(datum), schema, logicalType); + case ENUM: return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType); + case ARRAY: return conversion.toArray(fromClass.cast(datum), schema, logicalType); + case MAP: return conversion.toMap(fromClass.cast(datum), schema, logicalType); + case FIXED: return conversion.toFixed(fromClass.cast(datum), schema, logicalType); + case STRING: return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType); + case BYTES: return conversion.toBytes(fromClass.cast(datum), schema, logicalType); + case INT: return conversion.toInt(fromClass.cast(datum), schema, logicalType); + case LONG: return conversion.toLong(fromClass.cast(datum), schema, logicalType); + case FLOAT: return conversion.toFloat(fromClass.cast(datum), schema, logicalType); + case DOUBLE: return conversion.toDouble(fromClass.cast(datum), schema, 
logicalType); + case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType); + } + return datum; + } + + /** + * Calls an appropriate write method based on the value. + * Value must not be null and the schema must not be nullable. + * + * @param type a Parquet type + * @param avroSchema a non-nullable Avro schema + * @param value a non-null value to write + */ + @SuppressWarnings("unchecked") + private void writeValueWithoutConversion(Type type, Schema avroSchema, Object value) { + switch (avroSchema.getType()) { + case BOOLEAN: + recordConsumer.addBoolean((Boolean) value); + break; + case INT: + if (value instanceof Character) { + recordConsumer.addInteger((Character) value); + } else { + recordConsumer.addInteger(((Number) value).intValue()); + } + break; + case LONG: + recordConsumer.addLong(((Number) value).longValue()); + break; + case FLOAT: + recordConsumer.addFloat(((Number) value).floatValue()); + break; + case DOUBLE: + recordConsumer.addDouble(((Number) value).doubleValue()); + break; + case FIXED: + recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes())); + break; + case BYTES: + if (value instanceof byte[]) { + recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value)); + } else { + recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value)); + } + break; + case STRING: + recordConsumer.addBinary(fromAvroString(value)); + break; + case RECORD: + writeRecord(type.asGroupType(), avroSchema, value); + break; + case ENUM: + recordConsumer.addBinary(Binary.fromString(value.toString())); + break; + case ARRAY: + listWriter.writeList(type.asGroupType(), avroSchema, value); + break; + case MAP: + writeMap(type.asGroupType(), avroSchema, (Map) value); + break; + case UNION: + writeUnion(type.asGroupType(), avroSchema, value); + break; } } @@ -283,7 +364,7 @@ private Binary fromAvroString(Object value) { Utf8 utf8 = (Utf8) value; return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength()); } - return Binary.fromString(value.toString()); + return Binary.fromCharSequence((CharSequence) value); } private static GenericData getDataModel(Configuration conf) { diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java index 67b710dbb7..f36f5fc96d 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java @@ -18,6 +18,16 @@ */ package org.apache.parquet.avro; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericEnumSymbol; +import org.apache.avro.generic.IndexedRecord; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; + abstract class ParentValueContainer { /** @@ -60,4 +70,169 @@ public void addDouble(double value) { add(value); } + static class LogicalTypePrimitiveContainer extends ParentValueContainer { + private final ParentValueContainer wrapped; + private final Schema schema; + private final LogicalType logicalType; + private final Conversion conversion; + + public LogicalTypePrimitiveContainer(ParentValueContainer wrapped, + Schema schema, Conversion conversion) { + this.wrapped = wrapped; + this.schema = schema; + this.logicalType = schema.getLogicalType(); + this.conversion = conversion; + } + + @Override + 
public void addDouble(double value) { + wrapped.add(conversion.fromDouble(value, schema, logicalType)); + } + + @Override + public void addFloat(float value) { + wrapped.add(conversion.fromFloat(value, schema, logicalType)); + } + + @Override + public void addLong(long value) { + wrapped.add(conversion.fromLong(value, schema, logicalType)); + } + + @Override + public void addInt(int value) { + wrapped.add(conversion.fromInt(value, schema, logicalType)); + } + + @Override + public void addShort(short value) { + wrapped.add(conversion.fromInt((int) value, schema, logicalType)); + } + + @Override + public void addChar(char value) { + wrapped.add(conversion.fromInt((int) value, schema, logicalType)); + } + + @Override + public void addByte(byte value) { + wrapped.add(conversion.fromInt((int) value, schema, logicalType)); + } + + @Override + public void addBoolean(boolean value) { + wrapped.add(conversion.fromBoolean(value, schema, logicalType)); + } + } + + static ParentValueContainer getConversionContainer( + final ParentValueContainer parent, final Conversion conversion, + final Schema schema) { + if (conversion == null) { + return parent; + } + + final LogicalType logicalType = schema.getLogicalType(); + + switch (schema.getType()) { + case STRING: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromCharSequence( + (CharSequence) value, schema, logicalType)); + } + }; + case BOOLEAN: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromBoolean( + (Boolean) value, schema, logicalType)); + } + }; + case INT: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromInt( + (Integer) value, schema, logicalType)); + } + }; + case LONG: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromLong( + (Long) value, schema, logicalType)); + } + }; + case FLOAT: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromFloat( + (Float) value, schema, logicalType)); + } + }; + case DOUBLE: + return new LogicalTypePrimitiveContainer(parent, schema, conversion) { + @Override + public void add(Object value) { + parent.add(conversion.fromDouble( + (Double) value, schema, logicalType)); + } + }; + case BYTES: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromBytes( + (ByteBuffer) value, schema, logicalType)); + } + }; + case FIXED: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromFixed( + (GenericData.Fixed) value, schema, logicalType)); + } + }; + case RECORD: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromRecord( + (IndexedRecord) value, schema, logicalType)); + } + }; + case ARRAY: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromArray( + (Collection) value, schema, logicalType)); + } + }; + case MAP: + return new ParentValueContainer() { + @Override + public void add(Object value) { + parent.add(conversion.fromMap( + (Map) value, schema, logicalType)); + } + }; + case ENUM: + return new ParentValueContainer() { + @Override + public void 
add(Object value) { + parent.add(conversion.fromEnumSymbol( + (GenericEnumSymbol) value, schema, logicalType)); + } + }; + default: + return new LogicalTypePrimitiveContainer(parent, schema, conversion); + } + } } diff --git a/parquet-avro/src/main/resources/META-INF/LICENSE b/parquet-avro/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..20b23c9604 --- /dev/null +++ b/parquet-avro/src/main/resources/META-INF/LICENSE @@ -0,0 +1,186 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + +-------------------------------------------------------------------------------- + +This product includes code from Apache Avro. + +Copyright: 2014 The Apache Software Foundation. 
+Home page: https://avro.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + diff --git a/parquet-avro/src/main/resources/META-INF/NOTICE b/parquet-avro/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..7b5682c703 --- /dev/null +++ b/parquet-avro/src/main/resources/META-INF/NOTICE @@ -0,0 +1,18 @@ + +Apache Parquet MR (Incubating) +Copyright 2014-2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +-------------------------------------------------------------------------------- + +This product includes code from Apache Avro, which includes the following in +its NOTICE file: + + Apache Avro + Copyright 2010-2015 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java index d5fe11adcc..f4682d6f21 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java @@ -19,11 +19,21 @@ package org.apache.parquet.avro; import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; import org.codehaus.jackson.node.NullNode; +import org.junit.Assert; +import org.junit.rules.TemporaryFolder; public class AvroTestUtil { @@ -66,4 +76,47 @@ public static GenericRecord instance(Schema schema, Object... pairs) { return record; } + public static <D> List<D> read(GenericData model, Schema schema, File file) throws IOException { + List<D> data = new ArrayList<D>(); + Configuration conf = new Configuration(false); + AvroReadSupport.setRequestedProjection(conf, schema); + AvroReadSupport.setAvroReadSchema(conf, schema); + ParquetReader<D> fileReader = AvroParquetReader + .<D>builder(new Path(file.toString())) + .withDataModel(model) // reflect disables compatibility + .withConf(conf) + .build(); + + try { + D datum; + while ((datum = fileReader.read()) != null) { + data.add(datum); + } + } finally { + fileReader.close(); + } + + return data; + } + + @SuppressWarnings("unchecked") + public static <D> File write(TemporaryFolder temp, GenericData model, Schema schema, D...
data) throws IOException { + File file = temp.newFile(); + Assert.assertTrue(file.delete()); + ParquetWriter writer = AvroParquetWriter + .builder(new Path(file.toString())) + .withDataModel(model) + .withSchema(schema) + .build(); + + try { + for (D datum : data) { + writer.write(datum); + } + } finally { + writer.close(); + } + + return file; + } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java index 9c29e5030f..aa577ab579 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java @@ -18,28 +18,23 @@ */ package org.apache.parquet.avro; -import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.schema.MessageType; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.DirectWriterTest; import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; import static org.apache.parquet.avro.AvroTestUtil.array; import static org.apache.parquet.avro.AvroTestUtil.field; @@ -49,15 +44,15 @@ import static org.apache.parquet.avro.AvroTestUtil.primitive; import static org.apache.parquet.avro.AvroTestUtil.record; -public class TestArrayCompatibility { - - @Rule - public final TemporaryFolder tempDir = new TemporaryFolder(); +public class TestArrayCompatibility extends DirectWriterTest { + public static final Configuration OLD_BEHAVIOR_CONF = new Configuration(); public static final Configuration NEW_BEHAVIOR_CONF = new Configuration(); @BeforeClass public static void setupNewBehaviorConfiguration() { + OLD_BEHAVIOR_CONF.setBoolean( + AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, true); NEW_BEHAVIOR_CONF.setBoolean( AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false); } @@ -1045,71 +1040,143 @@ public void write(RecordConsumer rc) { assertReaderContains(newBehaviorReader(test), newSchema, newRecord); } - private interface DirectWriter { - public void write(RecordConsumer consumer); - } + @Test + public void testListOfSingleElementStructsWithElementField() + throws Exception { + Path test = writeDirect( + "message ListOfSingleElementStructsWithElementField {" + + " optional group list_of_structs (LIST) {" + + " repeated group list {" + + " required group element {" + + " required float element;" + + " }" + + " }" + + " }" + + "}", + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("list_of_structs", 0); - private static class DirectWriteSupport extends WriteSupport { - private RecordConsumer recordConsumer; - private final MessageType type; - private final DirectWriter writer; - private final Map metadata; - - private DirectWriteSupport(MessageType type, DirectWriter writer, - Map metadata) { - this.type = type; - 
this.writer = writer; - this.metadata = metadata; - } + rc.startGroup(); + rc.startField("list", 0); // start writing array contents - @Override - public WriteContext init(Configuration configuration) { - return new WriteContext(type, metadata); - } + // write a non-null element + rc.startGroup(); // array level + rc.startField("element", 0); - @Override - public void prepareForWrite(RecordConsumer recordConsumer) { - this.recordConsumer = recordConsumer; - } + // the inner element field + rc.startGroup(); + rc.startField("element", 0); + rc.addFloat(33.0F); + rc.endField("element", 0); + rc.endGroup(); - @Override - public void write(Void record) { - writer.write(recordConsumer); - } - } + rc.endField("element", 0); + rc.endGroup(); // array level - private Path writeDirect(String type, DirectWriter writer) throws IOException { - return writeDirect(MessageTypeParser.parseMessageType(type), writer); - } + // write a second non-null element + rc.startGroup(); // array level + rc.startField("element", 0); - private Path writeDirect(String type, DirectWriter writer, - Map metadata) throws IOException { - return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata); - } + // the inner element field + rc.startGroup(); + rc.startField("element", 0); + rc.addFloat(34.0F); + rc.endField("element", 0); + rc.endGroup(); - private Path writeDirect(MessageType type, DirectWriter writer) throws IOException { - return writeDirect(type, writer, new HashMap()); - } + rc.endField("element", 0); + rc.endGroup(); // array level - private Path writeDirect(MessageType type, DirectWriter writer, - Map metadata) throws IOException { - File temp = tempDir.newFile(UUID.randomUUID().toString()); - temp.deleteOnExit(); - temp.delete(); + rc.endField("list", 0); // finished writing array contents + rc.endGroup(); - Path path = new Path(temp.getPath()); + rc.endField("list_of_structs", 0); + rc.endMessage(); + } + }); - ParquetWriter parquetWriter = new ParquetWriter( - path, new DirectWriteSupport(type, writer, metadata)); - parquetWriter.write(null); - parquetWriter.close(); + Schema structWithElementField = record("element", + field("element", primitive(Schema.Type.FLOAT))); - return path; + // old behavior - assume that the repeated type is the element type + Schema elementRecord = record("list", + field("element", structWithElementField)); + Schema oldSchema = record("ListOfSingleElementStructsWithElementField", + optionalField("list_of_structs", array(elementRecord))); + GenericRecord oldRecord = instance(oldSchema, + "list_of_structs", Arrays.asList( + instance(elementRecord, "element", + instance(structWithElementField, "element", 33.0F)), + instance(elementRecord, "element", + instance(structWithElementField, "element", 34.0F)))); + + // check the schema + ParquetFileReader reader = ParquetFileReader + .open(new Configuration(), test); + MessageType fileSchema = reader.getFileMetaData().getSchema(); + Assert.assertEquals("Converted schema should assume 2-layer structure", + oldSchema, + new AvroSchemaConverter(OLD_BEHAVIOR_CONF).convert(fileSchema)); + + // both should default to the 2-layer structure + assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord); + + Schema newSchema = record("ListOfSingleElementStructsWithElementField", + optionalField("list_of_structs", array(structWithElementField))); + GenericRecord newRecord = instance(newSchema, + "list_of_structs", Arrays.asList( + instance(structWithElementField, "element", 33.0F), + instance(structWithElementField, 
"element", 34.0F))); + + // check the schema + Assert.assertEquals("Converted schema should assume 3-layer structure", + newSchema, + new AvroSchemaConverter(NEW_BEHAVIOR_CONF).convert(fileSchema)); + assertReaderContains(newBehaviorReader(test), newSchema, newRecord); + + // check that this works with compatible nested schemas + + Schema structWithDoubleElementField = record("element", + field("element", primitive(Schema.Type.DOUBLE))); + + Schema doubleElementRecord = record("list", + field("element", structWithDoubleElementField)); + Schema oldDoubleSchema = record( + "ListOfSingleElementStructsWithElementField", + optionalField("list_of_structs", array(doubleElementRecord))); + GenericRecord oldDoubleRecord = instance(oldDoubleSchema, + "list_of_structs", Arrays.asList( + instance(doubleElementRecord, "element", + instance(structWithDoubleElementField, "element", 33.0)), + instance(doubleElementRecord, "element", + instance(structWithDoubleElementField, "element", 34.0)))); + assertReaderContains(oldBehaviorReader(test, oldDoubleSchema), + oldDoubleSchema, oldDoubleRecord); + + Schema newDoubleSchema = record( + "ListOfSingleElementStructsWithElementField", + optionalField("list_of_structs", array(structWithDoubleElementField))); + GenericRecord newDoubleRecord = instance(newDoubleSchema, + "list_of_structs", Arrays.asList( + instance(structWithDoubleElementField, "element", 33.0), + instance(structWithDoubleElementField, "element", 34.0))); + assertReaderContains(newBehaviorReader(test, newDoubleSchema), + newDoubleSchema, newDoubleRecord); } public AvroParquetReader oldBehaviorReader( Path path) throws IOException { - return new AvroParquetReader(path); + return new AvroParquetReader(OLD_BEHAVIOR_CONF, path); + } + + public AvroParquetReader oldBehaviorReader( + Path path, Schema expectedSchema) throws IOException { + Configuration conf = new Configuration(OLD_BEHAVIOR_CONF); + AvroReadSupport.setAvroReadSchema(conf, expectedSchema); + return new AvroParquetReader(conf, path); } public AvroParquetReader newBehaviorReader( @@ -1117,6 +1184,13 @@ public AvroParquetReader newBehaviorReader( return new AvroParquetReader(NEW_BEHAVIOR_CONF, path); } + public AvroParquetReader newBehaviorReader( + Path path, Schema expectedSchema) throws IOException { + Configuration conf = new Configuration(NEW_BEHAVIOR_CONF); + AvroReadSupport.setAvroReadSchema(conf, expectedSchema); + return new AvroParquetReader(conf, path); + } + public void assertReaderContains( AvroParquetReader reader, Schema expectedSchema, T... 
expectedRecords) throws IOException { diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index b393615ec4..942e3b1378 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -20,16 +20,37 @@ import com.google.common.collect.Lists; import com.google.common.io.Resources; -import java.util.Arrays; -import java.util.Collections; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; import org.codehaus.jackson.node.NullNode; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; +import java.util.Arrays; +import java.util.Collections; +import static org.apache.avro.Schema.Type.INT; +import static org.apache.avro.Schema.Type.LONG; +import static org.apache.parquet.schema.OriginalType.DATE; +import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; +import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS; +import static org.apache.parquet.schema.OriginalType.TIME_MICROS; +import static org.apache.parquet.schema.OriginalType.TIME_MILLIS; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; public class TestAvroSchemaConverter { @@ -131,7 +152,7 @@ private void testRoundTripConversion( @Test(expected = IllegalArgumentException.class) public void testTopLevelMustBeARecord() { - new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT)); + new AvroSchemaConverter().convert(Schema.create(INT)); } @Test @@ -270,7 +291,7 @@ public void testParquetMapWithNonStringKeyFails() throws Exception { @Test public void testOptionalFields() throws Exception { Schema schema = Schema.createRecord("record1", null, null, false); - Schema optionalInt = optional(Schema.create(Schema.Type.INT)); + Schema optionalInt = optional(Schema.create(INT)); schema.setFields(Arrays.asList( new Schema.Field("myint", optionalInt, null, NullNode.getInstance()) )); @@ -284,7 +305,7 @@ public void testOptionalFields() throws Exception { @Test public void testOptionalMapValue() throws Exception { Schema schema = Schema.createRecord("record1", null, null, false); - Schema optionalIntMap = Schema.createMap(optional(Schema.create(Schema.Type.INT))); + Schema optionalIntMap = Schema.createMap(optional(Schema.create(INT))); 
schema.setFields(Arrays.asList( new Schema.Field("myintmap", optionalIntMap, null, null) )); @@ -303,7 +324,7 @@ public void testOptionalMapValue() throws Exception { @Test public void testOptionalArrayElement() throws Exception { Schema schema = Schema.createRecord("record1", null, null, false); - Schema optionalIntArray = Schema.createArray(optional(Schema.create(Schema.Type.INT))); + Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT))); schema.setFields(Arrays.asList( new Schema.Field("myintarray", optionalIntArray, null, null) )); @@ -323,7 +344,7 @@ public void testUnionOfTwoTypes() throws Exception { Schema schema = Schema.createRecord("record2", null, null, false); Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type .NULL), - Schema.create(Schema.Type.INT), + Schema.create(INT), Schema.create(Schema.Type.FLOAT))); schema.setFields(Arrays.asList( new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance()))); @@ -396,7 +417,7 @@ public void testArrayOfOptionalRecordsOldBehavior() throws Exception { @Test public void testOldAvroListOfLists() throws Exception { Schema listOfLists = optional(Schema.createArray(Schema.createArray( - Schema.create(Schema.Type.INT)))); + Schema.create(INT)))); Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false); schema.setFields(Lists.newArrayList( new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) @@ -425,7 +446,7 @@ public void testOldAvroListOfLists() throws Exception { @Test public void testOldThriftListOfLists() throws Exception { Schema listOfLists = optional(Schema.createArray(Schema.createArray( - Schema.create(Schema.Type.INT)))); + Schema.create(INT)))); Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false); schema.setFields(Lists.newArrayList( new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) @@ -458,7 +479,7 @@ public void testUnknownTwoLevelListOfLists() throws Exception { // group's name, but it must be 2-level because the repeated group doesn't // contain an optional or repeated element as required for 3-level lists Schema listOfLists = optional(Schema.createArray(Schema.createArray( - Schema.create(Schema.Type.INT)))); + Schema.create(INT)))); Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false); schema.setFields(Lists.newArrayList( new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) @@ -488,7 +509,7 @@ public void testUnknownTwoLevelListOfLists() throws Exception { @Test public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception { Schema schema = Schema.createRecord("myrecord", null, null, false); - Schema map = Schema.createMap(Schema.create(Schema.Type.INT)); + Schema map = Schema.createMap(Schema.create(INT)); schema.setFields(Collections.singletonList(new Schema.Field("mymap", map, null, null))); String parquetSchema = "message myrecord {\n" + @@ -504,9 +525,240 @@ public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception { testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema); } + @Test + public void testDecimalBytesType() throws Exception { + Schema schema = Schema.createRecord("myrecord", null, null, false); + Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( + Schema.create(Schema.Type.BYTES)); + schema.setFields(Collections.singletonList( + new Schema.Field("dec", decimal, null, null))); + + testRoundTripConversion(schema, + "message myrecord {\n" + + " required 
binary dec (DECIMAL(9,2));\n" + + "}\n"); + } + + @Test + public void testDecimalFixedType() throws Exception { + Schema schema = Schema.createRecord("myrecord", null, null, false); + Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( + Schema.createFixed("dec", null, null, 8)); + schema.setFields(Collections.singletonList( + new Schema.Field("dec", decimal, null, null))); + + testRoundTripConversion(schema, + "message myrecord {\n" + + " required fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" + + "}\n"); + } + + @Test + public void testDecimalIntegerType() throws Exception { + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field( + "dec", Schema.create(INT), null, null))); + + // the decimal portion is lost because it isn't valid in Avro + testParquetToAvroConversion(expected, + "message myrecord {\n" + + " required int32 dec (DECIMAL(9,2));\n" + + "}\n"); + } + + @Test + public void testDecimalLongType() throws Exception { + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("dec", Schema.create(LONG), null, null))); + + // the decimal portion is lost because it isn't valid in Avro + testParquetToAvroConversion(expected, + "message myrecord {\n" + + " required int64 dec (DECIMAL(9,2));\n" + + "}\n"); + } + + @Test + public void testDateType() throws Exception { + Schema date = LogicalTypes.date().addToSchema(Schema.create(INT)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("date", date, null, null))); + + testRoundTripConversion(expected, + "message myrecord {\n" + + " required int32 date (DATE);\n" + + "}\n"); + + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] + {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { + final PrimitiveType type; + if (primitive == FIXED_LEN_BYTE_ARRAY) { + type = new PrimitiveType(REQUIRED, primitive, 12, "test", DATE); + } else { + type = new PrimitiveType(REQUIRED, primitive, "test", DATE); + } + + assertThrows("Should not allow TIME_MICROS with " + primitive, + IllegalArgumentException.class, new Runnable() { + @Override + public void run() { + new AvroSchemaConverter().convert(message(type)); + } + }); + } + } + + @Test + public void testTimeMillisType() throws Exception { + Schema date = LogicalTypes.timeMillis().addToSchema(Schema.create(INT)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("time", date, null, null))); + + testRoundTripConversion(expected, + "message myrecord {\n" + + " required int32 time (TIME_MILLIS);\n" + + "}\n"); + + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] + {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { + final PrimitiveType type; + if (primitive == FIXED_LEN_BYTE_ARRAY) { + type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MILLIS); + } else { + type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MILLIS); + } + + assertThrows("Should not allow TIME_MICROS with " + primitive, + IllegalArgumentException.class, new Runnable() { + @Override + public void run() { + new AvroSchemaConverter().convert(message(type)); + } + }); + } + } + + @Test + public void testTimeMicrosType() throws Exception { + Schema date = LogicalTypes.timeMicros().addToSchema(Schema.create(LONG)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("time", date, null, null))); + + 
testRoundTripConversion(expected, + "message myrecord {\n" + + " required int64 time (TIME_MICROS);\n" + + "}\n"); + + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] + {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { + final PrimitiveType type; + if (primitive == FIXED_LEN_BYTE_ARRAY) { + type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MICROS); + } else { + type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MICROS); + } + + assertThrows("Should not allow TIME_MICROS with " + primitive, + IllegalArgumentException.class, new Runnable() { + @Override + public void run() { + new AvroSchemaConverter().convert(message(type)); + } + }); + } + } + + @Test + public void testTimestampMillisType() throws Exception { + Schema date = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("timestamp", date, null, null))); + + testRoundTripConversion(expected, + "message myrecord {\n" + + " required int64 timestamp (TIMESTAMP_MILLIS);\n" + + "}\n"); + + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] + {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { + final PrimitiveType type; + if (primitive == FIXED_LEN_BYTE_ARRAY) { + type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS); + } else { + type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS); + } + + assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive, + IllegalArgumentException.class, new Runnable() { + @Override + public void run() { + new AvroSchemaConverter().convert(message(type)); + } + }); + } + } + + @Test + public void testTimestampMicrosType() throws Exception { + Schema date = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("timestamp", date, null, null))); + + testRoundTripConversion(expected, + "message myrecord {\n" + + " required int64 timestamp (TIMESTAMP_MICROS);\n" + + "}\n"); + + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] + {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { + final PrimitiveType type; + if (primitive == FIXED_LEN_BYTE_ARRAY) { + type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS); + } else { + type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS); + } + + assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive, + IllegalArgumentException.class, new Runnable() { + @Override + public void run() { + new AvroSchemaConverter().convert(message(type)); + } + }); + } + } + public static Schema optional(Schema original) { return Schema.createUnion(Lists.newArrayList( Schema.create(Schema.Type.NULL), original)); } + + public static MessageType message(PrimitiveType primitive) { + return Types.buildMessage() + .addField(primitive) + .named("myrecord"); + } + + /** + * A convenience method to avoid a large number of @Test(expected=...) 
tests + * @param message A String message to describe this assertion + * @param expected An Exception class that the Runnable should throw + * @param runnable A Runnable that is expected to throw the exception + */ + public static void assertThrows( + String message, Class expected, Runnable runnable) { + try { + runnable.run(); + Assert.fail("No exception was thrown (" + message + "), expected: " + + expected.getName()); + } catch (Exception actual) { + try { + Assert.assertEquals(message, expected, actual.getClass()); + } catch (AssertionError e) { + e.addSuppressed(actual); + throw e; + } + } + } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java new file mode 100644 index 0000000000..d2f80edf91 --- /dev/null +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.avro; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.util.Utf8; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * This class is based on org.apache.avro.TestCircularReferences + * + * The main difference between this class and the Avro version is that this one + * uses a place-holder schema for the circular reference from Child to Parent. + * This avoids creating a schema for Parent that references itself and can't be + * converted to a Parquet schema. The place-holder schema must also have a + * referenceable logical type. 
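+ *
+ * In outline: the place-holder is a record with a single {@code long} id field
+ * carrying the "referenceable" logical type, and the Child's "parent" field is
+ * a union of null, long, and that place-holder. A Child is therefore written
+ * with its Parent's id, and the ReferenceManager conversions swap the id back
+ * for the resolved Parent record when the file is read.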
+ */ +public class TestCircularReferences { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + public static class Reference extends LogicalType { + private static final String REFERENCE = "reference"; + private static final String REF_FIELD_NAME = "ref-field-name"; + + private final String refFieldName; + + public Reference(String refFieldName) { + super(REFERENCE); + this.refFieldName = refFieldName; + } + + public Reference(Schema schema) { + super(REFERENCE); + this.refFieldName = schema.getProp(REF_FIELD_NAME); + } + + @Override + public Schema addToSchema(Schema schema) { + super.addToSchema(schema); + schema.addProp(REF_FIELD_NAME, refFieldName); + return schema; + } + + @Override + public String getName() { + return REFERENCE; + } + + public String getRefFieldName() { + return refFieldName; + } + + @Override + public void validate(Schema schema) { + super.validate(schema); + if (schema.getField(refFieldName) == null) { + throw new IllegalArgumentException("Invalid field name for reference field: " + refFieldName); + } + } + } + + public static class Referenceable extends LogicalType { + private static final String REFERENCEABLE = "referenceable"; + private static final String ID_FIELD_NAME = "id-field-name"; + + private final String idFieldName; + + public Referenceable(String idFieldName) { + super(REFERENCEABLE); + this.idFieldName = idFieldName; + } + + public Referenceable(Schema schema) { + super(REFERENCEABLE); + this.idFieldName = schema.getProp(ID_FIELD_NAME); + } + + @Override + public Schema addToSchema(Schema schema) { + super.addToSchema(schema); + schema.addProp(ID_FIELD_NAME, idFieldName); + return schema; + } + + @Override + public String getName() { + return REFERENCEABLE; + } + + public String getIdFieldName() { + return idFieldName; + } + + @Override + public void validate(Schema schema) { + super.validate(schema); + Schema.Field idField = schema.getField(idFieldName); + if (idField == null || idField.schema().getType() != Schema.Type.LONG) { + throw new IllegalArgumentException("Invalid ID field: " + idFieldName + ": " + idField); + } + } + } + + @BeforeClass + public static void addReferenceTypes() { + LogicalTypes.register(Referenceable.REFERENCEABLE, new LogicalTypes.LogicalTypeFactory() { + @Override + public LogicalType fromSchema(Schema schema) { + return new Referenceable(schema); + } + }); + LogicalTypes.register(Reference.REFERENCE, new LogicalTypes.LogicalTypeFactory() { + @Override + public LogicalType fromSchema(Schema schema) { + return new Reference(schema); + } + }); + } + + public static class ReferenceManager { + private interface Callback { + void set(Object referenceable); + } + + private final Map references = new HashMap(); + private final Map ids = new IdentityHashMap(); + private final Map> callbacksById = new HashMap>(); + private final ReferenceableTracker tracker = new ReferenceableTracker(); + private final ReferenceHandler handler = new ReferenceHandler(); + + public ReferenceableTracker getTracker() { + return tracker; + } + + public ReferenceHandler getHandler() { + return handler; + } + + public class ReferenceableTracker extends Conversion { + @Override + @SuppressWarnings("unchecked") + public Class getConvertedType() { + return (Class) Record.class; + } + + @Override + public String getLogicalTypeName() { + return Referenceable.REFERENCEABLE; + } + + @Override + public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) { + // read side + long id = getId(value, schema); + + // keep 
track of this for later references + references.put(id, value); + + // call any callbacks waiting to resolve this id + List callbacks = callbacksById.get(id); + for (Callback callback : callbacks) { + callback.set(value); + } + + return value; + } + + @Override + public IndexedRecord toRecord(IndexedRecord value, Schema schema, LogicalType type) { + // write side + long id = getId(value, schema); + + // keep track of this for later references + //references.put(id, value); + ids.put(value, id); + + return value; + } + + private long getId(IndexedRecord referenceable, Schema schema) { + Referenceable info = (Referenceable) schema.getLogicalType(); + int idField = schema.getField(info.getIdFieldName()).pos(); + return (Long) referenceable.get(idField); + } + } + + public class ReferenceHandler extends Conversion { + @Override + @SuppressWarnings("unchecked") + public Class getConvertedType() { + return (Class) Record.class; + } + + @Override + public String getLogicalTypeName() { + return Reference.REFERENCE; + } + + @Override + public IndexedRecord fromRecord(final IndexedRecord record, Schema schema, LogicalType type) { + // read side: resolve the record or save a callback + final Schema.Field refField = schema.getField(((Reference) type).getRefFieldName()); + + Long id = (Long) record.get(refField.pos()); + if (id != null) { + if (references.containsKey(id)) { + record.put(refField.pos(), references.get(id)); + + } else { + List callbacks = callbacksById.get(id); + if (callbacks == null) { + callbacks = new ArrayList(); + callbacksById.put(id, callbacks); + } + // add a callback to resolve this reference when the id is available + callbacks.add(new Callback() { + @Override + public void set(Object referenceable) { + record.put(refField.pos(), referenceable); + } + }); + } + } + + return record; + } + + @Override + public IndexedRecord toRecord(IndexedRecord record, Schema schema, LogicalType type) { + // write side: replace a referenced field with its id + Schema.Field refField = schema.getField(((Reference) type).getRefFieldName()); + IndexedRecord referenced = (IndexedRecord) record.get(refField.pos()); + if (referenced == null) { + return record; + } + + // hijack the field to return the id instead of the ref + return new HijackingIndexedRecord(record, refField.pos(), ids.get(referenced)); + } + } + + private static class HijackingIndexedRecord implements IndexedRecord { + private final IndexedRecord wrapped; + private final int index; + private final Object data; + + public HijackingIndexedRecord(IndexedRecord wrapped, int index, Object data) { + this.wrapped = wrapped; + this.index = index; + this.data = data; + } + + @Override + public void put(int i, Object v) { + throw new RuntimeException("[BUG] This is a read-only class."); + } + + @Override + public Object get(int i) { + if (i == index) { + return data; + } + return wrapped.get(i); + } + + @Override + public Schema getSchema() { + return wrapped.getSchema(); + } + } + } + + @Test + public void test() throws IOException { + ReferenceManager manager = new ReferenceManager(); + GenericData model = new GenericData(); + model.addLogicalTypeConversion(manager.getTracker()); + model.addLogicalTypeConversion(manager.getHandler()); + + Schema parentSchema = Schema.createRecord("Parent", null, null, false); + + Schema placeholderSchema = Schema.createRecord("Placeholder", null, null, false); + List placeholderFields = new ArrayList(); + placeholderFields.add( // at least one field is needed to be a valid schema + new Schema.Field("id", 
Schema.create(Schema.Type.LONG), null, null)); + placeholderSchema.setFields(placeholderFields); + + Referenceable idRef = new Referenceable("id"); + + Schema parentRefSchema = Schema.createUnion( + Schema.create(Schema.Type.NULL), + Schema.create(Schema.Type.LONG), + idRef.addToSchema(placeholderSchema)); + + Reference parentRef = new Reference("parent"); + + List childFields = new ArrayList(); + childFields.add(new Schema.Field("c", Schema.create(Schema.Type.STRING), null, null)); + childFields.add(new Schema.Field("parent", parentRefSchema, null, null)); + Schema childSchema = parentRef.addToSchema( + Schema.createRecord("Child", null, null, false, childFields)); + + List parentFields = new ArrayList(); + parentFields.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); + parentFields.add(new Schema.Field("p", Schema.create(Schema.Type.STRING), null, null)); + parentFields.add(new Schema.Field("child", childSchema, null, null)); + parentSchema.setFields(parentFields); + + Schema schema = idRef.addToSchema(parentSchema); + + System.out.println("Schema: " + schema.toString(true)); + + Record parent = new Record(schema); + parent.put("id", 1L); + parent.put("p", "parent data!"); + + Record child = new Record(childSchema); + child.put("c", "child data!"); + child.put("parent", parent); + + parent.put("child", child); + + // serialization round trip + File data = AvroTestUtil.write(temp, model, schema, parent); + List records = AvroTestUtil.read(model, schema, data); + + Record actual = records.get(0); + + // because the record is a recursive structure, equals won't work + Assert.assertEquals("Should correctly read back the parent id", + 1L, actual.get("id")); + Assert.assertEquals("Should correctly read back the parent data", + new Utf8("parent data!"), actual.get("p")); + + Record actualChild = (Record) actual.get("child"); + Assert.assertEquals("Should correctly read back the child data", + new Utf8("child data!"), actualChild.get("c")); + Object childParent = actualChild.get("parent"); + Assert.assertTrue("Should have a parent Record object", + childParent instanceof Record); + + Record childParentRecord = (Record) actualChild.get("parent"); + Assert.assertEquals("Should have the right parent id", + 1L, childParentRecord.get("id")); + Assert.assertEquals("Should have the right parent data", + new Utf8("parent data!"), childParentRecord.get("p")); + } +} diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java new file mode 100644 index 0000000000..6809fff3f3 --- /dev/null +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.avro.Conversion; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import static org.apache.avro.Schema.Type.STRING; +import static org.apache.parquet.avro.AvroTestUtil.field; +import static org.apache.parquet.avro.AvroTestUtil.instance; +import static org.apache.parquet.avro.AvroTestUtil.optionalField; +import static org.apache.parquet.avro.AvroTestUtil.read; +import static org.apache.parquet.avro.AvroTestUtil.record; + +/** + * This class is based on org.apache.avro.generic.TestGenericLogicalTypes + */ +public class TestGenericLogicalTypes { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + public static final GenericData GENERIC = new GenericData(); + public static final LogicalType DECIMAL_9_2 = LogicalTypes.decimal(9, 2); + public static final BigDecimal D1 = new BigDecimal("-34.34"); + public static final BigDecimal D2 = new BigDecimal("117230.00"); + + + @BeforeClass + public static void addDecimalAndUUID() { + GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion()); + GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion()); + } + + private List getFieldValues(Collection records, String field, + Class expectedClass) { + List values = new ArrayList(); + for (GenericRecord record : records) { + values.add(expectedClass.cast(record.get(field))); + } + return values; + } + + @Test + public void testReadUUID() throws IOException { + Schema uuidSchema = record("R", + field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)))); + GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID()); + GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID()); + + Schema stringSchema = record("R", field("uuid", Schema.create(STRING))); + GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString()); + GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString()); + + File test = write(stringSchema, s1, s2); + Assert.assertEquals("Should convert Strings to 
UUIDs", + Arrays.asList(u1, u2), read(GENERIC, uuidSchema, test)); + } + + @Test + public void testWriteUUIDReadStringSchema() throws IOException { + Schema uuidSchema = record("R", + field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)))); + GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID()); + GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID()); + + Schema stringUuidSchema = Schema.create(STRING); + stringUuidSchema.addProp(GenericData.STRING_PROP, "String"); + Schema stringSchema = record("R", field("uuid", stringUuidSchema)); + GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString()); + GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString()); + + File test = write(GENERIC, uuidSchema, u1, u2); + Assert.assertEquals("Should read UUIDs as Strings", + Arrays.asList(s1, s2), read(GENERIC, stringSchema, test)); + } + + @Test + public void testWriteUUIDReadStringMissingLogicalType() throws IOException { + Schema uuidSchema = record("R", + field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)))); + GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID()); + GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID()); + + GenericRecord s1 = instance(uuidSchema, "uuid", new Utf8(u1.get("uuid").toString())); + GenericRecord s2 = instance(uuidSchema, "uuid", new Utf8(u2.get("uuid").toString())); + + File test = write(GENERIC, uuidSchema, u1, u2); + Assert.assertEquals("Should read UUIDs as Strings", + Arrays.asList(s1, s2), read(GenericData.get(), uuidSchema, test)); + } + + @Test + public void testWriteNullableUUID() throws IOException { + Schema nullableUuidSchema = record("R", + optionalField("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)))); + GenericRecord u1 = instance(nullableUuidSchema, "uuid", UUID.randomUUID()); + GenericRecord u2 = instance(nullableUuidSchema, "uuid", UUID.randomUUID()); + + Schema stringUuidSchema = Schema.create(STRING); + stringUuidSchema.addProp(GenericData.STRING_PROP, "String"); + Schema nullableStringSchema = record("R", optionalField("uuid", stringUuidSchema)); + GenericRecord s1 = instance(nullableStringSchema, "uuid", u1.get("uuid").toString()); + GenericRecord s2 = instance(nullableStringSchema, "uuid", u2.get("uuid").toString()); + + File test = write(GENERIC, nullableUuidSchema, u1, u2); + Assert.assertEquals("Should read UUIDs as Strings", + Arrays.asList(s1, s2), read(GENERIC, nullableStringSchema, test)); + } + + @Test + public void testReadDecimalFixed() throws IOException { + Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4); + Schema fixedRecord = record("R", field("dec", fixedSchema)); + Schema decimalSchema = DECIMAL_9_2.addToSchema( + Schema.createFixed("aFixed", null, null, 4)); + Schema decimalRecord = record("R", field("dec", decimalSchema)); + + GenericRecord r1 = instance(decimalRecord, "dec", D1); + GenericRecord r2 = instance(decimalRecord, "dec", D2); + List expected = Arrays.asList(r1, r2); + + Conversion conversion = new Conversions.DecimalConversion(); + + // use the conversion directly instead of relying on the write side + GenericRecord r1fixed = instance(fixedRecord, "dec", + conversion.toFixed(D1, fixedSchema, DECIMAL_9_2)); + GenericRecord r2fixed = instance(fixedRecord, "dec", + conversion.toFixed(D2, fixedSchema, DECIMAL_9_2)); + + File test = write(fixedRecord, r1fixed, r2fixed); + Assert.assertEquals("Should convert fixed to BigDecimals", + expected, read(GENERIC, decimalRecord, 
test)); + } + + @Test + public void testWriteDecimalFixed() throws IOException { + Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4); + Schema fixedRecord = record("R", field("dec", fixedSchema)); + Schema decimalSchema = DECIMAL_9_2.addToSchema( + Schema.createFixed("aFixed", null, null, 4)); + Schema decimalRecord = record("R", field("dec", decimalSchema)); + + GenericRecord r1 = instance(decimalRecord, "dec", D1); + GenericRecord r2 = instance(decimalRecord, "dec", D2); + + Conversion conversion = new Conversions.DecimalConversion(); + + // use the conversion directly instead of relying on the write side + GenericRecord r1fixed = instance(fixedRecord, "dec", + conversion.toFixed(D1, fixedSchema, DECIMAL_9_2)); + GenericRecord r2fixed = instance(fixedRecord, "dec", + conversion.toFixed(D2, fixedSchema, DECIMAL_9_2)); + List expected = Arrays.asList(r1fixed, r2fixed); + + File test = write(GENERIC, decimalRecord, r1, r2); + Assert.assertEquals("Should read BigDecimals as fixed", + expected, read(GENERIC, fixedRecord, test)); + } + + @Test + public void testReadDecimalBytes() throws IOException { + Schema bytesSchema = Schema.create(Schema.Type.BYTES); + Schema bytesRecord = record("R", field("dec", bytesSchema)); + Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES)); + Schema decimalRecord = record("R", field("dec", decimalSchema)); + + GenericRecord r1 = instance(decimalRecord, "dec", D1); + GenericRecord r2 = instance(decimalRecord, "dec", D2); + List expected = Arrays.asList(r1, r2); + + Conversion conversion = new Conversions.DecimalConversion(); + + // use the conversion directly instead of relying on the write side + GenericRecord r1bytes = instance(bytesRecord, "dec", + conversion.toBytes(D1, bytesSchema, DECIMAL_9_2)); + GenericRecord r2bytes = instance(bytesRecord, "dec", + conversion.toBytes(D2, bytesSchema, DECIMAL_9_2)); + + File test = write(bytesRecord, r1bytes, r2bytes); + Assert.assertEquals("Should convert bytes to BigDecimals", + expected, read(GENERIC, decimalRecord, test)); + } + + @Test + public void testWriteDecimalBytes() throws IOException { + Schema bytesSchema = Schema.create(Schema.Type.BYTES); + Schema bytesRecord = record("R", field("dec", bytesSchema)); + Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES)); + Schema decimalRecord = record("R", field("dec", decimalSchema)); + + GenericRecord r1 = instance(decimalRecord, "dec", D1); + GenericRecord r2 = instance(decimalRecord, "dec", D2); + + Conversion conversion = new Conversions.DecimalConversion(); + + // use the conversion directly instead of relying on the write side + GenericRecord r1bytes = instance(bytesRecord, "dec", + conversion.toBytes(D1, bytesSchema, DECIMAL_9_2)); + GenericRecord r2bytes = instance(bytesRecord, "dec", + conversion.toBytes(D2, bytesSchema, DECIMAL_9_2)); + + List expected = Arrays.asList(r1bytes, r2bytes); + + File test = write(GENERIC, decimalRecord, r1, r2); + Assert.assertEquals("Should read BigDecimals as bytes", + expected, read(GENERIC, bytesRecord, test)); + } + + private File write(Schema schema, D... data) throws IOException { + return write(GenericData.get(), schema, data); + } + + private File write(GenericData model, Schema schema, D... 
data) throws IOException { + return AvroTestUtil.write(temp, model, schema, data); + } + +} diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java index 36c090fb97..7ba6c9b8ec 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java @@ -36,14 +36,15 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.junit.Test; -import org.apache.parquet.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.lang.Thread.sleep; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class TestInputOutputFormat { - private static final Log LOG = Log.getLog(TestInputOutputFormat.class); + private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class); private static Schema avroSchema; static { @@ -132,10 +133,10 @@ public void testReadWrite() throws Exception { private void waitForJob(Job job) throws Exception { job.submit(); while (!job.isComplete()) { - LOG.debug("waiting for job " + job.getJobName()); + LOG.debug("waiting for job {}", job.getJobName()); sleep(100); } - LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE")); + LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE")); if (!job.isSuccessful()) { throw new RuntimeException("job failed " + job.getJobName()); } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 855a5b14fa..4fa71ea986 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -19,18 +19,23 @@ package org.apache.parquet.avro; import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; import java.io.File; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericFixed; @@ -39,12 +44,16 @@ import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -225,6 +234,113 @@ public void testMapWithUtf8Key() throws Exception { assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), 
nextRecord.get("mymap")); } + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testDecimalValues() throws Exception { + Schema decimalSchema = Schema.createRecord("myrecord", null, null, false); + Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( + Schema.create(Schema.Type.BYTES)); + decimalSchema.setFields(Collections.singletonList( + new Schema.Field("dec", decimal, null, null))); + + // add the decimal conversion to a generic data model + GenericData decimalSupport = new GenericData(); + decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); + + File file = temp.newFile("decimal.parquet"); + file.delete(); + Path path = new Path(file.toString()); + + ParquetWriter writer = AvroParquetWriter + .builder(path) + .withDataModel(decimalSupport) + .withSchema(decimalSchema) + .build(); + + Random random = new Random(34L); + GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema); + List expected = Lists.newArrayList(); + for (int i = 0; i < 1000; i += 1) { + BigDecimal dec = new BigDecimal(new BigInteger(31, random), 2); + builder.set("dec", dec); + + GenericRecord rec = builder.build(); + expected.add(rec); + writer.write(builder.build()); + } + writer.close(); + + ParquetReader reader = AvroParquetReader + .builder(path) + .withDataModel(decimalSupport) + .disableCompatibility() + .build(); + List records = Lists.newArrayList(); + GenericRecord rec; + while ((rec = reader.read()) != null) { + records.add(rec); + } + reader.close(); + + Assert.assertTrue("dec field should be a BigDecimal instance", + records.get(0).get("dec") instanceof BigDecimal); + Assert.assertEquals("Content should match", expected, records); + } + + @Test + public void testFixedDecimalValues() throws Exception { + Schema decimalSchema = Schema.createRecord("myrecord", null, null, false); + Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( + Schema.createFixed("dec", null, null, 4)); + decimalSchema.setFields(Collections.singletonList( + new Schema.Field("dec", decimal, null, null))); + + // add the decimal conversion to a generic data model + GenericData decimalSupport = new GenericData(); + decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); + + File file = temp.newFile("decimal.parquet"); + file.delete(); + Path path = new Path(file.toString()); + + ParquetWriter writer = AvroParquetWriter + .builder(path) + .withDataModel(decimalSupport) + .withSchema(decimalSchema) + .build(); + + Random random = new Random(34L); + GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema); + List expected = Lists.newArrayList(); + for (int i = 0; i < 1000; i += 1) { + BigDecimal dec = new BigDecimal(new BigInteger(31, random), 2); + builder.set("dec", dec); + + GenericRecord rec = builder.build(); + expected.add(rec); + writer.write(builder.build()); + } + writer.close(); + + ParquetReader reader = AvroParquetReader + .builder(path) + .withDataModel(decimalSupport) + .disableCompatibility() + .build(); + List records = Lists.newArrayList(); + GenericRecord rec; + while ((rec = reader.read()) != null) { + records.add(rec); + } + reader.close(); + + Assert.assertTrue("dec field should be a BigDecimal instance", + records.get(0).get("dec") instanceof BigDecimal); + Assert.assertEquals("Content should match", expected, records); + } + @Test public void testAll() throws Exception { Schema schema = new Schema.Parser().parse( diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java index 64caacc8b0..af6f938115 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java @@ -47,7 +47,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import sun.net.www.content.text.Generic; import static org.apache.parquet.avro.AvroTestUtil.array; import static org.apache.parquet.avro.AvroTestUtil.optional; diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java index 3e1d32eeab..729f24ac40 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java @@ -37,7 +37,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.parquet.Log; import org.apache.parquet.column.ColumnReader; import org.apache.parquet.filter.ColumnPredicates; import org.apache.parquet.filter.ColumnRecordFilter; @@ -46,6 +45,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.lang.Thread.sleep; import static org.junit.Assert.assertArrayEquals; @@ -55,7 +56,7 @@ import static org.junit.Assert.fail; public class TestReflectInputOutputFormat { - private static final Log LOG = Log.getLog(TestReflectInputOutputFormat.class); + private static final Logger LOG = LoggerFactory.getLogger(TestReflectInputOutputFormat.class); public static class Service { @@ -477,10 +478,10 @@ public void testReadWriteChangedCar() throws Exception { private void waitForJob(Job job) throws Exception { job.submit(); while (!job.isComplete()) { - LOG.debug("waiting for job " + job.getJobName()); + LOG.debug("waiting for job {}", job.getJobName()); sleep(100); } - LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE")); + LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE")); if (!job.isSuccessful()) { throw new RuntimeException("job failed " + job.getJobName()); } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java new file mode 100644 index 0000000000..401e6987d4 --- /dev/null +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java @@ -0,0 +1,705 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import org.apache.avro.Conversion; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.reflect.AvroSchema; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificData; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import static org.apache.parquet.avro.AvroTestUtil.read; + +/** + * This class is based on org.apache.avro.reflect.TestReflectLogicalTypes + * + * Tests various logical types + * * string => UUID + * * fixed and bytes => Decimal + * * record => Pair + */ +public class TestReflectLogicalTypes { + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + public static final ReflectData REFLECT = new ReflectData(); + + @BeforeClass + public static void addUUID() { + REFLECT.addLogicalTypeConversion(new Conversions.UUIDConversion()); + REFLECT.addLogicalTypeConversion(new Conversions.DecimalConversion()); + } + + @Test + public void testReflectedSchema() { + Schema expected = SchemaBuilder.record(RecordWithUUIDList.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + expected.getField("uuids").schema().addProp( + SpecificData.CLASS_PROP, List.class.getName()); + LogicalTypes.uuid().addToSchema( + expected.getField("uuids").schema().getElementType()); + + Schema actual = REFLECT.getSchema(RecordWithUUIDList.class); + + Assert.assertEquals("Should use the UUID logical type", expected, actual); + } + + // this can be static because the schema only comes from reflection + public static class DecimalRecordBytes { + // scale is required and will not be set by the conversion + @AvroSchema("{" + + "\"type\": \"bytes\"," + + "\"logicalType\": \"decimal\"," + + "\"precision\": 9," + + "\"scale\": 2" + + "}") + private BigDecimal decimal; + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DecimalRecordBytes that = (DecimalRecordBytes) other; + if (decimal == null) { + return (that.decimal == null); + } + + return decimal.equals(that.decimal); + } + + @Override + public int hashCode() { + return decimal != null ? 
decimal.hashCode() : 0; + } + } + + @Test + public void testDecimalBytes() throws IOException { + Schema schema = REFLECT.getSchema(DecimalRecordBytes.class); + Assert.assertEquals("Should have the correct record name", + "org.apache.parquet.avro.TestReflectLogicalTypes$", + schema.getNamespace()); + Assert.assertEquals("Should have the correct record name", + "DecimalRecordBytes", + schema.getName()); + Assert.assertEquals("Should have the correct logical type", + LogicalTypes.decimal(9, 2), + LogicalTypes.fromSchema(schema.getField("decimal").schema())); + + DecimalRecordBytes record = new DecimalRecordBytes(); + record.decimal = new BigDecimal("3.14"); + + File test = write(REFLECT, schema, record); + Assert.assertEquals("Should match the decimal after round trip", + Arrays.asList(record), + read(REFLECT, schema, test)); + } + + // this can be static because the schema only comes from reflection + public static class DecimalRecordFixed { + // scale is required and will not be set by the conversion + @AvroSchema("{" + + "\"name\": \"decimal_9\"," + + "\"type\": \"fixed\"," + + "\"size\": 4," + + "\"logicalType\": \"decimal\"," + + "\"precision\": 9," + + "\"scale\": 2" + + "}") + private BigDecimal decimal; + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DecimalRecordFixed that = (DecimalRecordFixed) other; + if (decimal == null) { + return (that.decimal == null); + } + + return decimal.equals(that.decimal); + } + + @Override + public int hashCode() { + return decimal != null ? decimal.hashCode() : 0; + } + } + + @Test + public void testDecimalFixed() throws IOException { + Schema schema = REFLECT.getSchema(DecimalRecordFixed.class); + Assert.assertEquals("Should have the correct record name", + "org.apache.parquet.avro.TestReflectLogicalTypes$", + schema.getNamespace()); + Assert.assertEquals("Should have the correct record name", + "DecimalRecordFixed", + schema.getName()); + Assert.assertEquals("Should have the correct logical type", + LogicalTypes.decimal(9, 2), + LogicalTypes.fromSchema(schema.getField("decimal").schema())); + + DecimalRecordFixed record = new DecimalRecordFixed(); + record.decimal = new BigDecimal("3.14"); + + File test = write(REFLECT, schema, record); + Assert.assertEquals("Should match the decimal after round trip", + Arrays.asList(record), + read(REFLECT, schema, test)); + } + + public static class Pair { + private final X first; + private final Y second; + + private Pair(X first, Y second) { + this.first = first; + this.second = second; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + Pair that = (Pair) other; + if (first == null) { + if (that.first != null) { + return false; + } + } else if (first.equals(that.first)) { + return false; + } + + if (second == null) { + if (that.second != null) { + return false; + } + } else if (second.equals(that.second)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return Arrays.hashCode(new Object[] {first, second}); + } + + public static Pair of(X first, Y second) { + return new Pair(first, second); + } + } + + public static class PairRecord { + @AvroSchema("{" + + "\"name\": \"Pair\"," + + "\"type\": \"record\"," + + "\"fields\": [" + + " {\"name\": \"x\", \"type\": \"long\"}," + + " {\"name\": \"y\", \"type\": \"long\"}" + + 
" ]," + + "\"logicalType\": \"pair\"" + + "}") + Pair pair; + } + + @Test + @SuppressWarnings("unchecked") + public void testPairRecord() throws IOException { + ReflectData model = new ReflectData(); + model.addLogicalTypeConversion(new Conversion() { + @Override + public Class getConvertedType() { + return Pair.class; + } + + @Override + public String getLogicalTypeName() { + return "pair"; + } + + @Override + public Pair fromRecord(IndexedRecord value, Schema schema, LogicalType type) { + return Pair.of(value.get(0), value.get(1)); + } + + @Override + public IndexedRecord toRecord(Pair value, Schema schema, LogicalType type) { + GenericData.Record record = new GenericData.Record(schema); + record.put(0, value.first); + record.put(1, value.second); + return record; + } + }); + + LogicalTypes.register("pair", new LogicalTypes.LogicalTypeFactory() { + private final LogicalType PAIR = new LogicalType("pair"); + @Override + public LogicalType fromSchema(Schema schema) { + return PAIR; + } + }); + + Schema schema = model.getSchema(PairRecord.class); + Assert.assertEquals("Should have the correct record name", + "org.apache.parquet.avro.TestReflectLogicalTypes$", + schema.getNamespace()); + Assert.assertEquals("Should have the correct record name", + "PairRecord", + schema.getName()); + Assert.assertEquals("Should have the correct logical type", + "pair", + LogicalTypes.fromSchema(schema.getField("pair").schema()).getName()); + + PairRecord record = new PairRecord(); + record.pair = Pair.of(34L, 35L); + List expected = new ArrayList(); + expected.add(record); + + File test = write(model, schema, record); + Pair actual = AvroTestUtil + .read(model, schema, test) + .get(0).pair; + Assert.assertEquals("Data should match after serialization round-trip", + 34L, (long) actual.first); + Assert.assertEquals("Data should match after serialization round-trip", + 35L, (long) actual.second); + } + + @Test + public void testReadUUID() throws IOException { + Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithStringUUID r1 = new RecordWithStringUUID(); + r1.uuid = u1.toString(); + RecordWithStringUUID r2 = new RecordWithStringUUID(); + r2.uuid = u2.toString(); + + List expected = Arrays.asList( + new RecordWithUUID(), new RecordWithUUID()); + expected.get(0).uuid = u1; + expected.get(1).uuid = u2; + + File test = write( + ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, read(REFLECT, uuidSchema, test)); + + // verify that the field's type overrides the logical type + Schema uuidStringSchema = SchemaBuilder + .record(RecordWithStringUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema()); + + Assert.assertEquals("Should not convert to UUID if accessor is String", + Arrays.asList(r1, r2), + read(REFLECT, uuidStringSchema, test)); + } + + @Test + public void testWriteUUID() throws IOException { + Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithUUID r1 = new RecordWithUUID(); + r1.uuid = u1; + 
RecordWithUUID r2 = new RecordWithUUID(); + r2.uuid = u2; + + List expected = Arrays.asList( + new RecordWithStringUUID(), new RecordWithStringUUID()); + expected.get(0).uuid = u1.toString(); + expected.get(1).uuid = u2.toString(); + + File test = write(REFLECT, uuidSchema, r1, r2); + + // verify that the field's type overrides the logical type + Schema uuidStringSchema = SchemaBuilder + .record(RecordWithStringUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + + Assert.assertEquals("Should read uuid as String without UUID conversion", + expected, + read(REFLECT, uuidStringSchema, test)); + + LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema()); + Assert.assertEquals("Should read uuid as String without UUID logical type", + expected, + read(ReflectData.get(), uuidStringSchema, test)); + } + + @Test + public void testWriteNullableUUID() throws IOException { + Schema nullableUuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) + .fields().optionalString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema( + nullableUuidSchema.getField("uuid").schema().getTypes().get(1)); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithUUID r1 = new RecordWithUUID(); + r1.uuid = u1; + RecordWithUUID r2 = new RecordWithUUID(); + r2.uuid = u2; + + List expected = Arrays.asList( + new RecordWithStringUUID(), new RecordWithStringUUID()); + expected.get(0).uuid = u1.toString(); + expected.get(1).uuid = u2.toString(); + + File test = write(REFLECT, nullableUuidSchema, r1, r2); + + // verify that the field's type overrides the logical type + Schema nullableUuidStringSchema = SchemaBuilder + .record(RecordWithStringUUID.class.getName()) + .fields().optionalString("uuid").endRecord(); + + Assert.assertEquals("Should read uuid as String without UUID conversion", + expected, + read(REFLECT, nullableUuidStringSchema, test)); + } + + @Test(expected = ClassCastException.class) + public void testWriteUUIDMissingLogicalType() throws IOException { + Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithUUID r1 = new RecordWithUUID(); + r1.uuid = u1; + RecordWithUUID r2 = new RecordWithUUID(); + r2.uuid = u2; + + // write without using REFLECT, which has the logical type + File test = write(uuidSchema, r1, r2); + + // verify that the field's type overrides the logical type + Schema uuidStringSchema = SchemaBuilder + .record(RecordWithStringUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + + // this fails with an AppendWriteException wrapping ClassCastException + // because the UUID isn't converted to a CharSequence expected internally + read(ReflectData.get(), uuidStringSchema, test); + } + + @Test + public void testReadUUIDGenericRecord() throws IOException { + Schema uuidSchema = SchemaBuilder.record("RecordWithUUID") + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithStringUUID r1 = new RecordWithStringUUID(); + r1.uuid = u1.toString(); + RecordWithStringUUID r2 = new RecordWithStringUUID(); + r2.uuid = u2.toString(); + + List expected = Arrays.asList( + new GenericData.Record(uuidSchema), new GenericData.Record(uuidSchema)); + expected.get(0).put("uuid", 
u1); + expected.get(1).put("uuid", u2); + + File test = write( + ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, read(REFLECT, uuidSchema, test)); + + // verify that the field's type overrides the logical type + Schema uuidStringSchema = SchemaBuilder + .record(RecordWithStringUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + Assert.assertEquals("Should not convert to UUID if accessor is String", + Arrays.asList(r1, r2), + read(REFLECT, uuidStringSchema, test)); + } + + @Test + public void testReadUUIDArray() throws IOException { + Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + LogicalTypes.uuid().addToSchema( + uuidArraySchema.getField("uuids").schema().getElementType()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord r = new GenericData.Record(uuidArraySchema); + r.put("uuids", Arrays.asList(u1.toString(), u2.toString())); + + RecordWithUUIDArray expected = new RecordWithUUIDArray(); + expected.uuids = new UUID[] {u1, u2}; + + File test = write(uuidArraySchema, r); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, + read(REFLECT, uuidArraySchema, test).get(0)); + } + + @Test + public void testWriteUUIDArray() throws IOException { + Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + LogicalTypes.uuid().addToSchema( + uuidArraySchema.getField("uuids").schema().getElementType()); + + Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray") + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + stringArraySchema.getField("uuids").schema() + .addProp(SpecificData.CLASS_PROP, List.class.getName()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord expected = new GenericData.Record(stringArraySchema); + List uuids = new ArrayList(); + uuids.add(u1.toString()); + uuids.add(u2.toString()); + expected.put("uuids", uuids); + + RecordWithUUIDArray r = new RecordWithUUIDArray(); + r.uuids = new UUID[] {u1, u2}; + + File test = write(REFLECT, uuidArraySchema, r); + + Assert.assertEquals("Should read UUIDs as Strings", + expected, + read(ReflectData.get(), stringArraySchema, test).get(0)); + } + + @Test + public void testReadUUIDList() throws IOException { + Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + uuidListSchema.getField("uuids").schema().addProp( + SpecificData.CLASS_PROP, List.class.getName()); + LogicalTypes.uuid().addToSchema( + uuidListSchema.getField("uuids").schema().getElementType()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord r = new GenericData.Record(uuidListSchema); + r.put("uuids", Arrays.asList(u1.toString(), u2.toString())); + + RecordWithUUIDList expected = new RecordWithUUIDList(); + expected.uuids = Arrays.asList(u1, u2); + + File test = write(uuidListSchema, r); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, read(REFLECT, uuidListSchema, test).get(0)); + } + + @Test + public void testWriteUUIDList() throws 
IOException { + Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + uuidListSchema.getField("uuids").schema().addProp( + SpecificData.CLASS_PROP, List.class.getName()); + LogicalTypes.uuid().addToSchema( + uuidListSchema.getField("uuids").schema().getElementType()); + + Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray") + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + stringArraySchema.getField("uuids").schema() + .addProp(SpecificData.CLASS_PROP, List.class.getName()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord expected = new GenericData.Record(stringArraySchema); + expected.put("uuids", Arrays.asList(u1.toString(), u2.toString())); + + RecordWithUUIDList r = new RecordWithUUIDList(); + r.uuids = Arrays.asList(u1, u2); + + File test = write(REFLECT, uuidListSchema, r); + + Assert.assertEquals("Should read UUIDs as Strings", + expected, + read(REFLECT, stringArraySchema, test).get(0)); + } + + private File write(Schema schema, D... data) throws IOException { + return write(ReflectData.get(), schema, data); + } + + @SuppressWarnings("unchecked") + private File write(GenericData model, Schema schema, D... data) throws IOException { + return AvroTestUtil.write(temp, model, schema, data); + } +} + +class RecordWithUUID { + UUID uuid; + + @Override + public int hashCode() { + return uuid.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof RecordWithUUID)) { + return false; + } + RecordWithUUID that = (RecordWithUUID) obj; + return this.uuid.equals(that.uuid); + } +} + +class RecordWithStringUUID { + String uuid; + + @Override + public int hashCode() { + return uuid.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof RecordWithStringUUID)) { + return false; + } + RecordWithStringUUID that = (RecordWithStringUUID) obj; + return this.uuid.equals(that.uuid); + } +} + +class RecordWithUUIDArray { + UUID[] uuids; + + @Override + public int hashCode() { + return Arrays.hashCode(uuids); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof RecordWithUUIDArray)) { + return false; + } + RecordWithUUIDArray that = (RecordWithUUIDArray) obj; + return Arrays.equals(this.uuids, that.uuids); + } +} + +class RecordWithUUIDList { + List uuids; + + @Override + public int hashCode() { + return uuids.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof RecordWithUUIDList)) { + return false; + } + RecordWithUUIDList that = (RecordWithUUIDList) obj; + return this.uuids.equals(that.uuids); + } +} + diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java index 17a0af1ef2..a0b58f3cdc 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java @@ -39,15 +39,16 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.apache.parquet.Log; import org.apache.parquet.column.ColumnReader; import 
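Editor's note: TestReflectLogicalTypes covers the same conversion machinery for the uuid logical type: a string column annotated with uuid materializes as java.util.UUID when the data model has Conversions.UUIDConversion registered, and stays a plain String otherwise. The sketch below is a minimal, hedged illustration of that round trip with generic records; it assumes the registered conversion is applied on both the write and read paths, in the same way the decimal tests rely on it for BigDecimal, and the file handling is illustrative.

```
import java.io.File;
import java.util.UUID;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class UuidLogicalTypeSketch {

  public static void main(String[] args) throws Exception {
    // a string field annotated with the uuid logical type, as in the tests
    Schema schema = SchemaBuilder.record("RecordWithUUID")
        .fields().requiredString("uuid").endRecord();
    LogicalTypes.uuid().addToSchema(schema.getField("uuid").schema());

    // a data model that converts java.util.UUID to and from String
    GenericData model = new GenericData();
    model.addLogicalTypeConversion(new Conversions.UUIDConversion());

    File file = File.createTempFile("uuid", ".parquet"); // illustrative location
    file.delete(); // the writer refuses to overwrite an existing file
    Path path = new Path(file.toString());

    GenericRecord record = new GenericData.Record(schema);
    record.put("uuid", UUID.randomUUID()); // stored as a string in the file

    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(path)
        .withDataModel(model)
        .withSchema(schema)
        .build();
    writer.write(record);
    writer.close();

    ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(path)
        .withDataModel(model)
        .disableCompatibility()
        .build();
    GenericRecord roundTripped = reader.read();
    reader.close();

    System.out.println(roundTripped.get("uuid").getClass()); // java.util.UUID
  }
}
```

Reading the same file with a model that lacks the UUIDConversion, or with a read schema whose field has no uuid logical type, yields plain Strings, which is exactly what the testReadUUID and testWriteUUID assertions check.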
org.apache.parquet.filter.ColumnPredicates; import org.apache.parquet.filter.ColumnRecordFilter; import org.apache.parquet.filter.RecordFilter; import org.apache.parquet.filter.UnboundRecordFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestSpecificInputOutputFormat { - private static final Log LOG = Log.getLog(TestSpecificInputOutputFormat.class); + private static final Logger LOG = LoggerFactory.getLogger(TestSpecificInputOutputFormat.class); public static Car nextRecord(int i) { String vin = "1VXBR12EXCP000000"; @@ -268,10 +269,10 @@ public void testReadWriteChangedCar() throws Exception { private void waitForJob(Job job) throws Exception { job.submit(); while (!job.isComplete()) { - LOG.debug("waiting for job " + job.getJobName()); + LOG.debug("waiting for job {}", job.getJobName()); sleep(100); } - LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE")); + LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE")); if (!job.isSuccessful()) { throw new RuntimeException("job failed " + job.getJobName()); } diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java similarity index 100% rename from 
parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java rename to parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java diff --git a/parquet-cascading/src/test/resources/names.txt b/parquet-cascading-common23/src/test/resources/names.txt similarity index 100% rename from parquet-cascading/src/test/resources/names.txt rename to parquet-cascading-common23/src/test/resources/names.txt diff --git a/parquet-cascading/src/test/thrift/test.thrift b/parquet-cascading-common23/src/test/thrift/test.thrift similarity index 100% rename from parquet-cascading/src/test/thrift/test.thrift rename to parquet-cascading-common23/src/test/thrift/test.thrift diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml index 0cd858886e..0573aba664 100644 --- a/parquet-cascading/pom.xml +++ b/parquet-cascading/pom.xml @@ -77,7 +77,7 @@ org.mockito mockito-all - 1.9.5 + ${mockito.version} test @@ -102,6 +102,51 @@ + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + add-source + generate-sources + + add-source + + + + ../parquet-cascading-common23/src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ../parquet-cascading-common23/src/test/java + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ../parquet-cascading-common23/src/test/resources + + + + + + maven-enforcer-plugin @@ -112,9 +157,11 @@ org.apache.thrift.tools maven-thrift-plugin - 0.1.10 + ${maven-thrift-plugin.version} ${thrift.executable} + ../parquet-cascading-common23/src/main/thrift + ../parquet-cascading-common23/src/test/thrift diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java index ea70d43f8e..b34ee7d24a 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java @@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; import org.apache.parquet.thrift.TBaseRecordConverter; +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class ParquetTBaseScheme> extends ParquetValueScheme { // In the case of reads, we can read the thrift class from the file metadata diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java index 41b56d0fcb..3b7d715273 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java @@ -59,6 +59,7 @@ * @author Avi Bryant */ +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class ParquetTupleScheme extends Scheme{ private static final long serialVersionUID = 0L; diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java index 9549ef43f6..6c34a8494b 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java 
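Editor's note: the Cascading changes above move the shared sources into parquet-cascading-common23 (wired in through build-helper-maven-plugin) and mark the Cascading 2.x schemes @Deprecated in favor of the new parquet-cascading3 module introduced below. The scheme API itself is unchanged, so a flow that reads Parquet columns into Cascading tuples looks the same against either module. The sketch below is illustrative only: the paths, field names, and TextDelimited sink are assumptions, and it presumes Cascading's Hfs tap and HadoopFlowConnector.

```
import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import org.apache.parquet.cascading.ParquetTupleScheme;

public class ParquetToTsvSketch {

  public static void main(String[] args) {
    // materialize only the "first" and "last" columns from the Parquet input
    Tap source = new Hfs(new ParquetTupleScheme(new Fields("first", "last")),
        "target/test/names-parquet-in"); // hypothetical input path

    // write the selected columns back out as tab-separated text
    Tap sink = new Hfs(new TextDelimited(new Fields("first", "last"), "\t"),
        "target/test/names-txt-out"); // hypothetical output path

    Pipe pipe = new Pipe("parquet-to-tsv");

    Flow flow = new HadoopFlowConnector().connect(source, sink, pipe);
    flow.complete();
  }
}
```

Passing no sourceFields (or Fields.ALL) to ParquetTupleScheme makes it derive the tuple fields from the Parquet schema instead, as described in the scheme's javadoc further down.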
@@ -47,6 +47,7 @@ * This is an abstract class; implementations are expected to set up their Input/Output Formats * correctly in the respective Init methods. */ +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public abstract class ParquetValueScheme extends Scheme{ public static final class Config implements Serializable { diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java index 841314ca7c..e0f33e1161 100644 --- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java +++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java @@ -58,8 +58,9 @@ import java.util.HashMap; import java.util.Map; +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class TestParquetTBaseScheme { - final String txtInputPath = "src/test/resources/names.txt"; + final String txtInputPath = "target/test-classes/names.txt"; final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in"; final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out"; final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out"; diff --git a/parquet-cascading3/REVIEWERS.md b/parquet-cascading3/REVIEWERS.md new file mode 100644 index 0000000000..f7972357e9 --- /dev/null +++ b/parquet-cascading3/REVIEWERS.md @@ -0,0 +1,27 @@ + + +The following reviewers had reviewed the parquet-cascading (pre-Cascading 3.0) project: + +| Name | Apache Id | github id | +|--------------------|------------|-------------| +| Dmitriy Ryaboy | dvryaboy | dvryaboy | +| Tianshuo Deng | tianshuo | tsdeng | + + diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml new file mode 100644 index 0000000000..9aa8991e9b --- /dev/null +++ b/parquet-cascading3/pom.xml @@ -0,0 +1,195 @@ + + + + org.apache.parquet + parquet + ../pom.xml + 1.8.2-SNAPSHOT + + + 4.0.0 + + parquet-cascading3 + jar + + Apache Parquet Cascading (for Cascading 3.0 onwards) + https://parquet.apache.org + + + + conjars.org + http://conjars.org/repo + + + + + + org.apache.parquet + parquet-column + ${project.version} + + + org.apache.parquet + parquet-hadoop + ${project.version} + + + org.apache.parquet + parquet-thrift + ${project.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + provided + + + org.apache.thrift + libthrift + ${thrift.version} + provided + + + cascading + cascading-hadoop + ${cascading3.version} + provided + + + org.apache.parquet + parquet-column + ${project.version} + test-jar + test + + + org.mockito + mockito-all + ${mockito.version} + test + + + org.slf4j + slf4j-simple + ${slf4j.version} + test + + + + + + + + maven-enforcer-plugin + + + none + + + + true + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + add-source + generate-sources + + add-source + + + + ../parquet-cascading-common23/src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ../parquet-cascading-common23/src/test/java + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ../parquet-cascading-common23/src/test/resources + + + + + + + + maven-enforcer-plugin + + + org.apache.maven.plugins + maven-jar-plugin 
+ + + org.apache.thrift.tools + maven-thrift-plugin + ${maven-thrift-plugin.version} + + ${thrift.executable} + ../parquet-cascading-common23/src/main/thrift + ../parquet-cascading-common23/src/test/thrift + + + + thrift-sources + generate-test-sources + + testCompile + + + + + + + diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java new file mode 100644 index 0000000000..af04b47c8e --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cascading; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.thrift.TBase; + +import cascading.flow.FlowProcess; +import cascading.tap.Tap; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.hadoop.thrift.ThriftReadSupport; +import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; +import org.apache.parquet.thrift.TBaseRecordConverter; + +public class ParquetTBaseScheme> extends ParquetValueScheme { + + // In the case of reads, we can read the thrift class from the file metadata + public ParquetTBaseScheme() { + this(new Config()); + } + + public ParquetTBaseScheme(Class thriftClass) { + this(new Config().withRecordClass(thriftClass)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate) { + this(new Config().withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate, Class thriftClass) { + this(new Config().withRecordClass(thriftClass).withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(Config config) { + super(config); + } + + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + super.sourceConfInit(fp, tap, jobConf); + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class); + ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class); + } + + @Override + public void sinkConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (this.config.getKlass() == null) { + throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor"); + } + + DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); + 
DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class); + TBaseWriteSupport.setThriftClass(jobConf, this.config.getKlass()); + } +} diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java new file mode 100644 index 0000000000..4532d3b3f8 --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.parquet.cascading; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.CompositeTap; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.Hfs; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.mapred.Container; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.schema.MessageType; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A Cascading Scheme that converts Parquet groups into Cascading tuples. + * If you provide it with sourceFields, it will selectively materialize only the columns for those fields. + * The names must match the names in the Parquet schema. + * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the + * Parquet schema. + * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be + * flattened to a top-level field in the Cascading tuple. 
+ * + * @author Avi Bryant + */ + +public class ParquetTupleScheme extends Scheme{ + + private static final long serialVersionUID = 0L; + private String parquetSchema; + private final FilterPredicate filterPredicate; + + public ParquetTupleScheme() { + super(); + this.filterPredicate = null; + } + + public ParquetTupleScheme(Fields sourceFields) { + super(sourceFields); + this.filterPredicate = null; + } + + public ParquetTupleScheme(FilterPredicate filterPredicate) { + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) { + super(sourceFields); + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + /** + * ParquetTupleScheme constructor used a sink need to be implemented + * + * @param sourceFields used for the reading step + * @param sinkFields used for the writing step + * @param schema is mandatory if you add sinkFields and needs to be the + * toString() from a MessageType. This value is going to be parsed when the + * parquet file will be created. + */ + public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) { + super(sourceFields, sinkFields); + parquetSchema = schema; + this.filterPredicate = null; + } + + @SuppressWarnings("rawtypes") + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate); + } + + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class); + TupleReadSupport.setRequestedFields(jobConf, getSourceFields()); + } + + @Override + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { + MessageType schema = readSchema(flowProcess, tap); + SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields()); + + setSourceFields(intersection.getSourceFields()); + + return getSourceFields(); + } + + private MessageType readSchema(FlowProcess flowProcess, Tap tap) { + try { + Hfs hfs; + + if( tap instanceof CompositeTap ) + hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next(); + else + hfs = (Hfs) tap; + + List