Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions parquet-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@
</dependencies>

<build>
<resources>
<resource>
<directory>src/test/avro</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
Expand All @@ -97,7 +105,15 @@
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<id>compile-avsc</id>
<phase>generate-test-sources</phase>
<goals>
<goal>schema</goal>
</goals>
</execution>
<execution>
<id>compile-idl</id>
<phase>generate-test-sources</phase>
<goals>
<goal>idl-protocol</goal>
Expand Down
118 changes: 89 additions & 29 deletions parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
*/
package org.apache.parquet.avro;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
Expand All @@ -44,6 +48,40 @@ public AvroPrimitiveConverter(ParentValueContainer parent) {
}
}

abstract static class BinaryConverter<T> extends AvroPrimitiveConverter {
private T[] dict = null;

public BinaryConverter(ParentValueContainer parent) {
super(parent);
}

public abstract T convert(Binary binary);

@Override
public void addBinary(Binary value) {
parent.add(convert(value));
}

@Override
public boolean hasDictionarySupport() {
return true;
}

@Override
@SuppressWarnings("unchecked")
public void setDictionary(Dictionary dictionary) {
dict = (T[]) new Object[dictionary.getMaxId() + 1];
for (int i = 0; i <= dictionary.getMaxId(); i++) {
dict[i] = convert(dictionary.decodeToBinary(i));
}
}

@Override
public void addValueFromDictionary(int dictionaryId) {
parent.add(dict[dictionaryId]);
}
}

static final class FieldByteConverter extends AvroPrimitiveConverter {
public FieldByteConverter(ParentValueContainer parent) {
super(parent);
Expand All @@ -54,6 +92,7 @@ public void addInt(int value) {
parent.addByte((byte) value);
}
}

static final class FieldShortConverter extends AvroPrimitiveConverter {
public FieldShortConverter(ParentValueContainer parent) {
super(parent);
Expand Down Expand Up @@ -133,7 +172,6 @@ final public void addLong(long value) {
final public void addFloat(float value) {
parent.addFloat(value);
}

}

static final class FieldDoubleConverter extends AvroPrimitiveConverter {
Expand Down Expand Up @@ -162,62 +200,84 @@ final public void addDouble(double value) {
}
}

static final class FieldByteArrayConverter extends AvroPrimitiveConverter {
static final class FieldByteArrayConverter extends BinaryConverter<byte[]> {
public FieldByteArrayConverter(ParentValueContainer parent) {
super(parent);
}

@Override
final public void addBinary(Binary value) {
parent.add(value.getBytes());
public byte[] convert(Binary binary) {
return binary.getBytes();
}
}

static final class FieldByteBufferConverter extends AvroPrimitiveConverter {
static final class FieldByteBufferConverter extends BinaryConverter<ByteBuffer> {
public FieldByteBufferConverter(ParentValueContainer parent) {
super(parent);
}

@Override
final public void addBinary(Binary value) {
parent.add(ByteBuffer.wrap(value.getBytes()));
public ByteBuffer convert(Binary binary) {
return ByteBuffer.wrap(binary.getBytes());
}
}

static final class FieldStringConverter extends AvroPrimitiveConverter {
// TODO: dictionary support should be generic and provided by a parent
// TODO: this always produces strings, but should respect avro.java.string
private String[] dict;

static final class FieldStringConverter extends BinaryConverter<String> {
public FieldStringConverter(ParentValueContainer parent) {
super(parent);
}

@Override
final public void addBinary(Binary value) {
parent.add(value.toStringUsingUTF8());
public String convert(Binary binary) {
return binary.toStringUsingUTF8();
}
}

@Override
public boolean hasDictionarySupport() {
return true;
static final class FieldUTF8Converter extends BinaryConverter<Utf8> {
public FieldUTF8Converter(ParentValueContainer parent) {
super(parent);
}

@Override
public void setDictionary(Dictionary dictionary) {
dict = new String[dictionary.getMaxId() + 1];
for (int i = 0; i <= dictionary.getMaxId(); i++) {
dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
public Utf8 convert(Binary binary) {
return new Utf8(binary.getBytes());
}
}

static final class FieldStringableConverter extends BinaryConverter<Object> {
private final String stringableName;
private final Constructor<?> ctor;

public FieldStringableConverter(ParentValueContainer parent,
Class<?> stringableClass) {
super(parent);
stringableName = stringableClass.getName();
try {
this.ctor = stringableClass.getConstructor(String.class);
} catch (NoSuchMethodException e) {
throw new ParquetDecodingException(
"Unable to get String constructor for " + stringableName, e);
}
}

@Override
public void addValueFromDictionary(int dictionaryId) {
parent.add(dict[dictionaryId]);
public Object convert(Binary binary) {
try {
return ctor.newInstance(binary.toStringUsingUTF8());
} catch (InstantiationException e) {
throw new ParquetDecodingException(
"Cannot convert binary to " + stringableName, e);
} catch (IllegalAccessException e) {
throw new ParquetDecodingException(
"Cannot convert binary to " + stringableName, e);
} catch (InvocationTargetException e) {
throw new ParquetDecodingException(
"Cannot convert binary to " + stringableName, e);
}
}
}

static final class FieldEnumConverter extends AvroPrimitiveConverter {
static final class FieldEnumConverter extends BinaryConverter<Object> {
private final Schema schema;
private final GenericData model;

Expand All @@ -229,12 +289,12 @@ public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema,
}

@Override
final public void addBinary(Binary value) {
parent.add(model.createEnum(value.toStringUsingUTF8(), schema));
public Object convert(Binary binary) {
return model.createEnum(binary.toStringUsingUTF8(), schema);
}
}

static final class FieldFixedConverter extends AvroPrimitiveConverter {
static final class FieldFixedConverter extends BinaryConverter<Object> {
private final Schema schema;
private final GenericData model;

Expand All @@ -246,8 +306,8 @@ public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema,
}

@Override
final public void addBinary(Binary value) {
parent.add(model.createFixed(null /* reuse */, value.getBytes(), schema));
public Object convert(Binary binary) {
return model.createFixed(null /* reuse */, binary.getBytes(), schema);
}
}
}
Loading