diff --git a/parquet-avro/README.md b/parquet-avro/README.md new file mode 100644 index 0000000000..8338165d0a --- /dev/null +++ b/parquet-avro/README.md @@ -0,0 +1,44 @@ + + +Apache Avro integration +====== + +**TODO**: Add a description and examples of how to use parquet-avro + +## Available options via Hadoop Configuration + +### Configuration for reading + +| Name | Type | Description | |-----------------------------------------|-----------|----------------------------------------------------------------------| +| `parquet.avro.data.supplier` | `Class` | The implementation of the interface org.apache.parquet.avro.AvroDataSupplier. Available implementations in the library: GenericDataSupplier, ReflectDataSupplier, SpecificDataSupplier.
The default value is `org.apache.parquet.avro.SpecificDataSupplier`. | +| `parquet.avro.read.schema` | `String` | The Avro schema to be used for reading. It must be compatible with the file schema. The file schema will be used directly if not set. | +| `parquet.avro.projection` | `String` | The Avro schema to be used for projection. | +| `parquet.avro.compatible` | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.
The default value is `true`. | + +### Configuration for writing + +| Name | Type | Description | +|-----------------------------------------|-----------|----------------------------------------------------------------------| +| `parquet.avro.write.data.supplier` | `Class` | The implementation of the interface org.apache.parquet.avro.AvroDataSupplier. Available implementations in the library: GenericDataSupplier, ReflectDataSupplier, SpecificDataSupplier.
The default value is `org.apache.parquet.avro.SpecificDataSupplier`. | +| `parquet.avro.schema` | `String` | The Avro schema to be used for generating the Parquet schema of the file. | +| `parquet.avro.write-old-list-structure` | `boolean` | Flag whether to write list structures in the old way (2 levels) or the new one (3 levels). When writing at 2 levels, no null values are available at the element level.
The default value is `true`. | +| `parquet.avro.add-list-element-records` | `boolean` | Flag whether to assume that any repeated element in the schema is a list element.
The default value is `true`. | +| `parquet.avro.write-parquet-uuid` | `boolean` | Flag whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) when an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.
The default value is `false`. | diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java index b9c07cf786..c3722583c3 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java @@ -29,6 +29,8 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.PrimitiveStringifier; +import org.apache.parquet.schema.PrimitiveType; public class AvroConverters { @@ -314,4 +316,18 @@ public Object convert(Binary binary) { return model.createFixed(null /* reuse */, binary.getBytes(), schema); } } + + static final class FieldUUIDConverter extends BinaryConverter { + private final PrimitiveStringifier stringifier; + + public FieldUUIDConverter(ParentValueContainer parent, PrimitiveType type) { + super(parent); + stringifier = type.stringifier(); + } + + @Override + public String convert(Binary binary) { + return stringifier.stringify(binary); + } + } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 45504b6cde..ea5b907157 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,20 +33,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.LinkedHashMap; import org.apache.avro.AvroTypeException; import org.apache.avro.Conversion; import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.SchemaCompatibility; import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.AvroIgnore; import org.apache.avro.reflect.AvroName; -import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.Stringable; import org.apache.avro.specific.SpecificData; @@ -54,7 +51,6 @@ import org.apache.parquet.Preconditions; import org.apache.parquet.avro.AvroConverters.FieldStringConverter; import org.apache.parquet.avro.AvroConverters.FieldStringableConverter; -import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; @@ -280,6 +276,9 @@ private static Converter newConverter(Schema schema, Type type, } return new AvroConverters.FieldByteBufferConverter(parent); case STRING: + if (logicalType != null && logicalType.getName().equals(LogicalTypes.uuid().getName())) { + return new AvroConverters.FieldUUIDConverter(parent, type.asPrimitiveType()); + } return newStringConverter(schema, model, parent); case RECORD: return new AvroRecordConverter(parent, type.asGroupType(), schema, model); diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 0cece97b86..c1bb3c50bf 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -31,6 +31,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; import java.util.ArrayList; import java.util.Arrays; @@ -44,6 +45,8 @@ import static org.apache.avro.JsonProperties.NULL_VALUE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT; +import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID; +import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID_DEFAULT; import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS; import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS; import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType; @@ -52,6 +55,7 @@ import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.uuidType; 
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; import static org.apache.parquet.schema.Type.Repetition.REPEATED; @@ -69,10 +73,10 @@ public class AvroSchemaConverter { private final boolean assumeRepeatedIsListElement; private final boolean writeOldListStructure; + private final boolean writeParquetUUID; public AvroSchemaConverter() { - this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT; - this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; + this(ADD_LIST_ELEMENT_RECORDS_DEFAULT); } /** @@ -84,6 +88,7 @@ public AvroSchemaConverter() { AvroSchemaConverter(boolean assumeRepeatedIsListElement) { this.assumeRepeatedIsListElement = assumeRepeatedIsListElement; this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; + this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; } public AvroSchemaConverter(Configuration conf) { @@ -91,6 +96,7 @@ public AvroSchemaConverter(Configuration conf) { ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT); this.writeOldListStructure = conf.getBoolean( WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT); + this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT); } /** @@ -145,6 +151,7 @@ private Type convertField(String fieldName, Schema schema) { private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) { Types.PrimitiveBuilder builder; Schema.Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); if (type.equals(Schema.Type.BOOLEAN)) { builder = Types.primitive(BOOLEAN, repetition); } else if (type.equals(Schema.Type.INT)) { @@ -158,7 +165,12 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet } else if (type.equals(Schema.Type.BYTES)) { builder = Types.primitive(BINARY, repetition); } else if (type.equals(Schema.Type.STRING)) { - builder = Types.primitive(BINARY, repetition).as(stringType()); + if (logicalType != null && logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) { + builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .length(LogicalTypeAnnotation.UUIDLogicalTypeAnnotation.BYTES); + } else { + builder = Types.primitive(BINARY, repetition).as(stringType()); + } } else if (type.equals(Schema.Type.RECORD)) { return new GroupType(repetition, fieldName, convertFields(schema.getFields())); } else if (type.equals(Schema.Type.ENUM)) { @@ -186,7 +198,6 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet // schema translation can only be done for known logical types because this // creates an equivalence - LogicalType logicalType = schema.getLogicalType(); if (logicalType != null) { if (logicalType instanceof LogicalTypes.Decimal) { LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; @@ -306,8 +317,12 @@ public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) { } @Override public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) { - int size = parquetType.asPrimitiveType().getTypeLength(); - return Schema.createFixed(parquetType.getName(), null, null, size); + if (annotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) { + return Schema.create(Schema.Type.STRING); + } else { + int size = parquetType.asPrimitiveType().getTypeLength(); + return Schema.createFixed(parquetType.getName(), null, null, size); + } } @Override public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) { @@ -419,6 +434,8 @@ private LogicalTypeAnnotation 
convertLogicalType(LogicalType logicalType) { return timestampType(true, MILLIS); } else if (logicalType instanceof LogicalTypes.TimestampMicros) { return timestampType(true, MICROS); + } else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) { + return uuidType(); } return null; } @@ -461,6 +478,11 @@ public Optional visit(LogicalTypeAnnotation.TimestampLogicalTypeAnn } return empty(); } + + @Override + public Optional visit(UUIDLogicalTypeAnnotation uuidLogicalType) { + return of(LogicalTypes.uuid()); + } }).orElse(null); } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index 859a133c3c..0616120f3a 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; + import org.apache.avro.Conversion; import org.apache.avro.LogicalType; import org.apache.avro.Schema; @@ -35,6 +37,7 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.apache.hadoop.util.ReflectionUtils; @@ -60,6 +63,8 @@ public static void setAvroDataSupplier( public static final String WRITE_OLD_LIST_STRUCTURE = "parquet.avro.write-old-list-structure"; static final boolean WRITE_OLD_LIST_STRUCTURE_DEFAULT = true; + public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid"; + static final boolean WRITE_PARQUET_UUID_DEFAULT = false; private static final String MAP_REPEATED_NAME = "key_value"; private static final String MAP_KEY_NAME = "key"; @@ -228,7 +233,7 @@ private void writeMap(GroupType schema, Schema avroSchema, recordConsumer.endGroup(); } - private void writeUnion(GroupType parquetSchema, Schema avroSchema, + private void writeUnion(GroupType parquetSchema, Schema avroSchema, Object value) { recordConsumer.startGroup(); @@ -343,7 +348,11 @@ private void writeValueWithoutConversion(Type type, Schema avroSchema, Object va } break; case STRING: - recordConsumer.addBinary(fromAvroString(value)); + if (type.asPrimitiveType().getLogicalTypeAnnotation() instanceof UUIDLogicalTypeAnnotation) { + recordConsumer.addBinary(fromUUIDString(value)); + } else { + recordConsumer.addBinary(fromAvroString(value)); + } break; case RECORD: writeRecord(type.asGroupType(), avroSchema, value); @@ -363,6 +372,20 @@ private void writeValueWithoutConversion(Type type, Schema avroSchema, Object va } } + private Binary fromUUIDString(Object value) { + byte[] data = new 
byte[UUIDLogicalTypeAnnotation.BYTES]; + UUID uuid = UUID.fromString(value.toString()); + writeLong(data, 0, uuid.getMostSignificantBits()); + writeLong(data, Long.BYTES, uuid.getLeastSignificantBits()); + return Binary.fromConstantByteArray(data); + } + + private void writeLong(byte[] array, int offset, long value) { + for (int i = 0; i < Long.BYTES; ++i) { + array[i + offset] = (byte) (value >>> ((Long.BYTES - i - 1) * Byte.SIZE)); + } + } + private Binary fromAvroString(Object value) { if (value instanceof Utf8) { Utf8 utf8 = (Utf8) value; diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java index 717e36e482..8a9e424168 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.junit.Assert; import org.junit.rules.TemporaryFolder; @@ -82,15 +83,17 @@ public static GenericRecord instance(Schema schema, Object... pairs) { } public static List read(GenericData model, Schema schema, File file) throws IOException { + return read(new Configuration(false), model, schema, file); + } + + public static List read(Configuration conf, GenericData model, Schema schema, File file) throws IOException { List data = new ArrayList(); - Configuration conf = new Configuration(false); AvroReadSupport.setRequestedProjection(conf, schema); AvroReadSupport.setAvroReadSchema(conf, schema); try (ParquetReader fileReader = AvroParquetReader - .builder(new Path(file.toString())) + .builder(HadoopInputFile.fromPath(new Path(file.toString()), conf)) .withDataModel(model) // reflect disables compatibility - .withConf(conf) .build()) { D datum; while ((datum = fileReader.read()) != null) { @@ -103,6 +106,12 @@ public static List read(GenericData model, Schema schema, File file) thro @SuppressWarnings("unchecked") public static File write(TemporaryFolder temp, GenericData model, Schema schema, D... data) throws IOException { + return write(temp, new Configuration(false), model, schema, data); + } + + @SuppressWarnings("unchecked") + public static File write(TemporaryFolder temp, Configuration conf, GenericData model, Schema schema, D... 
data) + throws IOException { File file = temp.newFile(); Assert.assertTrue(file.delete()); @@ -118,4 +127,10 @@ public static File write(TemporaryFolder temp, GenericData model, Schema sch return file; } + + public static Configuration conf(String name, boolean value) { + Configuration conf = new Configuration(false); + conf.setBoolean(name, value); + return conf; + } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index f5b348b6b0..786477bf95 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -37,6 +37,9 @@ import static org.apache.avro.Schema.Type.INT; import static org.apache.avro.Schema.Type.LONG; +import static org.apache.avro.Schema.Type.STRING; +import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility; +import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; import static org.apache.parquet.avro.AvroTestUtil.field; import static org.apache.parquet.avro.AvroTestUtil.optionalField; import static org.apache.parquet.avro.AvroTestUtil.primitive; @@ -766,6 +769,35 @@ public void testReuseNameInNestedStructureAtSameLevel() throws Exception { testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema); } + @Test + public void testUUIDType() throws Exception { + Schema fromAvro = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)), null, null))); + String parquet = "message myrecord {\n" + + " required binary uuid (STRING);\n" + + "}\n"; + Schema toAvro = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("uuid", Schema.create(STRING), null, null))); + + testAvroToParquetConversion(fromAvro, parquet); + testParquetToAvroConversion(toAvro, parquet); + + assertEquals(COMPATIBLE, checkReaderWriterCompatibility(fromAvro, toAvro).getType()); + } + + @Test + public void testUUIDTypeWithParquetUUID() throws Exception { + Schema uuid = LogicalTypes.uuid().addToSchema(Schema.create(STRING)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("uuid", uuid, null, null))); + + testRoundTripConversion(AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), + expected, + "message myrecord {\n" + + " required fixed_len_byte_array(16) uuid (UUID);\n" + + "}\n"); + } + public static Schema optional(Schema original) { return Schema.createUnion(Lists.newArrayList( Schema.create(Schema.Type.NULL), diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java index eb2ef080e1..4ae4ed62eb 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java @@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; @@ -41,6 +42,7 @@ import java.util.UUID; import static org.apache.avro.Schema.Type.STRING; +import static org.apache.parquet.avro.AvroTestUtil.conf; import static 
org.apache.parquet.avro.AvroTestUtil.field; import static org.apache.parquet.avro.AvroTestUtil.instance; import static org.apache.parquet.avro.AvroTestUtil.optionalField; @@ -60,7 +62,6 @@ public class TestGenericLogicalTypes { public static final BigDecimal D1 = new BigDecimal("-34.34"); public static final BigDecimal D2 = new BigDecimal("117230.00"); - @BeforeClass public static void addDecimalAndUUID() { GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion()); @@ -92,6 +93,25 @@ public void testReadUUID() throws IOException { Arrays.asList(u1, u2), read(GENERIC, uuidSchema, test)); } + @Test + public void testReadUUIDWithParquetUUID() throws IOException { + Schema uuidSchema = record("R", + field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)))); + GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID()); + GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID()); + File test = write(conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), uuidSchema, u1, u2); + + Assert.assertEquals("Should read UUID objects", + Arrays.asList(u1, u2), read(GENERIC, uuidSchema, test)); + + GenericRecord s1 = instance(uuidSchema, "uuid", u1.get("uuid").toString()); + GenericRecord s2 = instance(uuidSchema, "uuid", u2.get("uuid").toString()); + + Assert.assertEquals("Should read UUID as Strings", + Arrays.asList(s1, s2), read(GenericData.get(), uuidSchema, test)); + + } + @Test public void testWriteUUIDReadStringSchema() throws IOException { Schema uuidSchema = record("R", @@ -130,19 +150,34 @@ public void testWriteNullableUUID() throws IOException { Schema nullableUuidSchema = record("R", optionalField("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)))); GenericRecord u1 = instance(nullableUuidSchema, "uuid", UUID.randomUUID()); - GenericRecord u2 = instance(nullableUuidSchema, "uuid", UUID.randomUUID()); + GenericRecord u2 = instance(nullableUuidSchema, "uuid", null); Schema stringUuidSchema = Schema.create(STRING); stringUuidSchema.addProp(GenericData.STRING_PROP, "String"); Schema nullableStringSchema = record("R", optionalField("uuid", stringUuidSchema)); GenericRecord s1 = instance(nullableStringSchema, "uuid", u1.get("uuid").toString()); - GenericRecord s2 = instance(nullableStringSchema, "uuid", u2.get("uuid").toString()); + GenericRecord s2 = instance(nullableStringSchema, "uuid", null); File test = write(GENERIC, nullableUuidSchema, u1, u2); Assert.assertEquals("Should read UUIDs as Strings", Arrays.asList(s1, s2), read(GENERIC, nullableStringSchema, test)); } + @Test + public void testWriteNullableUUIDWithParquetUUID() throws IOException { + Schema nullableUuidSchema = record("R", + optionalField("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING)))); + GenericRecord u1 = instance(nullableUuidSchema, "uuid", UUID.randomUUID()); + GenericRecord u2 = instance(nullableUuidSchema, "uuid", null); + + GenericRecord s1 = instance(nullableUuidSchema, "uuid", u1.get("uuid").toString()); + GenericRecord s2 = instance(nullableUuidSchema, "uuid", null); + + File test = write(GENERIC, nullableUuidSchema, u1, u2); + Assert.assertEquals("Should read UUIDs as Strings", + Arrays.asList(s1, s2), read(GenericData.get(), nullableUuidSchema, test)); + } + @Test public void testReadDecimalFixed() throws IOException { Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4); @@ -246,8 +281,16 @@ private File write(Schema schema, D... 
data) throws IOException { return write(GenericData.get(), schema, data); } + private File write(Configuration conf, Schema schema, D... data) throws IOException { + return write(conf, GenericData.get(), schema, data); + } + private File write(GenericData model, Schema schema, D... data) throws IOException { return AvroTestUtil.write(temp, model, schema, data); } + private File write(Configuration conf, GenericData model, Schema schema, D... data) throws IOException { + return AvroTestUtil.write(temp, conf, model, schema, data); + } + } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java index 142191f4fc..31661c0ed3 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java @@ -30,6 +30,7 @@ import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.ReflectData; import org.apache.avro.specific.SpecificData; +import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; @@ -41,8 +42,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.UUID; +import static org.apache.parquet.avro.AvroTestUtil.conf; import static org.apache.parquet.avro.AvroTestUtil.read; /** @@ -357,6 +360,43 @@ public void testReadUUID() throws IOException { read(REFLECT, uuidStringSchema, test)); } + @Test + public void testReadUUIDWithParquetUUID() throws IOException { + Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithStringUUID r1 = new RecordWithStringUUID(); + r1.uuid = u1.toString(); + RecordWithStringUUID r2 = new RecordWithStringUUID(); + r2.uuid = u2.toString(); + + List expected = Arrays.asList( + new RecordWithUUID(), new RecordWithUUID()); + expected.get(0).uuid = u1; + expected.get(1).uuid = u2; + + File test = write( + AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), + uuidSchema, r1, r2); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, read(REFLECT, uuidSchema, test)); + + // verify that the field's type overrides the logical type + Schema uuidStringSchema = SchemaBuilder + .record(RecordWithStringUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema()); + + Assert.assertEquals("Should not convert to UUID if accessor is String", + Arrays.asList(r1, r2), + read(REFLECT, uuidStringSchema, test)); + } + @Test public void testWriteUUID() throws IOException { Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) @@ -393,6 +433,41 @@ public void testWriteUUID() throws IOException { read(ReflectData.get(), uuidStringSchema, test)); } + @Test + public void testWriteUUIDWithParquetUUID() throws IOException { + Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithUUID r1 = new RecordWithUUID(); + r1.uuid = u1; + RecordWithUUID r2 = new RecordWithUUID(); + 
r2.uuid = u2; + + List expected = Arrays.asList( + new RecordWithStringUUID(), new RecordWithStringUUID()); + expected.get(0).uuid = u1.toString(); + expected.get(1).uuid = u2.toString(); + + File test = write( + AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), + REFLECT, uuidSchema, r1, r2); + + Assert.assertEquals("Should read UUID objects", + Arrays.asList(r1, r2), + read(REFLECT, uuidSchema, test)); + + Schema uuidStringSchema = SchemaBuilder.record(RecordWithStringUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema()); + Assert.assertEquals("Should read uuid as Strings", + expected, + read(ReflectData.get(), uuidStringSchema, test)); + } + @Test public void testWriteNullableUUID() throws IOException { Schema nullableUuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) @@ -425,6 +500,45 @@ public void testWriteNullableUUID() throws IOException { read(REFLECT, nullableUuidStringSchema, test)); } + @Test + public void testWriteNullableUUIDWithParquetUUID() throws IOException { + Schema nullableUuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) + .fields().optionalString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema( + nullableUuidSchema.getField("uuid").schema().getTypes().get(1)); + + UUID u1 = UUID.randomUUID(); + UUID u2 = null; + + RecordWithUUID r1 = new RecordWithUUID(); + r1.uuid = u1; + RecordWithUUID r2 = new RecordWithUUID(); + r2.uuid = u2; + + List expected = Arrays.asList( + new RecordWithStringUUID(), new RecordWithStringUUID()); + expected.get(0).uuid = u1.toString(); + expected.get(1).uuid = null; + + File test = write( + AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), + REFLECT, nullableUuidSchema, r1, r2); + + Assert.assertEquals("Should read uuid as UUID objects", + Arrays.asList(r1, r2), + read(REFLECT, nullableUuidSchema, test)); + + Schema nullableUuidStringSchema = SchemaBuilder + .record(RecordWithStringUUID.class.getName()) + .fields().optionalString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema( + nullableUuidStringSchema.getField("uuid").schema().getTypes().get(1)); + + Assert.assertEquals("Should read uuid as String without UUID conversion", + expected, + read(REFLECT, nullableUuidStringSchema, test)); + } + @Test public void testWriteUUIDMissingLogicalType() throws IOException { Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()) @@ -498,6 +612,42 @@ public void testReadUUIDGenericRecord() throws IOException { read(REFLECT, uuidStringSchema, test)); } + @Test + public void testReadUUIDGenericRecordWithParquetUUID() throws IOException { + Schema uuidSchema = SchemaBuilder.record("RecordWithUUID") + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + RecordWithStringUUID r1 = new RecordWithStringUUID(); + r1.uuid = u1.toString(); + RecordWithStringUUID r2 = new RecordWithStringUUID(); + r2.uuid = u2.toString(); + + List expected = Arrays.asList( + new GenericData.Record(uuidSchema), new GenericData.Record(uuidSchema)); + expected.get(0).put("uuid", u1); + expected.get(1).put("uuid", u2); + + File test = write( + AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), + uuidSchema, r1, r2); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, read(REFLECT, uuidSchema, test)); + + Schema uuidStringSchema = SchemaBuilder + 
.record(RecordWithStringUUID.class.getName()) + .fields().requiredString("uuid").endRecord(); + LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema()); + + Assert.assertEquals("Should not convert to UUID if accessor is String", + Arrays.asList(r1, r2), + read(REFLECT, uuidStringSchema, test)); + } + @Test public void testReadUUIDArray() throws IOException { Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName()) @@ -523,6 +673,31 @@ public void testReadUUIDArray() throws IOException { read(REFLECT, uuidArraySchema, test).get(0)); } + @Test + public void testReadUUIDArrayWithParquetUUID() throws IOException { + Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + LogicalTypes.uuid().addToSchema( + uuidArraySchema.getField("uuids").schema().getElementType()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord r = new GenericData.Record(uuidArraySchema); + r.put("uuids", Arrays.asList(u1.toString(), u2.toString())); + + RecordWithUUIDArray expected = new RecordWithUUIDArray(); + expected.uuids = new UUID[] {u1, u2}; + + File test = write(AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), uuidArraySchema, r); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, + read(REFLECT, uuidArraySchema, test).get(0)); + } + @Test public void testWriteUUIDArray() throws IOException { Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName()) @@ -558,6 +733,43 @@ public void testWriteUUIDArray() throws IOException { read(ReflectData.get(), stringArraySchema, test).get(0)); } + @Test + public void testWriteUUIDArrayWithParquetUUID() throws IOException { + Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + LogicalTypes.uuid().addToSchema( + uuidArraySchema.getField("uuids").schema().getElementType()); + + Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray") + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + LogicalTypes.uuid().addToSchema( + stringArraySchema.getField("uuids").schema().getElementType()); + stringArraySchema.getField("uuids").schema() + .addProp(SpecificData.CLASS_PROP, List.class.getName()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord expected = new GenericData.Record(stringArraySchema); + List uuids = new ArrayList(); + uuids.add(u1.toString()); + uuids.add(u2.toString()); + expected.put("uuids", uuids); + + RecordWithUUIDArray r = new RecordWithUUIDArray(); + r.uuids = new UUID[] {u1, u2}; + + File test = write(AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), REFLECT, uuidArraySchema, r); + + Assert.assertEquals("Should read UUIDs as Strings", + expected, + read(ReflectData.get(), stringArraySchema, test).get(0)); + } + @Test public void testReadUUIDList() throws IOException { Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName()) @@ -584,6 +796,32 @@ public void testReadUUIDList() throws IOException { expected, read(REFLECT, uuidListSchema, test).get(0)); } + @Test + public void testReadUUIDListWithParquetUUID() throws IOException { + Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName()) + .fields() + 
.name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + uuidListSchema.getField("uuids").schema().addProp( + SpecificData.CLASS_PROP, List.class.getName()); + LogicalTypes.uuid().addToSchema( + uuidListSchema.getField("uuids").schema().getElementType()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord r = new GenericData.Record(uuidListSchema); + r.put("uuids", Arrays.asList(u1.toString(), u2.toString())); + + RecordWithUUIDList expected = new RecordWithUUIDList(); + expected.uuids = Arrays.asList(u1, u2); + + File test = write(AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), uuidListSchema, r); + + Assert.assertEquals("Should convert Strings to UUIDs", + expected, read(REFLECT, uuidListSchema, test).get(0)); + } + @Test public void testWriteUUIDList() throws IOException { Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName()) @@ -618,14 +856,61 @@ public void testWriteUUIDList() throws IOException { read(REFLECT, stringArraySchema, test).get(0)); } + @Test + public void testWriteUUIDListWithParquetUUID() throws IOException { + Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName()) + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + uuidListSchema.getField("uuids").schema().addProp( + SpecificData.CLASS_PROP, List.class.getName()); + LogicalTypes.uuid().addToSchema( + uuidListSchema.getField("uuids").schema().getElementType()); + + Schema reflectSchema = SchemaBuilder.record("RecordWithUUIDArray") + .fields() + .name("uuids").type().array().items().stringType().noDefault() + .endRecord(); + reflectSchema.getField("uuids").schema() + .addProp(SpecificData.CLASS_PROP, List.class.getName()); + LogicalTypes.uuid().addToSchema( + reflectSchema.getField("uuids").schema().getElementType()); + + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); + + GenericRecord expected = new GenericData.Record(reflectSchema); + expected.put("uuids", Arrays.asList(u1, u2)); + + RecordWithUUIDList r = new RecordWithUUIDList(); + r.uuids = Arrays.asList(u1, u2); + + File test = write(AvroTestUtil.conf(AvroWriteSupport.WRITE_PARQUET_UUID, true), REFLECT, uuidListSchema, r); + + Assert.assertEquals("Should read UUID objects", + expected, + read(REFLECT, reflectSchema, test).get(0)); + } + + @SuppressWarnings("unchecked") private File write(Schema schema, D... data) throws IOException { return write(ReflectData.get(), schema, data); } + @SuppressWarnings("unchecked") + private File write(Configuration conf, Schema schema, D... data) throws IOException { + return write(conf, ReflectData.get(), schema, data); + } + @SuppressWarnings("unchecked") private File write(GenericData model, Schema schema, D... data) throws IOException { return AvroTestUtil.write(temp, model, schema, data); } + + @SuppressWarnings("unchecked") + private File write(Configuration conf, GenericData model, Schema schema, D... 
data) throws IOException { + return AvroTestUtil.write(temp, conf, model, schema, data); + } } class RecordWithUUID { @@ -645,7 +930,7 @@ public boolean equals(Object obj) { return false; } RecordWithUUID that = (RecordWithUUID) obj; - return this.uuid.equals(that.uuid); + return Objects.equals(this.uuid, that.uuid); } } @@ -666,7 +951,7 @@ public boolean equals(Object obj) { return false; } RecordWithStringUUID that = (RecordWithStringUUID) obj; - return this.uuid.equals(that.uuid); + return Objects.equals(this.uuid, that.uuid); } } diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 9f7479a6c1..44fc1a2ea5 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -41,6 +41,13 @@ parquet-common ${project.version} + + org.apache.parquet + parquet-common + ${project.version} + test-jar + test + org.apache.parquet parquet-encoding diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java index 5f61ed60b4..495b6dcfcf 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java @@ -130,6 +130,12 @@ protected LogicalTypeAnnotation fromString(List params) { return bsonType(); } }, + UUID { + @Override + protected LogicalTypeAnnotation fromString(List params) { + return uuidType(); + } + }, INTERVAL { @Override protected LogicalTypeAnnotation fromString(List params) { @@ -286,6 +292,10 @@ public static BsonLogicalTypeAnnotation bsonType() { return BsonLogicalTypeAnnotation.INSTANCE; } + public static UUIDLogicalTypeAnnotation uuidType() { + return UUIDLogicalTypeAnnotation.INSTANCE; + } + public static class StringLogicalTypeAnnotation extends LogicalTypeAnnotation { private static final StringLogicalTypeAnnotation INSTANCE = new StringLogicalTypeAnnotation(); @@ -861,6 +871,36 @@ PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) { } } + public static class UUIDLogicalTypeAnnotation extends LogicalTypeAnnotation { + private static final UUIDLogicalTypeAnnotation INSTANCE = new UUIDLogicalTypeAnnotation(); + public static final int BYTES = 16; + + private UUIDLogicalTypeAnnotation() { + } + + @Override + @InterfaceAudience.Private + public OriginalType toOriginalType() { + // No OriginalType for UUID + return null; + } + + @Override + public Optional accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) { + return logicalTypeAnnotationVisitor.visit(this); + } + + @Override + LogicalTypeToken getType() { + return LogicalTypeToken.UUID; + } + + @Override + PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) { + return PrimitiveStringifier.UUID_STRINGIFIER; + } + } + // This logical type annotation is implemented to support backward compatibility with ConvertedType. // The new logical type representation in parquet-format doesn't have any interval type, // thus this annotation is mapped to UNKNOWN. 
@@ -1009,6 +1049,10 @@ default Optional visit(BsonLogicalTypeAnnotation bsonLogicalType) { return empty(); } + default Optional visit(UUIDLogicalTypeAnnotation uuidLogicalType) { + return empty(); + } + default Optional visit(IntervalLogicalTypeAnnotation intervalLogicalType) { return empty(); } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java index 4705ad94eb..29c62354ee 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java @@ -32,6 +32,7 @@ import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.naming.OperationNotSupportedException; @@ -421,4 +422,30 @@ private String stringifyWithScale(BigInteger i) { } }; } + + static final PrimitiveStringifier UUID_STRINGIFIER = new PrimitiveStringifier("UUID_STRINGIFIER") { + private final char[] digit = "0123456789abcdef".toCharArray(); + @Override + public String stringify(Binary value) { + byte[] bytes = value.getBytesUnsafe(); + StringBuilder builder = new StringBuilder(36); + appendHex(bytes, 0, 4, builder); + builder.append('-'); + appendHex(bytes, 4, 2, builder); + builder.append('-'); + appendHex(bytes, 6, 2, builder); + builder.append('-'); + appendHex(bytes, 8, 2, builder); + builder.append('-'); + appendHex(bytes, 10, 6, builder); + return builder.toString(); + } + + private void appendHex(byte[] array, int offset, int length, StringBuilder builder) { + for (int i = offset, n = offset + length; i < n; ++i) { + int value = array[i] & 0xff; + builder.append(digit[value >>> 4]).append(digit[value & 0x0f]); + } + } + }; } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 21502d813b..026f418357 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -32,6 +32,8 @@ import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; import static java.util.Optional.empty; import static java.util.Optional.of; @@ -383,6 +385,11 @@ public Optional visit(LogicalTypeAnnotation.DecimalLogicalT public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); } + + @Override + public Optional visit(UUIDLogicalTypeAnnotation uuidLogicalType) { + return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); + } }).orElseThrow(() -> new ShouldNeverHappenException( "No comparator logic implemented for FIXED_LEN_BYTE_ARRAY logical type: " + logicalType)); } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java index c62010a58f..05db65538e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java @@ 
-445,20 +445,22 @@ protected PrimitiveType build(String name) { logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { @Override public Optional visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { - checkBinaryPrimitiveType(stringLogicalType); - return Optional.of(true); + return checkBinaryPrimitiveType(stringLogicalType); } @Override public Optional visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - checkBinaryPrimitiveType(jsonLogicalType); - return Optional.of(true); + return checkBinaryPrimitiveType(jsonLogicalType); } @Override public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { - checkBinaryPrimitiveType(bsonLogicalType); - return Optional.of(true); + return checkBinaryPrimitiveType(bsonLogicalType); + } + + @Override + public Optional visit(LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return checkFixedPrimitiveType(LogicalTypeAnnotation.UUIDLogicalTypeAnnotation.BYTES, uuidLogicalType); } @Override @@ -495,8 +497,7 @@ public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotatio @Override public Optional visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { - checkInt32PrimitiveType(dateLogicalType); - return Optional.of(true); + return checkInt32PrimitiveType(dateLogicalType); } @Override @@ -536,41 +537,43 @@ public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation in @Override public Optional visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - checkInt64PrimitiveType(timestampLogicalType); - return Optional.of(true); + return checkInt64PrimitiveType(timestampLogicalType); } @Override public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { - Preconditions.checkState( - (primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) && - (length == 12), - "INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)"); - return Optional.of(true); + return checkFixedPrimitiveType(12, intervalLogicalType); } @Override public Optional visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return checkBinaryPrimitiveType(enumLogicalType); + } + + private Optional checkFixedPrimitiveType(int l, LogicalTypeAnnotation logicalTypeAnnotation) { Preconditions.checkState( - primitiveType == PrimitiveTypeName.BINARY, - "ENUM can only annotate binary fields"); + primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && length == l, + logicalTypeAnnotation.toString() + " can only annotate FIXED_LEN_BYTE_ARRAY(" + l + ')'); return Optional.of(true); } - private void checkBinaryPrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) { + private Optional checkBinaryPrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) { Preconditions.checkState( primitiveType == PrimitiveTypeName.BINARY, - logicalTypeAnnotation.toString() + " can only annotate binary fields"); + logicalTypeAnnotation.toString() + " can only annotate BINARY"); + return Optional.of(true); } - private void checkInt32PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) { + private Optional checkInt32PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) { Preconditions.checkState(primitiveType == PrimitiveTypeName.INT32, logicalTypeAnnotation.toString() + " can only annotate INT32"); + return Optional.of(true); } - private void checkInt64PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) { + private Optional 
checkInt64PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) { Preconditions.checkState(primitiveType == PrimitiveTypeName.INT64, logicalTypeAnnotation.toString() + " can only annotate INT64"); + return Optional.of(true); } }).orElseThrow(() -> new IllegalStateException(logicalTypeAnnotation + " can not be applied to a primitive type")); } diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveStringifier.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveStringifier.java index b5de4f850e..ea8fcd40e4 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveStringifier.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveStringifier.java @@ -52,6 +52,7 @@ import java.util.TimeZone; import java.util.concurrent.TimeUnit; +import org.apache.parquet.TestUtils; import org.apache.parquet.io.api.Binary; import org.junit.Test; @@ -84,8 +85,7 @@ public void testDefaultStringifier() { assertEquals("null", stringifier.stringify(null)); assertEquals("0x", stringifier.stringify(Binary.EMPTY)); - assertEquals("0x0123456789ABCDEF", stringifier.stringify(Binary.fromConstantByteArray( - new byte[] { 0x01, 0x23, 0x45, 0x67, (byte) 0x89, (byte) 0xAB, (byte) 0xCD, (byte) 0xEF }))); + assertEquals("0x0123456789ABCDEF", stringifier.stringify(toBinary(0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF))); } @Test @@ -309,6 +309,40 @@ public void testDecimalStringifier() { checkThrowingUnsupportedException(stringifier, Integer.TYPE, Long.TYPE, Binary.class); } + @Test + public void testUUIDStringifier() { + PrimitiveStringifier stringifier = PrimitiveStringifier.UUID_STRINGIFIER; + + assertEquals("00112233-4455-6677-8899-aabbccddeeff", stringifier.stringify( + toBinary(0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff))); + assertEquals("00000000-0000-0000-0000-000000000000", stringifier.stringify( + toBinary(0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00))); + assertEquals("ffffffff-ffff-ffff-ffff-ffffffffffff", stringifier.stringify( + toBinary(0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff))); + assertEquals("0eb1497c-19b6-42bc-b028-b4b612bed141", stringifier.stringify( + toBinary(0x0e, 0xb1, 0x49, 0x7c, 0x19, 0xb6, 0x42, 0xbc, 0xb0, 0x28, 0xb4, 0xb6, 0x12, 0xbe, 0xd1, 0x41))); + + // Check that the stringifier does not care about the length; it always takes the first 16 bytes + assertEquals("87a09cca-3b1e-4a0a-9c77-591924c3b57b", stringifier.stringify( + toBinary(0x87, 0xa0, 0x9c, 0xca, 0x3b, 0x1e, 0x4a, 0x0a, 0x9c, 0x77, 0x59, 0x19, 0x24, 0xc3, 0xb5, 0x7b, 0x00, + 0x00, 0x00))); + + // As there is no validation implemented, if 16 bytes are not available, the array will be over-indexed + TestUtils.assertThrows("Expected exception for over-indexing", ArrayIndexOutOfBoundsException.class, + () -> stringifier.stringify( + toBinary(0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee))); + + checkThrowingUnsupportedException(stringifier, Binary.class); + } + + private Binary toBinary(int... bytes) { + byte[] array = new byte[bytes.length]; + for (int i = 0; i < array.length; ++i) { + array[i] = (byte) bytes[i]; + } + return Binary.fromConstantByteArray(array); + } + private void checkThrowingUnsupportedException(PrimitiveStringifier stringifier, Class... 
excludes) { Set> set = new HashSet<>(asList(excludes)); if (!set.contains(Integer.TYPE)) { @@ -354,5 +388,4 @@ private void checkThrowingUnsupportedException(PrimitiveStringifier stringifier, } } } - } diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java index fe13e604b6..c6318670a2 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java @@ -36,6 +36,7 @@ import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.uuidType; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; @@ -45,6 +46,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.junit.Assert.assertEquals; public class TestTypeBuildersWithLogicalTypes { @Test @@ -389,6 +391,18 @@ public void testDecimalLogicalTypeWithDeprecatedPrecisionMismatch() { .precision(5).named("aDecimal"); } + @Test + public void testUUIDLogicalType() { + assertEquals( + "required fixed_len_byte_array(16) uuid_field (UUID)", + Types.required(FIXED_LEN_BYTE_ARRAY).length(16).as(uuidType()).named("uuid_field").toString()); + + assertThrows("Should fail with invalid length", IllegalStateException.class, + () -> Types.required(FIXED_LEN_BYTE_ARRAY).length(10).as(uuidType()).named("uuid_field").toString()); + assertThrows("Should fail with invalid type", IllegalStateException.class, + () -> Types.required(BINARY).as(uuidType()).named("uuid_field").toString()); + } + /** * A convenience method to avoid a large number of @Test(expected=...) 
tests * @param message A String message to describe this assertion diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 390846307d..811c717bb9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -95,6 +95,7 @@ import org.apache.parquet.format.Statistics; import org.apache.parquet.format.Type; import org.apache.parquet.format.TypeDefinedOrder; +import org.apache.parquet.format.UUIDType; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -107,6 +108,8 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; @@ -452,6 +455,11 @@ public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotati return of(LogicalType.BSON(new BsonType())); } + @Override + public Optional visit(UUIDLogicalTypeAnnotation uuidLogicalType) { + return of(LogicalType.UUID(new UUIDType())); + } + @Override public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { return of(LogicalType.UNKNOWN(new NullType())); @@ -835,6 +843,11 @@ public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation return of(SortOrder.UNSIGNED); } + @Override + public Optional visit(UUIDLogicalTypeAnnotation uuidLogicalType) { + return of(SortOrder.UNSIGNED); + } + @Override public Optional visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { return of(SortOrder.UNSIGNED); @@ -1011,6 +1024,8 @@ LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) { case TIMESTAMP: TimestampType timestamp = type.getTIMESTAMP(); return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit)); + case UUID: + return LogicalTypeAnnotation.uuidType(); default: throw new RuntimeException("Unknown logical type " + type); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 256ea36fed..319bd2db2f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -34,6 +34,7 @@ import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.uuidType; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -296,6 +297,8 @@ public void 
testLogicalToConvertedTypeConversion() { assertEquals(ConvertedType.JSON, parquetMetadataConverter.convertToConvertedType(jsonType())); assertEquals(ConvertedType.BSON, parquetMetadataConverter.convertToConvertedType(bsonType())); + assertNull(parquetMetadataConverter.convertToConvertedType(uuidType())); + assertEquals(ConvertedType.LIST, parquetMetadataConverter.convertToConvertedType(listType())); assertEquals(ConvertedType.MAP, parquetMetadataConverter.convertToConvertedType(mapType())); assertEquals(ConvertedType.MAP_KEY_VALUE, parquetMetadataConverter.convertToConvertedType(LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance())); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java index aac8e435a5..b13f36f141 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java @@ -28,6 +28,7 @@ import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType; import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.uuidType; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; @@ -59,6 +60,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; @@ -116,6 +118,7 @@ public class TestColumnIndexes { Types.optional(INT64).as(timestampType(true, TimeUnit.MILLIS)).named("timestamp-millis"), Types.optional(INT64).as(timestampType(false, TimeUnit.NANOS)).named("timestamp-nanos"), Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(OriginalType.INTERVAL).named("interval"), + Types.optional(FIXED_LEN_BYTE_ARRAY).length(16).as(uuidType()).named("uuid"), Types.optional(BINARY).as(stringType()).named("always-null")); private static List> buildGenerators(int recordCount, Random random) { @@ -169,6 +172,7 @@ private static List> buildGenerators(int recordCount, Random random) sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++), sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++), sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 12), random, recordCount, fieldIndex++), + sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 16), random, recordCount, fieldIndex++), null); }
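---

The sketches below illustrate the feature added by this patch; they are not part of the diff itself. The class names (`UuidRoundTrip`, `UuidTypeSketch`), the record schema `R`, and the output path `/tmp/uuid-example.parquet` are made up for illustration. The option and APIs used (`AvroWriteSupport.WRITE_PARQUET_UUID`, `LogicalTypeAnnotation.uuidType()`, the UUID stringifier) come from this change; `Conversions.UUIDConversion` is Avro's stock conversion for the `uuid` logical type.

A minimal round trip that writes an Avro `string` field annotated with the `uuid` logical type as the Parquet UUID logical type (`FIXED_LEN_BYTE_ARRAY(16)`) and reads it back:

```java
import java.util.UUID;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class UuidRoundTrip {
  public static void main(String[] args) throws Exception {
    // An Avro string field annotated with the uuid logical type
    Schema schema = SchemaBuilder.record("R").fields().requiredString("uuid").endRecord();
    LogicalTypes.uuid().addToSchema(schema.getField("uuid").schema());

    // Opt in to writing FIXED_LEN_BYTE_ARRAY(16) (UUID) instead of BINARY (STRING)
    Configuration conf = new Configuration();
    conf.setBoolean(AvroWriteSupport.WRITE_PARQUET_UUID, true);

    // A data model that converts between String and java.util.UUID
    GenericData model = new GenericData();
    model.addLogicalTypeConversion(new Conversions.UUIDConversion());

    GenericRecord record = new GenericData.Record(schema);
    record.put("uuid", UUID.randomUUID());

    Path file = new Path("/tmp/uuid-example.parquet"); // illustrative; must not already exist
    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(file)
        .withSchema(schema)
        .withDataModel(model)
        .withConf(conf)
        .build()) {
      writer.write(record);
    }

    try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file)
        .withDataModel(model)
        .withConf(conf)
        .build()) {
      // With the UUIDConversion registered, the field is materialized as a java.util.UUID;
      // without it, it is materialized as a String (see the tests above)
      System.out.println(reader.read());
    }
  }
}
```

With `parquet.avro.write-parquet-uuid` left at its default (`false`), the same Avro schema keeps producing `BINARY (STRING)`, so the option is purely opt-in and backward compatible.

On the parquet-column side, `uuidType()` may only annotate `FIXED_LEN_BYTE_ARRAY(16)` (the `Types` builder enforces this), and the annotated type's stringifier renders the 16 raw bytes in the canonical 8-4-4-4-12 form:

```java
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;

public class UuidTypeSketch {
  public static void main(String[] args) {
    // .length(10) or a BINARY primitive would fail with IllegalStateException
    PrimitiveType uuidField = Types.required(FIXED_LEN_BYTE_ARRAY)
        .length(16)
        .as(LogicalTypeAnnotation.uuidType())
        .named("uuid_field");
    System.out.println(uuidField); // required fixed_len_byte_array(16) uuid_field (UUID)

    // Stringify 16 raw bytes with the UUID stringifier attached to the type
    Binary raw = Binary.fromConstantByteArray(new byte[] {
        0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77,
        (byte) 0x88, (byte) 0x99, (byte) 0xaa, (byte) 0xbb,
        (byte) 0xcc, (byte) 0xdd, (byte) 0xee, (byte) 0xff});
    System.out.println(uuidField.stringifier().stringify(raw));
    // 00112233-4455-6677-8899-aabbccddeeff
  }
}
```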