parquet-avro/README.md (new file: 44 additions, 0 deletions)
@@ -0,0 +1,44 @@
> **Contributor:** Should we add it using a separate JIRA?
>
> **Contributor Author:** Usually we file separate JIRAs in similar cases so that cherry-picking does not get hard if the related change needs to land on another branch as well. In this case it is only documentation, so it should not cause any trouble. It would be cleaner if this documentation had already existed and I only had to add the docs for the new keys (which would clearly be part of this change). If you have a strong opinion that it should be split into another change, I'm happy to do so, though.
>
> **Contributor:** Sounds good.

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

Apache Avro integration
======

**TODO**: Add a description and examples of how to use parquet-avro

## Available options via Hadoop Configuration

### Configuration for reading

| Name | Type | Description |
|-----------------------------------------|-----------|----------------------------------------------------------------------|
| `parquet.avro.data.supplier` | `Class` | The implementation of the `org.apache.parquet.avro.AvroDataSupplier` interface. Available implementations in the library: `GenericDataSupplier`, `ReflectDataSupplier`, `SpecificDataSupplier`.<br/>The default value is `org.apache.parquet.avro.SpecificDataSupplier`. |
| `parquet.avro.read.schema` | `String` | The Avro schema to be used for reading. It must be compatible with the file schema. If not set, the file schema is used directly. |
| `parquet.avro.projection` | `String` | The Avro schema to be used for projection. |
| `parquet.avro.compatible` | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.<br/>The default value is `true`. |

### Configuration for writing

| Name | Type | Description |
|-----------------------------------------|-----------|----------------------------------------------------------------------|
| `parquet.avro.write.data.supplier` | `Class` | The implementation of the `org.apache.parquet.avro.AvroDataSupplier` interface. Available implementations in the library: `GenericDataSupplier`, `ReflectDataSupplier`, `SpecificDataSupplier`.<br/>The default value is `org.apache.parquet.avro.SpecificDataSupplier`. |
| `parquet.avro.schema` | `String` | The Avro schema to be used for generating the Parquet schema of the file. |
| `parquet.avro.write-old-list-structure` | `boolean` | Whether to write list structures in the old 2-level way instead of the new 3-level one. With 2-level lists, null values are not supported at the element level.<br/>The default value is `true`. |
| `parquet.avro.add-list-element-records` | `boolean` | Whether to assume that any repeated element in the schema is a list element.<br/>The default value is `true`. |
| `parquet.avro.write-parquet-uuid` | `boolean` | Whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) when an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.<br/>The default value is `false`. |
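
These keys are plain Hadoop `Configuration` entries. A minimal sketch of enabling the new UUID behavior for a write (this is an illustration only; it assumes `parquet-avro` and `hadoop-common` on the classpath, and the writer wiring is elided):

```java
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Write Avro uuid-annotated strings as the Parquet UUID logical type
conf.setBoolean("parquet.avro.write-parquet-uuid", true);
// Use the new 3-level list structure, which supports null elements
conf.setBoolean("parquet.avro.write-old-list-structure", false);
// Pass conf to the writer builder, e.g. AvroParquetWriter.builder(...).withConf(conf)
```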
AvroConverters.java
@@ -29,6 +29,8 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.PrimitiveStringifier;
import org.apache.parquet.schema.PrimitiveType;

public class AvroConverters {

@@ -314,4 +316,18 @@ public Object convert(Binary binary) {
return model.createFixed(null /* reuse */, binary.getBytes(), schema);
}
}

static final class FieldUUIDConverter extends BinaryConverter<String> {
private final PrimitiveStringifier stringifier;

public FieldUUIDConverter(ParentValueContainer parent, PrimitiveType type) {
super(parent);
stringifier = type.stringifier();
}

@Override
public String convert(Binary binary) {
return stringifier.stringify(binary);
}
}
}
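
`FieldUUIDConverter` delegates to the type's `PrimitiveStringifier` to turn the 16-byte `FIXED_LEN_BYTE_ARRAY` value back into canonical text. A standalone sketch of that stringification using only the JDK (the class and method names here are illustrative, not the Parquet API):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidStringify {
    // Interpret 16 big-endian bytes as a UUID and render it in canonical form.
    static String stringify(byte[] bytes) {
        if (bytes.length != 16) {
            throw new IllegalArgumentException("UUID must be 16 bytes, got " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        return new UUID(buf.getLong(), buf.getLong()).toString();
    }

    public static void main(String[] args) {
        byte[] bytes = new byte[16];
        bytes[15] = 1;
        System.out.println(stringify(bytes)); // 00000000-0000-0000-0000-000000000001
    }
}
```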
AvroRecordConverter.java
@@ -1,14 +1,14 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
* http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,28 +33,24 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.LinkedHashMap;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.AvroIgnore;
import org.apache.avro.reflect.AvroName;
import org.apache.avro.reflect.AvroSchema;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.Stringable;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.ClassUtils;
import org.apache.parquet.Preconditions;
import org.apache.parquet.avro.AvroConverters.FieldStringConverter;
import org.apache.parquet.avro.AvroConverters.FieldStringableConverter;
import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
@@ -280,6 +276,9 @@ private static Converter newConverter(Schema schema, Type type,
}
return new AvroConverters.FieldByteBufferConverter(parent);
case STRING:
if (logicalType != null && logicalType.getName().equals(LogicalTypes.uuid().getName())) {
return new AvroConverters.FieldUUIDConverter(parent, type.asPrimitiveType());
}
return newStringConverter(schema, model, parent);
case RECORD:
return new AvroRecordConverter(parent, type.asGroupType(), schema, model);
AvroSchemaConverter.java
@@ -31,6 +31,7 @@
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;

import java.util.ArrayList;
import java.util.Arrays;
@@ -44,6 +45,8 @@
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID_DEFAULT;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
@@ -52,6 +55,7 @@
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.uuidType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

@@ -69,10 +73,10 @@ public class AvroSchemaConverter {

private final boolean assumeRepeatedIsListElement;
private final boolean writeOldListStructure;
private final boolean writeParquetUUID;

public AvroSchemaConverter() {
this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
this(ADD_LIST_ELEMENT_RECORDS_DEFAULT);
}

/**
@@ -84,13 +88,15 @@ public AvroSchemaConverter() {
AvroSchemaConverter(boolean assumeRepeatedIsListElement) {
this.assumeRepeatedIsListElement = assumeRepeatedIsListElement;
this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT;
}

public AvroSchemaConverter(Configuration conf) {
this.assumeRepeatedIsListElement = conf.getBoolean(
ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
this.writeOldListStructure = conf.getBoolean(
WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT);
}

/**
@@ -145,6 +151,7 @@ private Type convertField(String fieldName, Schema schema) {
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
Types.PrimitiveBuilder<PrimitiveType> builder;
Schema.Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
if (type.equals(Schema.Type.BOOLEAN)) {
builder = Types.primitive(BOOLEAN, repetition);
} else if (type.equals(Schema.Type.INT)) {
@@ -158,7 +165,12 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
} else if (type.equals(Schema.Type.BYTES)) {
builder = Types.primitive(BINARY, repetition);
} else if (type.equals(Schema.Type.STRING)) {
builder = Types.primitive(BINARY, repetition).as(stringType());
if (logicalType != null && logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) {
builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.length(LogicalTypeAnnotation.UUIDLogicalTypeAnnotation.BYTES);
} else {
builder = Types.primitive(BINARY, repetition).as(stringType());
}
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
} else if (type.equals(Schema.Type.ENUM)) {
@@ -186,7 +198,6 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet

// schema translation can only be done for known logical types because this
// creates an equivalence
LogicalType logicalType = schema.getLogicalType();
if (logicalType != null) {
if (logicalType instanceof LogicalTypes.Decimal) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
@@ -306,8 +317,12 @@ public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
}
@Override
public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
int size = parquetType.asPrimitiveType().getTypeLength();
return Schema.createFixed(parquetType.getName(), null, null, size);
if (annotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
return Schema.create(Schema.Type.STRING);
} else {
int size = parquetType.asPrimitiveType().getTypeLength();
return Schema.createFixed(parquetType.getName(), null, null, size);
}
}
@Override
public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
@@ -419,6 +434,8 @@ private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
return timestampType(true, MILLIS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return timestampType(true, MICROS);
} else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) {
return uuidType();
}
return null;
}
@@ -461,6 +478,11 @@ public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnn
}
return empty();
}

@Override
public Optional<LogicalType> visit(UUIDLogicalTypeAnnotation uuidLogicalType) {
return of(LogicalTypes.uuid());
}
}).orElse(null);
}

AvroWriteSupport.java
@@ -1,14 +1,14 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
* http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
@@ -35,6 +37,7 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.hadoop.util.ReflectionUtils;
@@ -60,6 +63,8 @@ public static void setAvroDataSupplier(
public static final String WRITE_OLD_LIST_STRUCTURE =
"parquet.avro.write-old-list-structure";
static final boolean WRITE_OLD_LIST_STRUCTURE_DEFAULT = true;
public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid";
static final boolean WRITE_PARQUET_UUID_DEFAULT = false;

private static final String MAP_REPEATED_NAME = "key_value";
private static final String MAP_KEY_NAME = "key";
@@ -228,7 +233,7 @@ private <V> void writeMap(GroupType schema, Schema avroSchema,
recordConsumer.endGroup();
}

  private void writeUnion(GroupType parquetSchema, Schema avroSchema,
Object value) {
recordConsumer.startGroup();

@@ -343,7 +348,11 @@ private void writeValueWithoutConversion(Type type, Schema avroSchema, Object va
}
break;
case STRING:
recordConsumer.addBinary(fromAvroString(value));
if (type.asPrimitiveType().getLogicalTypeAnnotation() instanceof UUIDLogicalTypeAnnotation) {
recordConsumer.addBinary(fromUUIDString(value));
} else {
recordConsumer.addBinary(fromAvroString(value));
}
break;
case RECORD:
writeRecord(type.asGroupType(), avroSchema, value);
Expand All @@ -363,6 +372,20 @@ private void writeValueWithoutConversion(Type type, Schema avroSchema, Object va
}
}

private Binary fromUUIDString(Object value) {
byte[] data = new byte[UUIDLogicalTypeAnnotation.BYTES];
UUID uuid = UUID.fromString(value.toString());
writeLong(data, 0, uuid.getMostSignificantBits());
writeLong(data, Long.BYTES, uuid.getLeastSignificantBits());
return Binary.fromConstantByteArray(data);
}

private void writeLong(byte[] array, int offset, long value) {
for (int i = 0; i < Long.BYTES; ++i) {
array[i + offset] = (byte) (value >>> ((Long.BYTES - i - 1) * Byte.SIZE));
}
}
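
`fromUUIDString` and `writeLong` above pack the canonical UUID string into 16 big-endian bytes, most significant bits first, which is the layout the Parquet UUID logical type expects. An equivalent standalone round-trip sketch using only the JDK (`ByteBuffer` in place of the manual shift loop; the class name is illustrative):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidBytes {
    // Pack a canonical UUID string into 16 big-endian bytes.
    static byte[] toBytes(String s) {
        UUID uuid = UUID.fromString(s);
        ByteBuffer buf = ByteBuffer.allocate(16); // big-endian by default
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return buf.array();
    }

    // Inverse: rebuild the canonical string from the 16 bytes.
    static String fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new UUID(buf.getLong(), buf.getLong()).toString();
    }

    public static void main(String[] args) {
        String s = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
        System.out.println(fromBytes(toBytes(s)).equals(s)); // true
    }
}
```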

private Binary fromAvroString(Object value) {
if (value instanceof Utf8) {
Utf8 utf8 = (Utf8) value;
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;

@@ -82,15 +83,17 @@ public static GenericRecord instance(Schema schema, Object... pairs) {
}

public static <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
return read(new Configuration(false), model, schema, file);
}

public static <D> List<D> read(Configuration conf, GenericData model, Schema schema, File file) throws IOException {
List<D> data = new ArrayList<D>();
AvroReadSupport.setRequestedProjection(conf, schema);
AvroReadSupport.setAvroReadSchema(conf, schema);

try (ParquetReader<D> fileReader = AvroParquetReader
        .<D>builder(HadoopInputFile.fromPath(new Path(file.toString()), conf))
.withDataModel(model) // reflect disables compatibility
.withConf(conf)
.build()) {
D datum;
while ((datum = fileReader.read()) != null) {
@@ -103,6 +106,12 @@ public static <D> List<D> read(GenericData model, Schema schema, File file) thro

@SuppressWarnings("unchecked")
public static <D> File write(TemporaryFolder temp, GenericData model, Schema schema, D... data) throws IOException {
return write(temp, new Configuration(false), model, schema, data);
}

@SuppressWarnings("unchecked")
public static <D> File write(TemporaryFolder temp, Configuration conf, GenericData model, Schema schema, D... data)
throws IOException {
File file = temp.newFile();
Assert.assertTrue(file.delete());

@@ -118,4 +127,10 @@ public static <D> File write(TemporaryFolder temp, GenericData model, Schema sch

return file;
}

public static Configuration conf(String name, boolean value) {
Configuration conf = new Configuration(false);
conf.setBoolean(name, value);
return conf;
}
}