diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 94343438df..aad197d47b 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -87,6 +87,13 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 61d7d8ef55..38a761c40a 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -32,8 +32,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectData;
@@ -753,13 +755,14 @@ static boolean isElementType(Type repeatedType, Schema elementSchema) {
       // synthetic wrapper. Must be a group with one optional or required field
       return true;
     } else if (elementSchema != null &&
-        elementSchema.getType() == Schema.Type.RECORD &&
-        elementSchema.getFields().size() == 1 &&
-        elementSchema.getFields().get(0).name().equals(
-            repeatedType.asGroupType().getFieldName(0))) {
+        elementSchema.getType() == Schema.Type.RECORD) {
+      Set<String> fieldNames = new HashSet<String>();
+      for (Schema.Field field : elementSchema.getFields()) {
+        fieldNames.add(field.name());
+      }
       // The repeated type must be the element type because it matches the
       // structure of the Avro element's schema.
-      return true;
+      return fieldNames.contains(repeatedType.asGroupType().getFieldName(0));
     }
     return false;
   }
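The old check only accepted a repeated group whose single field name equaled the element record's *only* field, so it misclassified a projection that keeps one column of a multi-field element record. The new rule matches the repeated group's field name against any field of the element record. A minimal, self-contained sketch of that name-based rule (the class and main method here are illustrative, not part of the patch; the real method additionally returns true outright for primitives, multi-field groups, and repeated inner fields):

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageTypeParser;

public class ElementTypeRuleSketch {
  // Name-based disambiguation: the repeated group is the element type if its
  // single field appears among the fields of the expected Avro element record.
  static boolean matchesElementSchema(GroupType repeatedType, Schema elementSchema) {
    Set<String> fieldNames = new HashSet<String>();
    for (Schema.Field field : elementSchema.getFields()) {
      fieldNames.add(field.name());
    }
    return fieldNames.contains(repeatedType.getFieldName(0));
  }

  public static void main(String[] args) {
    // expected element: record SingleElementGroup { long count; }
    Schema element = SchemaBuilder.record("SingleElementGroup").fields()
        .requiredLong("count").endRecord();

    // ambiguous layout: the repeated group could be the element or a wrapper
    GroupType repeated = MessageTypeParser.parseMessageType(
        "message M { optional group groups (LIST) {"
        + " repeated group single_element_group { required int64 count; } } }")
        .getType(0).asGroupType().getType(0).asGroupType();

    // "count" is a field of the element record, so the group IS the element
    System.out.println(matchesElementSchema(repeated, element)); // true
  }
}
```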
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index 9c29e5030f..29264f06a5 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -18,12 +18,10 @@
  */
 package org.apache.parquet.avro;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -32,14 +30,9 @@
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.DirectWriterTest;
 import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
 
 import static org.apache.parquet.avro.AvroTestUtil.array;
 import static org.apache.parquet.avro.AvroTestUtil.field;
@@ -49,10 +42,7 @@
 import static org.apache.parquet.avro.AvroTestUtil.primitive;
 import static org.apache.parquet.avro.AvroTestUtil.record;
 
-public class TestArrayCompatibility {
-
-  @Rule
-  public final TemporaryFolder tempDir = new TemporaryFolder();
+public class TestArrayCompatibility extends DirectWriterTest {
 
   public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
@@ -1045,68 +1035,6 @@ public void write(RecordConsumer rc) {
     assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
   }
 
-  private interface DirectWriter {
-    public void write(RecordConsumer consumer);
-  }
-
-  private static class DirectWriteSupport extends WriteSupport<Void> {
-    private RecordConsumer recordConsumer;
-    private final MessageType type;
-    private final DirectWriter writer;
-    private final Map<String, String> metadata;
-
-    private DirectWriteSupport(MessageType type, DirectWriter writer,
-                               Map<String, String> metadata) {
-      this.type = type;
-      this.writer = writer;
-      this.metadata = metadata;
-    }
-
-    @Override
-    public WriteContext init(Configuration configuration) {
-      return new WriteContext(type, metadata);
-    }
-
-    @Override
-    public void prepareForWrite(RecordConsumer recordConsumer) {
-      this.recordConsumer = recordConsumer;
-    }
-
-    @Override
-    public void write(Void record) {
-      writer.write(recordConsumer);
-    }
-  }
-
-  private Path writeDirect(String type, DirectWriter writer) throws IOException {
-    return writeDirect(MessageTypeParser.parseMessageType(type), writer);
-  }
-
-  private Path writeDirect(String type, DirectWriter writer,
-                           Map<String, String> metadata) throws IOException {
-    return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
-  }
-
-  private Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
-    return writeDirect(type, writer, new HashMap<String, String>());
-  }
-
-  private Path writeDirect(MessageType type, DirectWriter writer,
-                           Map<String, String> metadata) throws IOException {
-    File temp = tempDir.newFile(UUID.randomUUID().toString());
-    temp.deleteOnExit();
-    temp.delete();
-
-    Path path = new Path(temp.getPath());
-
-    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
-        path, new DirectWriteSupport(type, writer, metadata));
-    parquetWriter.write(null);
-    parquetWriter.close();
-
-    return path;
-  }
-
   public AvroParquetReader<GenericRecord> oldBehaviorReader(
       Path path) throws IOException {
     return new AvroParquetReader<GenericRecord>(path);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
new file mode 100644
index 0000000000..074d2e8b66
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+public class DirectWriterTest {
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  protected interface DirectWriter {
+    public void write(RecordConsumer consumer);
+  }
+
+  protected Path writeDirect(String type, DirectWriter writer) throws IOException {
+    return writeDirect(MessageTypeParser.parseMessageType(type), writer);
+  }
+
+  protected Path writeDirect(String type, DirectWriter writer,
+                             Map<String, String> metadata) throws IOException {
+    return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
+  }
+
+  protected Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
+    return writeDirect(type, writer, new HashMap<String, String>());
+  }
+
+  protected Path writeDirect(MessageType type, DirectWriter writer,
+                             Map<String, String> metadata) throws IOException {
+    File temp = tempDir.newFile(UUID.randomUUID().toString());
+    temp.deleteOnExit();
+    temp.delete();
+
+    Path path = new Path(temp.getPath());
+
+    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
+        path, new DirectWriteSupport(type, writer, metadata));
+    parquetWriter.write(null);
+    parquetWriter.close();
+
+    return path;
+  }
+
+  protected static class DirectWriteSupport extends WriteSupport<Void> {
+    private RecordConsumer recordConsumer;
+    private final MessageType type;
+    private final DirectWriter writer;
+    private final Map<String, String> metadata;
+
+    protected DirectWriteSupport(MessageType type, DirectWriter writer,
+                                 Map<String, String> metadata) {
+      this.type = type;
+      this.writer = writer;
+      this.metadata = metadata;
+    }
+
+    @Override
+    public WriteContext init(Configuration configuration) {
+      return new WriteContext(type, metadata);
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+      this.recordConsumer = recordConsumer;
+    }
+
+    @Override
+    public void write(Void record) {
+      writer.write(recordConsumer);
+    }
+  }
+}
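With the helper promoted to a shared test-jar class, any module that depends on the parquet-hadoop test artifact can write hand-rolled Parquet structures through the low-level RecordConsumer. A hypothetical subclass showing the intended use (test name and schema made up for illustration):

```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.io.api.RecordConsumer;
import org.junit.Test;

public class ExampleDirectWriteTest extends DirectWriterTest {

  @Test
  public void writesOneRecord() throws Exception {
    // writeDirect parses the schema string, opens a temporary file, and
    // hands the RecordConsumer to the callback exactly once
    Path file = writeDirect(
        "message Example { required int32 x; }",
        new DirectWriter() {
          @Override
          public void write(RecordConsumer rc) {
            rc.startMessage();
            rc.startField("x", 0);
            rc.addInteger(7);
            rc.endField("x", 0);
            rc.endMessage();
          }
        });
    // `file` now points at a single-record Parquet file for a reader to consume
  }
}
```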
diff --git a/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeRecordConverter.java b/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeRecordConverter.java
index d385999abf..9c4faa0d1f 100644
--- a/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeRecordConverter.java
+++ b/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeRecordConverter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.scrooge;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
@@ -31,8 +32,16 @@
 public class ScroogeRecordConverter<T extends ThriftStruct> extends ThriftRecordConverter<T> {
-
+  /**
+   * This is for compatibility only.
+   * @deprecated will be removed in 2.x
+   */
+  @Deprecated
   public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType) {
+    this(thriftClass, parquetSchema, thriftType, null);
+  }
+
+  public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType, Configuration conf) {
     super(new ThriftReader<T>() {
       @SuppressWarnings("unchecked")
       ThriftStructCodec<T> codec = (ThriftStructCodec<T>) getCodec(thriftClass);
       @Override
       public T readOneRecord(TProtocol protocol) throws TException {
         return codec.decode(protocol);
       }
-    }, thriftClass.getSimpleName(), parquetSchema, thriftType);
+    }, thriftClass.getSimpleName(), parquetSchema, thriftType, conf);
   }
 
   private static ThriftStructCodec<?> getCodec(Class<?> klass) {
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 2583870c9e..66594ca0ff 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -121,6 +121,13 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index a859128a74..1c020ae4cf 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.hadoop.thrift;
 
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 import java.util.Set;
 
@@ -203,17 +204,16 @@ private void initThriftClassFromMultipleFiles(Map<String, Set<String>> fileMetad
   }
 
   @SuppressWarnings("unchecked")
-  private void initThriftClass(Map<String, String> fileMetadata, Configuration conf) throws ClassNotFoundException {
+  private void initThriftClass(ThriftMetaData metadata, Configuration conf) throws ClassNotFoundException {
     if (thriftClass != null) {
       return;
     }
     String className = conf.get(THRIFT_READ_CLASS_KEY, null);
     if (className == null) {
-      final ThriftMetaData metaData = ThriftMetaData.fromExtraMetaData(fileMetadata);
-      if (metaData == null) {
+      if (metadata == null) {
         throw new ParquetDecodingException("Could not read file as the Thrift class is not provided and could not be resolved from the file");
       }
-      thriftClass = (Class<T>)metaData.getThriftClass();
+      thriftClass = (Class<T>)metadata.getThriftClass();
     } else {
       thriftClass = (Class<T>)Class.forName(className);
     }
@@ -225,17 +225,56 @@ public RecordMaterializer<T> prepareForRead(Configuration configuration,
       org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
     ThriftMetaData thriftMetaData = ThriftMetaData.fromExtraMetaData(keyValueMetaData);
     try {
-      initThriftClass(keyValueMetaData, configuration);
+      initThriftClass(thriftMetaData, configuration);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Cannot find Thrift object class for metadata: " + thriftMetaData, e);
+    }
+
+    // if there was no metadata in the file, get it from the requested class
+    if (thriftMetaData == null) {
+      thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass);
+    }
+
+    String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT);
+    return getRecordConverterInstance(converterClassName, thriftClass,
+        readContext.getRequestedSchema(), thriftMetaData.getDescriptor(),
+        configuration);
+  }
+
@SuppressWarnings("unchecked") + private static ThriftRecordConverter getRecordConverterInstance( + String converterClassName, Class thriftClass, + MessageType requestedSchema, StructType descriptor, Configuration conf) { + Class> converterClass; + try { + converterClass = (Class>) Class.forName(converterClassName); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find Thrift converter class: " + converterClassName, e); + } + + try { + // first try the new version that accepts a Configuration + try { + Constructor> constructor = + converterClass.getConstructor(Class.class, MessageType.class, StructType.class, Configuration.class); + return constructor.newInstance(thriftClass, requestedSchema, descriptor, conf); + } catch (IllegalAccessException e) { + // try the other constructor pattern + } catch (NoSuchMethodException e) { + // try to find the other constructor pattern + } - String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT); - @SuppressWarnings("unchecked") - Class> converterClass = (Class>) Class.forName(converterClassName); Constructor> constructor = converterClass.getConstructor(Class.class, MessageType.class, StructType.class); - ThriftRecordConverter converter = constructor.newInstance(thriftClass, readContext.getRequestedSchema(), thriftMetaData.getDescriptor()); - return converter; - } catch (Exception t) { - throw new RuntimeException("Unable to create Thrift Converter for Thrift metadata " + thriftMetaData, t); + return constructor.newInstance(thriftClass, requestedSchema, descriptor); + } catch (InstantiationException e) { + throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e); + } catch (InvocationTargetException e) { + throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Cannot access constructor for Thrift converter class: " + converterClassName, e); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Cannot find constructor for Thrift converter class: " + converterClassName, e); } } } diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java index 17a68d678a..6483e5919a 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.thrift; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; @@ -28,7 +29,16 @@ public class TBaseRecordConverter> extends ThriftRecordConverter { + /** + * This is for compatibility only. 
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
index 17a68d678a..6483e5919a 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.thrift;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
@@ -28,7 +29,16 @@
 public class TBaseRecordConverter<T extends TBase<?,?>> extends ThriftRecordConverter<T> {
 
+  /**
+   * This is for compatibility only.
+   * @deprecated will be removed in 2.x
+   */
+  @Deprecated
   public TBaseRecordConverter(final Class<T> thriftClass, MessageType requestedParquetSchema, StructType thriftType) {
+    this(thriftClass, requestedParquetSchema, thriftType, null);
+  }
+
+  public TBaseRecordConverter(final Class<T> thriftClass, MessageType requestedParquetSchema, StructType thriftType, Configuration conf) {
     super(new ThriftReader<T>() {
       @Override
       public T readOneRecord(TProtocol protocol) throws TException {
@@ -42,7 +52,7 @@ public T readOneRecord(TProtocol protocol) throws TException {
           throw new ParquetDecodingException("Thrift class or constructor not public " + thriftClass, e);
         }
       }
-    }, thriftClass.getSimpleName(), requestedParquetSchema, thriftType);
+    }, thriftClass.getSimpleName(), requestedParquetSchema, thriftType, conf);
   }
 }
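Call sites compiled against the old three-argument constructor keep working and get default behavior (a null Configuration leaves ignore-null-elements off); new code can pass a Configuration explicitly. A sketch using the Location struct from the new test IDL (assumes the generated test classes are on the classpath):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.thrift.TBaseRecordConverter;
import org.apache.parquet.thrift.ThriftSchemaConverter;
import org.apache.parquet.thrift.struct.ThriftType.StructType;
import org.apache.parquet.thrift.test.compat.Location;

public class ConverterConstructionSketch {
  public static void main(String[] args) {
    ThriftSchemaConverter schemaConverter = new ThriftSchemaConverter();
    StructType descriptor = schemaConverter.toStructType(Location.class);
    MessageType schema = schemaConverter.convert(Location.class);

    Configuration conf = new Configuration();
    // the Configuration now reaches ThriftRecordConverter, which reads
    // parquet.thrift.ignore-null-elements from it
    TBaseRecordConverter<Location> converter =
        new TBaseRecordConverter<Location>(Location.class, schema, descriptor, conf);
  }
}
```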
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftMetaData.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftMetaData.java
index a89f8d97c1..a7628cc6e6 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftMetaData.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftMetaData.java
@@ -23,6 +23,7 @@
 import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.thrift.struct.ThriftType;
 import org.apache.parquet.thrift.struct.ThriftType.StructType;
+import org.apache.thrift.TBase;
 
 /**
  *
@@ -86,19 +87,35 @@ public StructType getDescriptor() {
    * Reads ThriftMetadata from the parquet file footer.
    *
    * @param extraMetaData extraMetaData field of the parquet footer
-   * @return
+   * @return the ThriftMetaData used to write a data file
    */
   public static ThriftMetaData fromExtraMetaData(
       Map<String, String> extraMetaData) {
     final String thriftClassName = extraMetaData.get(THRIFT_CLASS);
     final String thriftDescriptorString = extraMetaData.get(THRIFT_DESCRIPTOR);
-    if (thriftClassName == null && thriftDescriptorString == null) {
+    if (thriftClassName == null || thriftDescriptorString == null) {
       return null;
     }
     final StructType descriptor = parseDescriptor(thriftDescriptorString);
     return new ThriftMetaData(thriftClassName, descriptor);
   }
 
+  /**
+   * Creates ThriftMetaData from a Thrift-generated class.
+   *
+   * @param thriftClass a Thrift-generated class
+   * @return ThriftMetaData for the given class
+   */
+  @SuppressWarnings("unchecked")
+  public static ThriftMetaData fromThriftClass(Class<?> thriftClass) {
+    if (thriftClass != null && TBase.class.isAssignableFrom(thriftClass)) {
+      Class<? extends TBase<?, ?>> tClass = (Class<? extends TBase<?, ?>>) thriftClass;
+      StructType descriptor = new ThriftSchemaConverter().toStructType(tClass);
+      return new ThriftMetaData(thriftClass.getName(), descriptor);
+    }
+    return null;
+  }
+
   private static StructType parseDescriptor(String json) {
     try {
       return (StructType)ThriftType.fromJSON(json);
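The `||` fix means a partially-populated footer (a class name without a descriptor, or vice versa) now reads as "no metadata" instead of failing later, and `fromThriftClass` gives ThriftReadSupport a way to rebuild the metadata from the requested class in that case. A sketch of that recovery path (again assuming the generated test classes):

```java
import org.apache.parquet.thrift.ThriftMetaData;
import org.apache.parquet.thrift.test.compat.Location;

public class MetadataRecoverySketch {
  public static void main(String[] args) {
    // derive the descriptor from the generated class instead of the footer
    ThriftMetaData metadata = ThriftMetaData.fromThriftClass(Location.class);
    System.out.println(metadata.getThriftClass()); // class ...test.compat.Location
    System.out.println(metadata.getDescriptor());  // StructType for Location

    // a non-TBase class yields null, mirroring "no metadata in the footer"
    System.out.println(ThriftMetaData.fromThriftClass(String.class)); // null
  }
}
```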
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
index e18b0e6d17..3160d5fce2 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
@@ -24,7 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TField;
 import org.apache.thrift.protocol.TList;
@@ -34,6 +34,9 @@
 import org.apache.thrift.protocol.TStruct;
 import org.apache.thrift.protocol.TType;
 
+import org.apache.parquet.Log;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
@@ -63,6 +66,12 @@
  */
 public class ThriftRecordConverter<T> extends RecordMaterializer<T> {
 
+  private static final Log LOG = Log.getLog(ThriftRecordConverter.class);
+
+  public static final String IGNORE_NULL_LIST_ELEMENTS =
+      "parquet.thrift.ignore-null-elements";
+  private static final boolean IGNORE_NULL_LIST_ELEMENTS_DEFAULT = false;
+
   final static ParquetProtocol readFieldEnd = new ParquetProtocol("readFieldEnd()") {
     @Override
     public void readFieldEnd() throws TException {
@@ -472,7 +481,7 @@ public int readI32() throws TException {
    * @author Julien Le Dem
    *
    */
-  static class MapConverter extends GroupConverter {
+  class MapConverter extends GroupConverter {
 
     private final GroupCounter child;
     private final List<TProtocol> mapEvents = new ArrayList<TProtocol>();
@@ -534,7 +543,7 @@ public TMap readMapBegin() throws TException {
    * @author Julien Le Dem
    *
    */
-  static class MapKeyValueConverter extends GroupConverter {
+  class MapKeyValueConverter extends GroupConverter {
 
     private Converter keyConverter;
     private Converter valueConverter;
@@ -572,7 +581,7 @@ public void end() {
    * @author Julien Le Dem
    *
    */
-  static class SetConverter extends CollectionConverter {
+  class SetConverter extends CollectionConverter {
 
     final ParquetProtocol readSetEnd = new ParquetProtocol("readSetEnd()") {
       @Override
@@ -609,7 +618,7 @@ void collectionEnd() {
    * @author Julien Le Dem
    *
    */
-  static class ListConverter extends CollectionConverter {
+  class ListConverter extends CollectionConverter {
 
     final ParquetProtocol readListEnd = new ParquetProtocol("readListEnd()") {
       @Override
@@ -646,28 +655,36 @@ void collectionEnd() {
    * @author Julien Le Dem
    *
    */
-  static abstract class CollectionConverter extends GroupConverter {
+  abstract class CollectionConverter extends GroupConverter {
 
+    private ElementConverter elementConverter = null;
     private final Converter child;
     private final Counter childCounter;
     private List<TProtocol> listEvents = new ArrayList<TProtocol>();
     private final List<TProtocol> parentEvents;
     private ThriftTypeID valuesType;
-    private final Type nestedType;
 
     CollectionConverter(List<TProtocol> parentEvents, GroupType parquetSchema, ThriftField values) {
       this.parentEvents = parentEvents;
       if (parquetSchema.getFieldCount() != 1) {
         throw new IllegalArgumentException("lists have only one field. " + parquetSchema + " size = " + parquetSchema.getFieldCount());
       }
-      nestedType = parquetSchema.getType(0);
+      Type repeatedType = parquetSchema.getType(0);
       valuesType = values.getType().getType();
-      if (nestedType.isPrimitive()) {
-        PrimitiveCounter counter = new PrimitiveCounter(newConverter(listEvents, nestedType, values).asPrimitiveConverter());
-        child = counter;
-        childCounter = counter;
+      if (ThriftSchemaConverter.isListElementType(repeatedType, values)) {
+        if (repeatedType.isPrimitive()) {
+          PrimitiveCounter counter = new PrimitiveCounter(newConverter(listEvents, repeatedType, values).asPrimitiveConverter());
+          child = counter;
+          childCounter = counter;
+        } else {
+          GroupCounter counter = new GroupCounter(newConverter(listEvents, repeatedType, values).asGroupConverter());
+          child = counter;
+          childCounter = counter;
+        }
       } else {
-        GroupCounter counter = new GroupCounter(newConverter(listEvents, nestedType, values).asGroupConverter());
+        this.elementConverter = new ElementConverter(parquetSchema.getName(),
+            listEvents, repeatedType.asGroupType(), values);
+        GroupCounter counter = new GroupCounter(elementConverter);
         child = counter;
         childCounter = counter;
       }
@@ -689,7 +706,10 @@ public void start() {
 
     @Override
     public void end() {
-      final int count = childCounter.getCount();
+      int count = childCounter.getCount();
+      if (elementConverter != null) {
+        count -= elementConverter.getNullElementCount();
+      }
       collectionStart(count, valuesType.getThriftType());
       parentEvents.addAll(listEvents);
       listEvents.clear();
@@ -702,12 +722,63 @@ public void end() {
 
   }
 
+  class ElementConverter extends GroupConverter {
+
+    private Converter elementConverter;
+    private List<TProtocol> listEvents;
+    private List<TProtocol> elementEvents;
+    private int nullElementCount;
+
+    public ElementConverter(String listName, List<TProtocol> listEvents,
+                            GroupType repeatedType, ThriftField thriftElement) {
+      this.listEvents = listEvents;
+      this.elementEvents = new ArrayList<TProtocol>();
+      Type elementType = repeatedType.getType(0);
+      if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
+        if (ignoreNullElements) {
+          LOG.warn("List " + listName +
+              " has optional elements: null elements are ignored.");
+        } else {
+          throw new ParquetDecodingException("Cannot read list " + listName +
+              " with optional elements: set " + IGNORE_NULL_LIST_ELEMENTS +
+              " to ignore nulls.");
+        }
+      }
+      elementConverter = newConverter(elementEvents, elementType, thriftElement);
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      Preconditions.checkArgument(
+          fieldIndex == 0, "Illegal field index: %s", fieldIndex);
+      return elementConverter;
+    }
+
+    @Override
+    public void start() {
+      elementEvents.clear();
+    }
+
+    @Override
+    public void end() {
+      if (elementEvents.size() > 0) {
+        listEvents.addAll(elementEvents);
+      } else {
+        nullElementCount += 1;
+      }
+    }
+
+    public int getNullElementCount() {
+      return nullElementCount;
+    }
+  }
+
   /**
    * converts to Struct
    * @author Julien Le Dem
    *
    */
-  static class StructConverter extends GroupConverter {
+  class StructConverter extends GroupConverter {
 
     private final int schemaSize;
 
@@ -789,9 +860,19 @@ public void end() {
   }
 
   private final ThriftReader<T> thriftReader;
   private final ParquetReadProtocol protocol;
-  private final GroupConverter structConverter;
+  private GroupConverter structConverter;
   private List<TProtocol> rootEvents = new ArrayList<TProtocol>();
   private boolean missingRequiredFieldsInProjection = false;
+  private boolean ignoreNullElements = IGNORE_NULL_LIST_ELEMENTS_DEFAULT;
+
+  /**
+   * This is for compatibility only.
+   * @deprecated will be removed in 2.x
+   */
+  @Deprecated
+  public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType) {
+    this(thriftReader, name, requestedParquetSchema, thriftType, null);
+  }
 
   /**
    *
@@ -799,12 +880,18 @@ public void end() {
    * @param name the name of that type (the thrift class simple name)
    * @param requestedParquetSchema the schema for the incoming columnar events
    * @param thriftType the thrift type descriptor
+   * @param conf a Configuration
    */
-  public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType) {
+  public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, Configuration conf) {
     super();
     this.thriftReader = thriftReader;
     this.protocol = new ParquetReadProtocol();
     this.thriftType = thriftType;
+    if (conf != null) {
+      this.ignoreNullElements = conf.getBoolean(
+          IGNORE_NULL_LIST_ELEMENTS,
+          IGNORE_NULL_LIST_ELEMENTS_DEFAULT);
+    }
     MessageType fullSchema = ThriftSchemaConverter.convertWithoutProjection(thriftType);
     missingRequiredFieldsInProjection = hasMissingRequiredFieldInGroupType(requestedParquetSchema, fullSchema);
     this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
@@ -874,7 +961,7 @@ public GroupConverter getRootConverter() {
     return structConverter;
   }
 
-  private static Converter newConverter(List<TProtocol> events, Type type, ThriftField field) {
+  private Converter newConverter(List<TProtocol> events, Type type, ThriftField field) {
     switch (field.getType().getType()) {
     case LIST:
       return new ListConverter(events, type.asGroupType(), field);
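By default a list with optional elements still fails fast, since Thrift cannot represent a null list entry; the new flag opts into dropping the nulls, and the element counter above keeps the reported list size consistent with what was kept. A read-side sketch (the file path is a placeholder):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.thrift.ThriftParquetReader;
import org.apache.parquet.thrift.ThriftRecordConverter;
import org.apache.parquet.thrift.test.compat.ListOfLocations;

public class IgnoreNullElementsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "parquet.thrift.ignore-null-elements": skip null entries instead of failing
    conf.setBoolean(ThriftRecordConverter.IGNORE_NULL_LIST_ELEMENTS, true);

    ParquetReader<ListOfLocations> reader =
        ThriftParquetReader.<ListOfLocations>build(new Path("/tmp/locations.parquet"))
            .withThriftClass(ListOfLocations.class)
            .withConf(conf)
            .build();
    ListOfLocations record;
    while ((record = reader.read()) != null) {
      System.out.println(record); // nulls written by other systems are dropped
    }
    reader.close();
  }
}
```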
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
index 98820c37ee..b72f605ac3 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
@@ -20,10 +20,13 @@
 
 import com.twitter.elephantbird.thrift.TStructDescriptor;
 import com.twitter.elephantbird.thrift.TStructDescriptor.Field;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TEnum;
 import org.apache.thrift.TUnion;
 
+import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.thrift.projection.FieldProjectionFilter;
 import org.apache.parquet.thrift.struct.ThriftField;
@@ -37,6 +40,8 @@
 import java.util.Collection;
 import java.util.List;
 
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+
 /**
  * Given a thrift class, this class converts it to parquet schema,
  * a {@link FieldProjectionFilter} can be specified for projection pushdown.
@@ -98,6 +103,39 @@ private static StructType toStructType(TStructDescriptor struct) {
     return new StructType(children, structOrUnionType(struct.getThriftClass()));
   }
 
+  /**
+   * Returns whether the given type is the element type of a list or is a
+   * synthetic group with one field that is the element type. This is
+   * determined by checking whether the type can be a synthetic group and by
+   * checking whether a potential synthetic group matches the expected
+   * ThriftField.
+   * <p>
+   * This method never guesses because the expected ThriftField is known.
+   *
+   * @param repeatedType a type that may be the element type
+   * @param thriftElement the expected ThriftField for list elements
+   * @return {@code true} if the repeatedType is the element schema
+   */
+  static boolean isListElementType(Type repeatedType,
+                                   ThriftField thriftElement) {
+    if (repeatedType.isPrimitive() ||
+        (repeatedType.asGroupType().getFieldCount() != 1) ||
+        (repeatedType.asGroupType().getType(0).isRepetition(REPEATED))) {
+      // The repeated type must be the element type because it is an invalid
+      // synthetic wrapper. Must be a group with one optional or required field
+      return true;
+    } else if (thriftElement != null && thriftElement.getType() instanceof StructType) {
+      Set<String> fieldNames = new HashSet<String>();
+      for (ThriftField field : ((StructType) thriftElement.getType()).getChildren()) {
+        fieldNames.add(field.getName());
+      }
+      // If the repeated type is a subset of the structure of the ThriftField,
+      // then it must be the element type.
+      return fieldNames.contains(repeatedType.asGroupType().getFieldName(0));
+    }
+    return false;
+  }
+
   private static ThriftField toThriftField(String name, Field field, ThriftField.Requirement requirement) {
     ThriftType type;
     switch (ThriftTypeID.fromByte(field.getType())) {
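Unlike the Avro side, the expected element type here is always known from the ThriftField, so the check never has to guess. The ambiguity it resolves is exercised by testSingleFieldGroupInList below: the same repeated group reads as the element itself for one thrift class and as a synthetic wrapper for another. A runnable sketch of the name-matching half of the rule (the real check is the package-private isListElementType above):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageTypeParser;

public class ListElementResolutionSketch {
  public static void main(String[] args) {
    GroupType repeated = MessageTypeParser.parseMessageType(
        "message M { optional group single_element_groups (LIST) {"
        + " repeated group single_element_group { required int64 count; } } }")
        .getType(0).asGroupType().getType(0).asGroupType();

    // Reading as list<SingleElementGroup>: "count" is a field of the expected
    // struct, so the repeated group is taken as the element itself.
    Set<String> structFields = new HashSet<String>(Arrays.asList("count"));
    System.out.println(structFields.contains(repeated.getFieldName(0))); // true

    // Reading as list<i64>: the expected element is primitive, so a group can
    // only be a synthetic wrapper around the real element.
  }
}
```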
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestArrayCompatibility.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestArrayCompatibility.java
new file mode 100644
index 0000000000..df6154a464
--- /dev/null
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestArrayCompatibility.java
@@ -0,0 +1,779 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.thrift;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.thrift.test.compat.ListOfLists;
+import org.apache.thrift.TBase;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.apache.parquet.DirectWriterTest;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.thrift.ThriftParquetReader;
+import org.apache.parquet.thrift.ThriftRecordConverter;
+import org.apache.parquet.thrift.test.compat.ListOfCounts;
+import org.apache.parquet.thrift.test.compat.ListOfInts;
+import org.apache.parquet.thrift.test.compat.ListOfLocations;
+import org.apache.parquet.thrift.test.compat.ListOfSingleElementGroups;
+import org.apache.parquet.thrift.test.compat.Location;
+import org.apache.parquet.thrift.test.compat.SingleElementGroup;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestArrayCompatibility extends DirectWriterTest {
+
+  @Test
+  @Ignore("Not yet supported")
+  public void testUnannotatedListOfPrimitives() throws Exception {
+    Path test = writeDirect(
+        "message UnannotatedListOfPrimitives {" +
+            "  repeated int32 list_of_ints;" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_ints", 0);
+
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("list_of_ints", 0);
+            rc.endMessage();
+          }
+        });
+  }
+
+  @Test
+  @Ignore("Not yet supported")
+  public void testUnannotatedListOfGroups() throws Exception {
+    Path test = writeDirect(
+        "message UnannotatedListOfGroups {" +
+            "  repeated group list_of_points {" +
+            "    required float x;" +
+            "    required float y;" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_points", 0);
+
+            rc.startGroup();
+            rc.startField("x", 0);
+            rc.addFloat(1.0f);
+            rc.endField("x", 0);
+            rc.startField("y", 1);
+            rc.addFloat(1.0f);
+            rc.endField("y", 1);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("x", 0);
+            rc.addFloat(2.0f);
+            rc.endField("x", 0);
+            rc.startField("y", 1);
+            rc.addFloat(2.0f);
+            rc.endField("y", 1);
+            rc.endGroup();
+
+            rc.endField("list_of_points", 0);
+            rc.endMessage();
+          }
+        });
+  }
+
+  @Test
+  public void testRepeatedPrimitiveInList() throws Exception {
+    Path test = writeDirect(
+        "message RepeatedPrimitiveInList {" +
+            "  required group list_of_ints (LIST) {" +
+            "    repeated int32 array;" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_ints", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0);
+
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("array", 0);
+            rc.endGroup();
+
+            rc.endField("list_of_ints", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfInts expected = new ListOfInts(Lists.newArrayList(34, 35, 36));
+    ListOfInts actual = reader(test, ListOfInts.class).read();
+    Assert.assertEquals("Should read record correctly", expected, actual);
+  }
+
+  @Test
+  public void testMultiFieldGroupInList() throws Exception {
+    // tests the missing element layer, detected by a multi-field group
+    Path test = writeDirect(
+        "message MultiFieldGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group element {" +
+            "      required double latitude;" +
+            "      required double longitude;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfLocations expected = new ListOfLocations();
+    expected.addToLocations(new Location(0.0, 0.0));
+    expected.addToLocations(new Location(0.0, 180.0));
+
+    assertReaderContains(reader(test, ListOfLocations.class), expected);
+  }
+
+  @Test
+  public void testSingleFieldGroupInList() throws Exception {
+    // this tests the case where older data has an ambiguous structure, but the
+    // correct interpretation can be determined from the thrift class
+
+    Path test = writeDirect(
+        "message SingleFieldGroupInList {" +
+            "  optional group single_element_groups (LIST) {" +
+            "    repeated group single_element_group {" +
+            "      required int64 count;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("single_element_groups", 0);
+
+            rc.startGroup();
+            rc.startField("single_element_group", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(1234L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(2345L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.endField("single_element_group", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("single_element_groups", 0);
+            rc.endMessage();
+          }
+        });
+
+    // the behavior in this case depends on the thrift class used to read
+
+    // test a class with the extra single_element_group level
+    ListOfSingleElementGroups expectedOldBehavior = new ListOfSingleElementGroups();
+    expectedOldBehavior.addToSingle_element_groups(new SingleElementGroup(1234L));
+    expectedOldBehavior.addToSingle_element_groups(new SingleElementGroup(2345L));
+
+    assertReaderContains(reader(test, ListOfSingleElementGroups.class), expectedOldBehavior);
+
+    // test a class without the extra level
+    ListOfCounts expectedNewBehavior = new ListOfCounts();
+    expectedNewBehavior.addToSingle_element_groups(1234L);
+    expectedNewBehavior.addToSingle_element_groups(2345L);
+
+    assertReaderContains(reader(test, ListOfCounts.class), expectedNewBehavior);
+  }
+
+  @Test
+  public void testNewOptionalGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message NewOptionalGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group list {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a null element (element field is omitted)
+            rc.startGroup(); // array level
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfLocations expected = new ListOfLocations();
+    expected.addToLocations(new Location(0.0, 0.0));
+    // null is not included because thrift does not allow null in lists
+    //expected.addToLocations(null);
+    expected.addToLocations(new Location(0.0, 180.0));
+
+    try {
+      assertReaderContains(reader(test, ListOfLocations.class), expected);
+      fail("Should fail: locations are optional and not ignored");
+    } catch (RuntimeException e) {
+      // e is a RuntimeException wrapping the decoding exception
+      assertTrue(e.getCause().getCause().getMessage().contains("locations"));
+    }
+
+    assertReaderContains(readerIgnoreNulls(test, ListOfLocations.class), expected);
+  }
+
+  @Test
+  public void testNewRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message NewRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group list {" +
+            "      required group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfLocations expected = new ListOfLocations();
+    expected.addToLocations(new Location(0.0, 180.0));
+    expected.addToLocations(new Location(0.0, 0.0));
+
+    assertReaderContains(reader(test, ListOfLocations.class), expected);
+  }
+
+  @Test
+  public void testAvroCompatRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message AvroCompatRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group array {" +
+            "      required group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(90.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(-90.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("array", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfLocations expected = new ListOfLocations();
+    expected.addToLocations(new Location(90.0, 180.0));
+    expected.addToLocations(new Location(-90.0, 0.0));
+
+    assertReaderContains(reader(test, ListOfLocations.class), expected);
+  }
+
+  @Test
+  public void testAvroCompatListInList() throws Exception {
+    Path test = writeDirect(
+        "message AvroCompatListInList {" +
+            "  optional group listOfLists (LIST) {" +
+            "    repeated group array (LIST) {" +
+            "      repeated int32 array;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing inner array contents
+
+            // write [34, 35, 36]
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("array", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            // write an empty list
+            rc.startGroup();
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing inner array contents
+
+            // write [32, 33, 34]
+            rc.addInteger(32);
+            rc.addInteger(33);
+            rc.addInteger(34);
+
+            rc.endField("array", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            rc.endField("array", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfLists expected = new ListOfLists();
+    expected.addToListOfLists(Arrays.asList(34, 35, 36));
+    expected.addToListOfLists(Arrays.asList());
+    expected.addToListOfLists(Arrays.asList(32, 33, 34));
+
+    // should detect the "array" name
+    assertReaderContains(reader(test, ListOfLists.class), expected);
+  }
+
+  @Test
+  public void testThriftCompatListInList() throws Exception {
+    Path test = writeDirect(
+        "message ThriftCompatListInList {" +
+            "  optional group listOfLists (LIST) {" +
+            "    repeated group listOfLists_tuple (LIST) {" +
+            "      repeated int32 listOfLists_tuple_tuple;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("listOfLists_tuple", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+
+            // write [34, 35, 36]
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            // write an empty list
+            rc.startGroup();
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+
+            // write [32, 33, 34]
+            rc.addInteger(32);
+            rc.addInteger(33);
+            rc.addInteger(34);
+
+            rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            rc.endField("listOfLists_tuple", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfLists expected = new ListOfLists();
+    expected.addToListOfLists(Arrays.asList(34, 35, 36));
+    expected.addToListOfLists(Arrays.asList());
+    expected.addToListOfLists(Arrays.asList(32, 33, 34));
+
+    // should detect the "_tuple" names
+    assertReaderContains(reader(test, ListOfLists.class), expected);
+  }
+
+  @Test
+  public void testOldThriftCompatRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message OldThriftCompatRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group locations_tuple {" +
+            "      required group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("locations_tuple", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("locations_tuple", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    ListOfLocations expected = new ListOfLocations();
+    expected.addToLocations(new Location(0.0, 180.0));
+    expected.addToLocations(new Location(0.0, 0.0));
+
+    assertReaderContains(reader(test, ListOfLocations.class), expected);
+  }
+
+  @Test
+  public void testHiveCompatOptionalGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message HiveCompatOptionalGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group bag {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("bag", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(180.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level + + // write a second non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(0.0); + rc.endField("longitude", 1); + rc.endGroup(); + + rc.endField("element", 0); + rc.endGroup(); // array level + + rc.endField("bag", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("locations", 0); + rc.endMessage(); + } + }); + + ListOfLocations expected = new ListOfLocations(); + expected.addToLocations(new Location(0.0, 180.0)); + expected.addToLocations(new Location(0.0, 0.0)); + + try { + assertReaderContains(reader(test, ListOfLocations.class), expected); + fail("Should fail: locations are optional and not ignored"); + } catch (RuntimeException e) { + // e is a RuntimeException wrapping the decoding exception + assertTrue(e.getCause().getCause().getMessage().contains("locations")); + } + + assertReaderContains(readerIgnoreNulls(test, ListOfLocations.class), expected); + } + + public > ParquetReader reader( + Path file, Class thriftClass) throws IOException { + return ThriftParquetReader.build(file) + .withThriftClass(thriftClass) + .build(); + } + + public > ParquetReader readerIgnoreNulls( + Path file, Class thriftClass) throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(ThriftRecordConverter.IGNORE_NULL_LIST_ELEMENTS, true); + return ThriftParquetReader.build(file) + .withThriftClass(thriftClass) + .withConf(conf) + .build(); + } + + public void assertReaderContains(ParquetReader reader, T... expected) + throws IOException { + T record; + List actual = Lists.newArrayList(); + while ((record = reader.read()) != null) { + actual.add(record); + } + Assert.assertEquals("Should match exepected records", + Lists.newArrayList(expected), actual); + } +} diff --git a/parquet-thrift/src/test/thrift/array_compat.thrift b/parquet-thrift/src/test/thrift/array_compat.thrift new file mode 100644 index 0000000000..ce03a59fde --- /dev/null +++ b/parquet-thrift/src/test/thrift/array_compat.thrift @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
diff --git a/parquet-thrift/src/test/thrift/array_compat.thrift b/parquet-thrift/src/test/thrift/array_compat.thrift
new file mode 100644
index 0000000000..ce03a59fde
--- /dev/null
+++ b/parquet-thrift/src/test/thrift/array_compat.thrift
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace java org.apache.parquet.thrift.test.compat
+
+struct ListOfInts {
+  1: required list<i32> list_of_ints;
+}
+
+struct Location {
+  1: required double latitude;
+  2: required double longitude;
+}
+
+struct ListOfLocations {
+  1: optional list<Location> locations;
+}
+
+struct SingleElementGroup {
+  1: required i64 count;
+}
+
+struct SingleElementGroupDifferentName {
+  1: required i64 differentFieldName;
+}
+
+struct ListOfSingleElementGroups {
+  1: optional list<SingleElementGroup> single_element_groups;
+}
+
+struct ListOfCounts {
+  1: optional list<i64> single_element_groups;
+}
+
+struct ListOfLists {
+  1: optional list<list<i32>> listOfLists;
+}
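The struct pairs above are deliberately chosen to collide on older file layouts: ListOfSingleElementGroups and ListOfCounts both declare a field named single_element_groups, and ListOfLists is read back from both the Avro-style "array" and Thrift-style "_tuple" nestings. To see the schema today's writer derives for the colliding pair, the converter can be invoked directly; a sketch assuming the generated test classes are on the classpath:

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.thrift.ThriftSchemaConverter;
import org.apache.parquet.thrift.test.compat.ListOfCounts;
import org.apache.parquet.thrift.test.compat.ListOfSingleElementGroups;

public class SchemaDumpSketch {
  public static void main(String[] args) {
    ThriftSchemaConverter converter = new ThriftSchemaConverter();
    // compare the layouts the current writer produces for the colliding pair
    MessageType counts = converter.convert(ListOfCounts.class);
    MessageType groups = converter.convert(ListOfSingleElementGroups.class);
    System.out.println(counts);
    System.out.println(groups);
  }
}
```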