diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 535bb85546..ea4c37a884 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -84,6 +84,14 @@ + + + src/test/avro + + + src/main/resources + + maven-enforcer-plugin @@ -97,7 +105,15 @@ avro-maven-plugin ${avro.version} + + compile-avsc + generate-test-sources + + schema + + + compile-idl generate-test-sources idl-protocol diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java index f3cb1ec1aa..817f074300 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java @@ -18,10 +18,14 @@ */ package org.apache.parquet.avro; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; @@ -44,6 +48,40 @@ public AvroPrimitiveConverter(ParentValueContainer parent) { } } + abstract static class BinaryConverter extends AvroPrimitiveConverter { + private T[] dict = null; + + public BinaryConverter(ParentValueContainer parent) { + super(parent); + } + + public abstract T convert(Binary binary); + + @Override + public void addBinary(Binary value) { + parent.add(convert(value)); + } + + @Override + public boolean hasDictionarySupport() { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public void setDictionary(Dictionary dictionary) { + dict = (T[]) new Object[dictionary.getMaxId() + 1]; + for (int i = 0; i <= dictionary.getMaxId(); i++) { + dict[i] = convert(dictionary.decodeToBinary(i)); + } + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + parent.add(dict[dictionaryId]); + } + } + static final class FieldByteConverter extends AvroPrimitiveConverter { public FieldByteConverter(ParentValueContainer parent) { super(parent); @@ -54,6 +92,7 @@ public void addInt(int value) { parent.addByte((byte) value); } } + static final class FieldShortConverter extends AvroPrimitiveConverter { public FieldShortConverter(ParentValueContainer parent) { super(parent); @@ -133,7 +172,6 @@ final public void addLong(long value) { final public void addFloat(float value) { parent.addFloat(value); } - } static final class FieldDoubleConverter extends AvroPrimitiveConverter { @@ -162,62 +200,84 @@ final public void addDouble(double value) { } } - static final class FieldByteArrayConverter extends AvroPrimitiveConverter { + static final class FieldByteArrayConverter extends BinaryConverter { public FieldByteArrayConverter(ParentValueContainer parent) { super(parent); } @Override - final public void addBinary(Binary value) { - parent.add(value.getBytes()); + public byte[] convert(Binary binary) { + return binary.getBytes(); } } - static final class FieldByteBufferConverter extends AvroPrimitiveConverter { + static final class FieldByteBufferConverter extends BinaryConverter { public FieldByteBufferConverter(ParentValueContainer parent) { super(parent); } @Override - final public void addBinary(Binary value) { - parent.add(ByteBuffer.wrap(value.getBytes())); + public ByteBuffer convert(Binary binary) { + return ByteBuffer.wrap(binary.getBytes()); } } - static final class FieldStringConverter extends AvroPrimitiveConverter { - // TODO: dictionary support should be generic and provided by a parent - // TODO: this always produces strings, but should respect avro.java.string - private String[] dict; - + static final class FieldStringConverter extends BinaryConverter { public FieldStringConverter(ParentValueContainer parent) { super(parent); } @Override - final public void addBinary(Binary value) { - parent.add(value.toStringUsingUTF8()); + public String convert(Binary binary) { + return binary.toStringUsingUTF8(); } + } - @Override - public boolean hasDictionarySupport() { - return true; + static final class FieldUTF8Converter extends BinaryConverter { + public FieldUTF8Converter(ParentValueContainer parent) { + super(parent); } @Override - public void setDictionary(Dictionary dictionary) { - dict = new String[dictionary.getMaxId() + 1]; - for (int i = 0; i <= dictionary.getMaxId(); i++) { - dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8(); + public Utf8 convert(Binary binary) { + return new Utf8(binary.getBytes()); + } + } + + static final class FieldStringableConverter extends BinaryConverter { + private final String stringableName; + private final Constructor ctor; + + public FieldStringableConverter(ParentValueContainer parent, + Class stringableClass) { + super(parent); + stringableName = stringableClass.getName(); + try { + this.ctor = stringableClass.getConstructor(String.class); + } catch (NoSuchMethodException e) { + throw new ParquetDecodingException( + "Unable to get String constructor for " + stringableName, e); } } @Override - public void addValueFromDictionary(int dictionaryId) { - parent.add(dict[dictionaryId]); + public Object convert(Binary binary) { + try { + return ctor.newInstance(binary.toStringUsingUTF8()); + } catch (InstantiationException e) { + throw new ParquetDecodingException( + "Cannot convert binary to " + stringableName, e); + } catch (IllegalAccessException e) { + throw new ParquetDecodingException( + "Cannot convert binary to " + stringableName, e); + } catch (InvocationTargetException e) { + throw new ParquetDecodingException( + "Cannot convert binary to " + stringableName, e); + } } } - static final class FieldEnumConverter extends AvroPrimitiveConverter { + static final class FieldEnumConverter extends BinaryConverter { private final Schema schema; private final GenericData model; @@ -229,12 +289,12 @@ public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema, } @Override - final public void addBinary(Binary value) { - parent.add(model.createEnum(value.toStringUsingUTF8(), schema)); + public Object convert(Binary binary) { + return model.createEnum(binary.toStringUsingUTF8(), schema); } } - static final class FieldFixedConverter extends AvroPrimitiveConverter { + static final class FieldFixedConverter extends BinaryConverter { private final Schema schema; private final GenericData model; @@ -246,8 +306,8 @@ public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema, } @Override - final public void addBinary(Binary value) { - parent.add(model.createFixed(null /* reuse */, value.getBytes(), schema)); + public Object convert(Binary binary) { + return model.createFixed(null /* reuse */, binary.getBytes(), schema); } } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index ed1b97eb87..57ad18a71a 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -26,6 +26,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.shorts.ShortArrayList; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; @@ -36,13 +37,15 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.Stringable; import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.ClassUtils; import org.apache.parquet.Preconditions; +import org.apache.parquet.avro.AvroConverters.FieldStringConverter; +import org.apache.parquet.avro.AvroConverters.FieldStringableConverter; import org.apache.parquet.io.InvalidRecordException; -import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -59,6 +62,10 @@ */ class AvroRecordConverter extends AvroConverters.AvroGroupConverter { + private static final String STRINGABLE_PROP = "avro.java.string"; + private static final String JAVA_CLASS_PROP = "java-class"; + private static final String JAVA_KEY_CLASS_PROP = "java-key-class"; + protected T currentRecord; private final Converter[] converters; @@ -86,18 +93,40 @@ public AvroRecordConverter(ParentValueContainer parent, avroFieldIndexes.put(field.name(), avroFieldIndex++); } + Class recordClass = null; + if (model instanceof ReflectData) { + recordClass = getDatumClass(avroSchema, model); + } + int parquetFieldIndex = 0; for (Type parquetField: parquetSchema.getFields()) { final Schema.Field avroField = getAvroField(parquetField.getName()); Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema()); final int finalAvroIndex = avroFieldIndexes.remove(avroField.name()); - converters[parquetFieldIndex++] = newConverter( - nonNullSchema, parquetField, this.model, new ParentValueContainer() { + ParentValueContainer container = new ParentValueContainer() { @Override public void add(Object value) { AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value); } - }); + }; + converters[parquetFieldIndex] = newConverter( + nonNullSchema, parquetField, this.model, container); + + // @Stringable doesn't affect the reflected schema; must be enforced here + if (recordClass != null && + converters[parquetFieldIndex] instanceof FieldStringConverter) { + try { + Field field = recordClass.getDeclaredField(avroField.name()); + if (field.isAnnotationPresent(Stringable.class)) { + converters[parquetFieldIndex] = new FieldStringableConverter( + container, field.getType()); + } + } catch (NoSuchFieldException e) { + // must not be stringable + } + } + + parquetFieldIndex += 1; } // store defaults for any new Avro fields from avroSchema that are not in @@ -163,7 +192,7 @@ private static Converter newConverter(Schema schema, Type type, } return new AvroConverters.FieldByteBufferConverter(parent); } else if (schema.getType().equals(Schema.Type.STRING)) { - return new AvroConverters.FieldStringConverter(parent); + return newStringConverter(schema, model, parent); } else if (schema.getType().equals(Schema.Type.RECORD)) { return new AvroRecordConverter(parent, type.asGroupType(), schema, model); } else if (schema.getType().equals(Schema.Type.ENUM)) { @@ -188,6 +217,51 @@ private static Converter newConverter(Schema schema, Type type, "Cannot convert Avro type: %s to Parquet type: %s", schema, type)); } + private static Converter newStringConverter(Schema schema, GenericData model, + ParentValueContainer parent) { + Class stringableClass = getStringableClass(schema, model); + if (stringableClass == String.class) { + return new FieldStringConverter(parent); + } else if (stringableClass == CharSequence.class) { + return new AvroConverters.FieldUTF8Converter(parent); + } + return new FieldStringableConverter(parent, stringableClass); + } + + private static Class getStringableClass(Schema schema, GenericData model) { + if (model instanceof SpecificData) { + // both specific and reflect (and any subclasses) use this logic + boolean isMap = (schema.getType() == Schema.Type.MAP); + String stringableClass = schema.getProp( + isMap ? JAVA_KEY_CLASS_PROP : JAVA_CLASS_PROP); + if (stringableClass != null) { + try { + return ClassUtils.forName(model.getClassLoader(), stringableClass); + } catch (ClassNotFoundException e) { + // not available, use a String instead + } + } + } + + if (ReflectData.class.isAssignableFrom(model.getClass())) { + // reflect uses String, not Utf8 + return String.class; + } + + // generic and specific use the avro.java.string setting + String name = schema.getProp(STRINGABLE_PROP); + if (name == null) { + return CharSequence.class; + } + + switch (GenericData.StringType.valueOf(name)) { + case String: + return String.class; + default: + return CharSequence.class; // will use Utf8 + } + } + @SuppressWarnings("unchecked") private static Class getDatumClass(Schema schema, GenericData model) { if (model instanceof SpecificData) { @@ -733,13 +807,13 @@ public void end() { } } - static final class MapConverter extends GroupConverter { + static final class MapConverter extends GroupConverter { private final ParentValueContainer parent; private final Converter keyValueConverter; private final Schema schema; private final Class mapClass; - private Map map; + private Map map; public MapConverter(ParentValueContainer parent, GroupType mapType, Schema mapSchema, GenericData model) { @@ -767,29 +841,31 @@ public void end() { } @SuppressWarnings("unchecked") - private Map newMap() { + private Map newMap() { if (mapClass == null || mapClass.isAssignableFrom(HashMap.class)) { - return new HashMap(); + return new HashMap(); } else { - return (Map) ReflectData.newInstance(mapClass, schema); + return (Map) ReflectData.newInstance(mapClass, schema); } } final class MapKeyValueConverter extends GroupConverter { - private String key; + private K key; private V value; private final Converter keyConverter; private final Converter valueConverter; public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, GenericData model) { - keyConverter = new PrimitiveConverter() { - @Override - final public void addBinary(Binary value) { - key = value.toStringUsingUTF8(); - } - }; + keyConverter = newStringConverter(mapSchema, model, + new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + MapKeyValueConverter.this.key = (K) value; + } + }); Type valueType = keyValueType.getType(1); Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType()); diff --git a/parquet-avro/src/test/avro/stringBehavior.avsc b/parquet-avro/src/test/avro/stringBehavior.avsc new file mode 100644 index 0000000000..7787b59c7b --- /dev/null +++ b/parquet-avro/src/test/avro/stringBehavior.avsc @@ -0,0 +1,35 @@ +{ + "name" : "StringBehaviorTest", + "namespace": "org.apache.parquet.avro", + "type" : "record", + "fields" : [ { + "name" : "default_class", + "type" : "string" + }, { + "name" : "string_class", + "type" : {"type": "string", "avro.java.string": "String"} + }, { + "name" : "stringable_class", + "type" : {"type": "string", "java-class": "java.math.BigDecimal"} + }, { + "name" : "default_map", + "type" : { + "type" : "map", + "values" : "int" + } + }, { + "name" : "string_map", + "type" : { + "type" : "map", + "values" : "int", + "avro.java.string": "String" + } + }, { + "name" : "stringable_map", + "type" : { + "type" : "map", + "values" : "int", + "java-key-class": "java.math.BigDecimal" + } + } ] +} diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java index d907bd462b..b8f34af1ad 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java @@ -21,6 +21,7 @@ import com.google.common.io.Resources; import java.io.IOException; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.junit.Assert; @@ -60,7 +61,7 @@ public void testStringCompatibility() throws IOException { GenericRecord r; while ((r = reader.read()) != null) { Assert.assertTrue("Should read value into a String", - r.get("text") instanceof String); + r.get("text") instanceof Utf8); } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index bea0237f59..4d37f40bb1 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -19,6 +19,7 @@ package org.apache.parquet.avro; import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; @@ -149,10 +150,10 @@ public void testMapWithNulls() throws Exception { .build(); // Write a record with a null value - Map map = new HashMap(); - map.put("thirty-four", 34); - map.put("eleventy-one", null); - map.put("one-hundred", 100); + Map map = new HashMap(); + map.put(str("thirty-four"), 34); + map.put(str("eleventy-one"), null); + map.put(str("one-hundred"), 100); GenericData.Record record = new GenericRecordBuilder(schema) .set("mymap", map).build(); @@ -221,7 +222,7 @@ public void testMapWithUtf8Key() throws Exception { GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); - assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap")); + assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); } @Test @@ -298,14 +299,14 @@ public void testAll() throws Exception { assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, nextRecord.get("mydouble")); assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); - assertEquals("hello", nextRecord.get("mystring")); + assertEquals(str("hello"), nextRecord.get("mystring")); assertEquals(expectedEnumSymbol, nextRecord.get("myenum")); assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); assertEquals(integerArray, nextRecord.get("myarray")); assertEquals(emptyArray, nextRecord.get("myemptyarray")); assertEquals(integerArray, nextRecord.get("myoptionalarray")); assertEquals(genericIntegerArrayWithNulls, nextRecord.get("myarrayofoptional")); - assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap")); + assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); assertEquals(emptyMap, nextRecord.get("myemptymap")); assertEquals(genericFixed, nextRecord.get("myfixed")); } @@ -517,16 +518,22 @@ public void write(Map record) { assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, nextRecord.get("mydouble")); assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); - assertEquals("hello", nextRecord.get("mystring")); - assertEquals("a", nextRecord.get("myenum")); + assertEquals(str("hello"), nextRecord.get("mystring")); + assertEquals(str("a"), nextRecord.get("myenum")); // enum symbols are unknown assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); assertEquals(integerArray, nextRecord.get("myarray")); assertEquals(integerArray, nextRecord.get("myoptionalarray")); assertEquals(ingeterArrayWithNulls, nextRecord.get("myarrayofoptional")); assertEquals(genericRecordArray, nextRecord.get("myrecordarray")); - assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap")); + assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); assertEquals(genericFixed, nextRecord.get("myfixed")); } + /** + * Return a String or Utf8 depending on whether compatibility is on + */ + public CharSequence str(String value) { + return compat ? value : new Utf8(value); + } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java similarity index 96% rename from parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java rename to parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java index 5dd58f8b5c..34a160ae1f 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java @@ -58,7 +58,7 @@ import static org.junit.Assert.fail; @RunWith(Parameterized.class) -public class TestReadWriteOldBehavior { +public class TestReadWriteOldListBehavior { @Parameterized.Parameters public static Collection data() { @@ -71,7 +71,7 @@ public static Collection data() { private final boolean compat; private final Configuration testConf = new Configuration(false); - public TestReadWriteOldBehavior(boolean compat) { + public TestReadWriteOldListBehavior(boolean compat) { this.compat = compat; this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); } @@ -144,10 +144,10 @@ public void testMapWithNulls() throws Exception { new AvroParquetWriter(file, schema); // Write a record with a null value - Map map = new HashMap(); - map.put("thirty-four", 34); - map.put("eleventy-one", null); - map.put("one-hundred", 100); + Map map = new HashMap(); + map.put(str("thirty-four"), 34); + map.put(str("eleventy-one"), null); + map.put(str("one-hundred"), 100); GenericData.Record record = new GenericRecordBuilder(schema) .set("mymap", map).build(); @@ -210,7 +210,7 @@ public void testMapWithUtf8Key() throws Exception { GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); - assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap")); + assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); } @Test @@ -277,14 +277,14 @@ public void testAll() throws Exception { assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, nextRecord.get("mydouble")); assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); - assertEquals("hello", nextRecord.get("mystring")); + assertEquals(str("hello"), nextRecord.get("mystring")); assertEquals(expectedEnumSymbol, nextRecord.get("myenum")); assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); assertEquals(integerArray, nextRecord.get("myarray")); assertEquals(emptyArray, nextRecord.get("myemptyarray")); assertEquals(integerArray, nextRecord.get("myoptionalarray")); assertEquals(integerArray, nextRecord.get("myarrayofoptional")); - assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap")); + assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); assertEquals(emptyMap, nextRecord.get("myemptymap")); assertEquals(genericFixed, nextRecord.get("myfixed")); } @@ -573,16 +573,22 @@ public void write(Map record) { assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, nextRecord.get("mydouble")); assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); - assertEquals("hello", nextRecord.get("mystring")); - assertEquals("a", nextRecord.get("myenum")); + assertEquals(str("hello"), nextRecord.get("mystring")); + assertEquals(str("a"), nextRecord.get("myenum")); assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); assertEquals(integerArray, nextRecord.get("myarray")); assertEquals(integerArray, nextRecord.get("myoptionalarray")); assertEquals(genericRecordArrayWithNullIntegers, nextRecord.get("myarrayofoptional")); assertEquals(genericRecordArray, nextRecord.get("myrecordarray")); - assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap")); + assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); assertEquals(genericFixed, nextRecord.get("myfixed")); } + /** + * Return a String or Utf8 depending on whether compatibility is on + */ + public CharSequence str(String value) { + return compat ? value : new Utf8(value); + } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java index dffaf570db..c4bf5bd6ed 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java @@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; +import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetReader; @@ -65,14 +66,14 @@ public void testWriteReflectReadGeneric() throws IOException { Path path = writePojosToParquetFile(2, CompressionCodecName.UNCOMPRESSED, false); ParquetReader reader = new AvroParquetReader(conf, path); - GenericRecord object = getGenericPojo(); + GenericRecord object = getGenericPojoUtf8(); for (int i = 0; i < 2; i += 1) { assertEquals(object, reader.read()); } assertNull(reader.read()); } - private GenericRecord getGenericPojo() { + private GenericRecord getGenericPojoUtf8() { Schema schema = ReflectData.get().getSchema(Pojo.class); GenericData.Record record = new GenericData.Record(schema); record.put("myboolean", true); @@ -83,21 +84,21 @@ private GenericRecord getGenericPojo() { record.put("myfloat", 3.1f); record.put("mydouble", 4.1); record.put("mybytes", ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 })); - record.put("mystring", "Hello"); + record.put("mystring", new Utf8("Hello")); record.put("myenum", new GenericData.EnumSymbol( schema.getField("myenum").schema(), "A")); - Map map = new HashMap(); - map.put("a", "1"); - map.put("b", "2"); + Map map = new HashMap(); + map.put(new Utf8("a"), new Utf8("1")); + map.put(new Utf8("b"), new Utf8("2")); record.put("mymap", map); record.put("myshortarray", new GenericData.Array( schema.getField("myshortarray").schema(), Lists.newArrayList(1, 2))); record.put("myintarray", new GenericData.Array( schema.getField("myintarray").schema(), Lists.newArrayList(1, 2))); - record.put("mystringarray", new GenericData.Array( - schema.getField("mystringarray").schema(), Lists.newArrayList("a", "b"))); - record.put("mylist", new GenericData.Array( - schema.getField("mylist").schema(), Lists.newArrayList("a", "b", "c"))); + record.put("mystringarray", new GenericData.Array( + schema.getField("mystringarray").schema(), Lists.newArrayList(new Utf8("a"), new Utf8("b")))); + record.put("mylist", new GenericData.Array( + schema.getField("mylist").schema(), Lists.newArrayList(new Utf8("a"), new Utf8("b"), new Utf8("c")))); return record; } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestStringBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestStringBehavior.java new file mode 100644 index 0000000000..c0cad995a5 --- /dev/null +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestStringBehavior.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.io.Resources; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.reflect.AvroSchema; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.Stringable; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.util.Utf8; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Map; + +public class TestStringBehavior { + public static Schema SCHEMA = null; + public static BigDecimal BIG_DECIMAL = new BigDecimal("3.14"); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + public Path parquetFile; + public File avroFile; + + @BeforeClass + public static void readSchemaFile() throws IOException { + TestStringBehavior.SCHEMA = new Schema.Parser().parse( + Resources.getResource("stringBehavior.avsc").openStream()); + } + + @Before + public void writeDataFiles() throws IOException { + // convert BIG_DECIMAL to string by hand so generic can write it + GenericRecord record = new GenericRecordBuilder(SCHEMA) + .set("default_class", "default") + .set("string_class", "string") + .set("stringable_class", BIG_DECIMAL.toString()) + .set("default_map", ImmutableMap.of("default_key", 34)) + .set("string_map", ImmutableMap.of("string_key", 35)) + .set("stringable_map", ImmutableMap.of(BIG_DECIMAL.toString(), 36)) + .build(); + + File file = temp.newFile("parquet"); + file.delete(); + file.deleteOnExit(); + + parquetFile = new Path(file.getPath()); + ParquetWriter parquet = AvroParquetWriter + .builder(parquetFile) + .withDataModel(GenericData.get()) + .withSchema(SCHEMA) + .build(); + + try { + parquet.write(record); + } finally { + parquet.close(); + } + + avroFile = temp.newFile("avro"); + avroFile.delete(); + avroFile.deleteOnExit(); + DataFileWriter avro = new DataFileWriter( + new GenericDatumWriter(SCHEMA)).create(SCHEMA, avroFile); + + try { + avro.append(record); + } finally { + avro.close(); + } + } + + @Test + public void testGeneric() throws IOException { + GenericRecord avroRecord; + DataFileReader avro = new DataFileReader( + avroFile, new GenericDatumReader(SCHEMA)); + try { + avroRecord = avro.next(); + } finally { + avro.close(); + } + + GenericRecord parquetRecord; + Configuration conf = new Configuration(); + conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); + AvroReadSupport.setAvroDataSupplier(conf, GenericDataSupplier.class); + AvroReadSupport.setAvroReadSchema(conf, SCHEMA); + ParquetReader parquet = AvroParquetReader + .builder(parquetFile) + .withConf(conf) + .build(); + try { + parquetRecord = parquet.read(); + } finally { + parquet.close(); + } + + Assert.assertEquals("Avro default string class should be Utf8", + Utf8.class, avroRecord.get("default_class").getClass()); + Assert.assertEquals("Parquet default string class should be Utf8", + Utf8.class, parquetRecord.get("default_class").getClass()); + + Assert.assertEquals("Avro avro.java.string=String class should be String", + String.class, avroRecord.get("string_class").getClass()); + Assert.assertEquals("Parquet avro.java.string=String class should be String", + String.class, parquetRecord.get("string_class").getClass()); + + Assert.assertEquals("Avro stringable class should be Utf8", + Utf8.class, avroRecord.get("stringable_class").getClass()); + Assert.assertEquals("Parquet stringable class should be Utf8", + Utf8.class, parquetRecord.get("stringable_class").getClass()); + + Assert.assertEquals("Avro map default string class should be Utf8", + Utf8.class, keyClass(avroRecord.get("default_map"))); + Assert.assertEquals("Parquet map default string class should be Utf8", + Utf8.class, keyClass(parquetRecord.get("default_map"))); + + Assert.assertEquals("Avro map avro.java.string=String class should be String", + String.class, keyClass(avroRecord.get("string_map"))); + Assert.assertEquals("Parquet map avro.java.string=String class should be String", + String.class, keyClass(parquetRecord.get("string_map"))); + + Assert.assertEquals("Avro map stringable class should be Utf8", + Utf8.class, keyClass(avroRecord.get("stringable_map"))); + Assert.assertEquals("Parquet map stringable class should be Utf8", + Utf8.class, keyClass(parquetRecord.get("stringable_map"))); + } + + + @Test + public void testSpecific() throws IOException { + org.apache.parquet.avro.StringBehaviorTest avroRecord; + DataFileReader avro = + new DataFileReader(avroFile, + new SpecificDatumReader( + org.apache.parquet.avro.StringBehaviorTest.getClassSchema())); + try { + avroRecord = avro.next(); + } finally { + avro.close(); + } + + org.apache.parquet.avro.StringBehaviorTest parquetRecord; + Configuration conf = new Configuration(); + conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); + AvroReadSupport.setAvroDataSupplier(conf, SpecificDataSupplier.class); + AvroReadSupport.setAvroReadSchema(conf, + org.apache.parquet.avro.StringBehaviorTest.getClassSchema()); + ParquetReader parquet = + AvroParquetReader + .builder(parquetFile) + .withConf(conf) + .build(); + try { + parquetRecord = parquet.read(); + } finally { + parquet.close(); + } + + Assert.assertEquals("Avro default string class should be String", + Utf8.class, avroRecord.default_class.getClass()); + Assert.assertEquals("Parquet default string class should be String", + Utf8.class, parquetRecord.default_class.getClass()); + + Assert.assertEquals("Avro avro.java.string=String class should be String", + String.class, avroRecord.string_class.getClass()); + Assert.assertEquals("Parquet avro.java.string=String class should be String", + String.class, parquetRecord.string_class.getClass()); + + Assert.assertEquals("Avro stringable class should be BigDecimal", + BigDecimal.class, avroRecord.stringable_class.getClass()); + Assert.assertEquals("Parquet stringable class should be BigDecimal", + BigDecimal.class, parquetRecord.stringable_class.getClass()); + Assert.assertEquals("Should have the correct BigDecimal value", + BIG_DECIMAL, parquetRecord.stringable_class); + + Assert.assertEquals("Avro map default string class should be String", + Utf8.class, keyClass(avroRecord.default_map)); + Assert.assertEquals("Parquet map default string class should be String", + Utf8.class, keyClass(parquetRecord.default_map)); + + Assert.assertEquals("Avro map avro.java.string=String class should be String", + String.class, keyClass(avroRecord.string_map)); + Assert.assertEquals("Parquet map avro.java.string=String class should be String", + String.class, keyClass(parquetRecord.string_map)); + + Assert.assertEquals("Avro map stringable class should be BigDecimal", + BigDecimal.class, keyClass(avroRecord.stringable_map)); + Assert.assertEquals("Parquet map stringable class should be BigDecimal", + BigDecimal.class, keyClass(parquetRecord.stringable_map)); + } + + @Test + public void testReflect() throws IOException { + Schema reflectSchema = ReflectData.get() + .getSchema(ReflectRecord.class); + + ReflectRecord avroRecord; + DataFileReader avro = new DataFileReader( + avroFile, new ReflectDatumReader(reflectSchema)); + try { + avroRecord = avro.next(); + } finally { + avro.close(); + } + + ReflectRecord parquetRecord; + Configuration conf = new Configuration(); + conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); + AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class); + AvroReadSupport.setAvroReadSchema(conf, reflectSchema); + ParquetReader parquet = AvroParquetReader + .builder(parquetFile) + .withConf(conf) + .build(); + try { + parquetRecord = parquet.read(); + } finally { + parquet.close(); + } + + Assert.assertEquals("Avro default string class should be String", + String.class, avroRecord.default_class.getClass()); + Assert.assertEquals("Parquet default string class should be String", + String.class, parquetRecord.default_class.getClass()); + + Assert.assertEquals("Avro avro.java.string=String class should be String", + String.class, avroRecord.string_class.getClass()); + Assert.assertEquals("Parquet avro.java.string=String class should be String", + String.class, parquetRecord.string_class.getClass()); + + Assert.assertEquals("Avro stringable class should be BigDecimal", + BigDecimal.class, avroRecord.stringable_class.getClass()); + Assert.assertEquals("Parquet stringable class should be BigDecimal", + BigDecimal.class, parquetRecord.stringable_class.getClass()); + Assert.assertEquals("Should have the correct BigDecimal value", + BIG_DECIMAL, parquetRecord.stringable_class); + + Assert.assertEquals("Avro map default string class should be String", + String.class, keyClass(avroRecord.default_map)); + Assert.assertEquals("Parquet map default string class should be String", + String.class, keyClass(parquetRecord.default_map)); + + Assert.assertEquals("Avro map avro.java.string=String class should be String", + String.class, keyClass(avroRecord.string_map)); + Assert.assertEquals("Parquet map avro.java.string=String class should be String", + String.class, keyClass(parquetRecord.string_map)); + + Assert.assertEquals("Avro map stringable class should be BigDecimal", + BigDecimal.class, keyClass(avroRecord.stringable_map)); + Assert.assertEquals("Parquet map stringable class should be BigDecimal", + BigDecimal.class, keyClass(parquetRecord.stringable_map)); + } + + @Test + public void testReflectJavaClass() throws IOException { + Schema reflectSchema = ReflectData.get() + .getSchema(ReflectRecordJavaClass.class); + System.err.println("Schema: " + reflectSchema.toString(true)); + ReflectRecordJavaClass avroRecord; + DataFileReader avro = + new DataFileReader(avroFile, + new ReflectDatumReader(reflectSchema)); + try { + avroRecord = avro.next(); + } finally { + avro.close(); + } + + ReflectRecordJavaClass parquetRecord; + Configuration conf = new Configuration(); + conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); + AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class); + AvroReadSupport.setAvroReadSchema(conf, reflectSchema); + AvroReadSupport.setRequestedProjection(conf, reflectSchema); + ParquetReader parquet = AvroParquetReader + .builder(parquetFile) + .withConf(conf) + .build(); + try { + parquetRecord = parquet.read(); + } finally { + parquet.close(); + } + + // Avro uses String even if CharSequence is set + Assert.assertEquals("Avro default string class should be String", + String.class, avroRecord.default_class.getClass()); + Assert.assertEquals("Parquet default string class should be String", + String.class, parquetRecord.default_class.getClass()); + + Assert.assertEquals("Avro stringable class should be BigDecimal", + BigDecimal.class, avroRecord.stringable_class.getClass()); + Assert.assertEquals("Parquet stringable class should be BigDecimal", + BigDecimal.class, parquetRecord.stringable_class.getClass()); + Assert.assertEquals("Should have the correct BigDecimal value", + BIG_DECIMAL, parquetRecord.stringable_class); + } + + public static class ReflectRecord { + private String default_class; + private String string_class; + @Stringable + private BigDecimal stringable_class; + private Map default_map; + private Map string_map; + private Map stringable_map; + } + + public static class ReflectRecordJavaClass { + // test avro.java.string behavior + @AvroSchema("{\"type\": \"string\", \"avro.java.string\": \"CharSequence\"}") + private String default_class; + // test using java-class property instead of Stringable + @AvroSchema("{\"type\": \"string\", \"java-class\": \"java.math.BigDecimal\"}") + private BigDecimal stringable_class; + } + + public static Class keyClass(Object obj) { + Assert.assertTrue("Should be a map", obj instanceof Map); + Map map = (Map) obj; + return Iterables.getFirst(map.keySet(), null).getClass(); + } +}