7 changes: 7 additions & 0 deletions parquet-avro/pom.xml
@@ -87,6 +87,13 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -32,8 +32,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
@@ -753,13 +755,14 @@ static boolean isElementType(Type repeatedType, Schema elementSchema) {
// synthetic wrapper. Must be a group with one optional or required field
return true;
} else if (elementSchema != null &&
elementSchema.getType() == Schema.Type.RECORD &&
elementSchema.getFields().size() == 1 &&
elementSchema.getFields().get(0).name().equals(
repeatedType.asGroupType().getFieldName(0))) {
elementSchema.getType() == Schema.Type.RECORD) {
Set<String> fieldNames = new HashSet<String>();
for (Schema.Field field : elementSchema.getFields()) {
fieldNames.add(field.name());
}
// The repeated type must be the element type because it matches the
// structure of the Avro element's schema.
return true;
return fieldNames.contains(repeatedType.asGroupType().getFieldName(0));
Contributor:

I don't quite get the purpose of this change. Is this mainly for more robust support for schema evolution? Take the following Avro schema and Parquet schema as an example:

// Avro
record Struct {
  int f0;
  int f1;
}

// Parquet
message StructArray {
  required group struct_array (LIST) {
    repeated group array {
      required int32 f0;
    }
  }
}

According to the condition of this if branch, we would interpret struct_array as a LIST of Avro Struct, with the f1 field of every element set to null. However, f1 as defined in Struct isn't optional and can't be null.

Did I miss something here?

Contributor:

My last comment is a little bit ambiguous. In the example I made, the Avro record Struct is the elementSchema, while the Parquet repeated group array is the repeatedType in the code.

Contributor:

I guess what is missing here is a check that all the absent fields are indeed optional, unless this constraint is already guaranteed somewhere else (in which case let's document it in the comment).

Contributor (Author):

This is for the case where only one field of a nested record is projected. Avro has a projection (requested) schema and an expected schema, so the two can differ. Previously, the logic that matched the requested schema (elementSchema) against the file's structure checked whether the name of the repeated type's single field equaled the name of the element schema's single field. That left out the case where the names do match (as in your example) but the requested schema also has an extra optional field that should be defaulted.

Whether the other field is optional doesn't matter at this point in the code. Avro fills in default values later and throws an exception if something is missing and has no default at that point.
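
A minimal sketch of the projection case described above, assuming a file whose list elements were written with only f0; the schema JSON, file path, and variable names are illustrative and not part of this patch:

// Requested (projection) schema: the element record also declares an optional
// f1 with a default, even though the file's repeated group only contains f0.
Schema projection = new Schema.Parser().parse(
    "{\"type\": \"record\", \"name\": \"StructArray\", \"fields\": [" +
    "  {\"name\": \"struct_array\", \"type\": {\"type\": \"array\", \"items\": {" +
    "    \"type\": \"record\", \"name\": \"Struct\", \"fields\": [" +
    "      {\"name\": \"f0\", \"type\": \"int\"}," +
    "      {\"name\": \"f1\", \"type\": [\"null\", \"int\"], \"default\": null}" +
    "    ]}}}]}");

Configuration conf = new Configuration();
AvroReadSupport.setRequestedProjection(conf, projection);

// The old check rejected this projection because Struct has two fields while the
// repeated group has one; the containment check accepts it, and Avro later fills
// in f1's default (null) because that column is absent from the file.
ParquetReader<GenericRecord> reader =
    new AvroParquetReader<GenericRecord>(conf, new Path("/tmp/struct_array.parquet"));
GenericRecord first = reader.read();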

Contributor:

Oh I see, this totally makes sense. Thanks for the explanation!

}
return false;
}
Expand Down
@@ -18,12 +18,10 @@
*/
package org.apache.parquet.avro;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -32,14 +30,9 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import static org.apache.parquet.avro.AvroTestUtil.array;
import static org.apache.parquet.avro.AvroTestUtil.field;
@@ -49,10 +42,7 @@
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;

public class TestArrayCompatibility {

@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
public class TestArrayCompatibility extends DirectWriterTest {

public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();

@@ -1045,68 +1035,6 @@ public void write(RecordConsumer rc) {
assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
}

private interface DirectWriter {
public void write(RecordConsumer consumer);
}

private static class DirectWriteSupport extends WriteSupport<Void> {
private RecordConsumer recordConsumer;
private final MessageType type;
private final DirectWriter writer;
private final Map<String, String> metadata;

private DirectWriteSupport(MessageType type, DirectWriter writer,
Map<String, String> metadata) {
this.type = type;
this.writer = writer;
this.metadata = metadata;
}

@Override
public WriteContext init(Configuration configuration) {
return new WriteContext(type, metadata);
}

@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}

@Override
public void write(Void record) {
writer.write(recordConsumer);
}
}

private Path writeDirect(String type, DirectWriter writer) throws IOException {
return writeDirect(MessageTypeParser.parseMessageType(type), writer);
}

private Path writeDirect(String type, DirectWriter writer,
Map<String, String> metadata) throws IOException {
return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
}

private Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
return writeDirect(type, writer, new HashMap<String, String>());
}

private Path writeDirect(MessageType type, DirectWriter writer,
Map<String, String> metadata) throws IOException {
File temp = tempDir.newFile(UUID.randomUUID().toString());
temp.deleteOnExit();
temp.delete();

Path path = new Path(temp.getPath());

ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
path, new DirectWriteSupport(type, writer, metadata));
parquetWriter.write(null);
parquetWriter.close();

return path;
}

public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
Path path) throws IOException {
return new AvroParquetReader<T>(path);
102 changes: 102 additions & 0 deletions parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class DirectWriterTest {

@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();

protected interface DirectWriter {
public void write(RecordConsumer consumer);
}

protected Path writeDirect(String type, DirectWriter writer) throws IOException {
return writeDirect(MessageTypeParser.parseMessageType(type), writer);
}

protected Path writeDirect(String type, DirectWriter writer,
Map<String, String> metadata) throws IOException {
return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
}

protected Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
return writeDirect(type, writer, new HashMap<String, String>());
}

protected Path writeDirect(MessageType type, DirectWriter writer,
Map<String, String> metadata) throws IOException {
File temp = tempDir.newFile(UUID.randomUUID().toString());
temp.deleteOnExit();
temp.delete();

Path path = new Path(temp.getPath());

ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
path, new DirectWriteSupport(type, writer, metadata));
parquetWriter.write(null);
parquetWriter.close();

return path;
}

protected static class DirectWriteSupport extends WriteSupport<Void> {
private RecordConsumer recordConsumer;
private final MessageType type;
private final DirectWriter writer;
private final Map<String, String> metadata;

protected DirectWriteSupport(MessageType type, DirectWriter writer,
Map<String, String> metadata) {
this.type = type;
this.writer = writer;
this.metadata = metadata;
}

@Override
public WriteContext init(Configuration configuration) {
return new WriteContext(type, metadata);
}

@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}

@Override
public void write(Void record) {
writer.write(recordConsumer);
}
}
}
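
For reference, a hypothetical subclass sketch showing how the writeDirect helper is meant to be used from a test; the class name, schema, and field value are made up for illustration:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.io.api.RecordConsumer;
import org.junit.Test;

public class TestDirectWriterExample extends DirectWriterTest {

  @Test
  public void testWritesSingleRecord() throws IOException {
    // Write one record {id: 34} directly through the RecordConsumer API.
    Path file = writeDirect(
        "message Simple { required int32 id; }",
        new DirectWriter() {
          @Override
          public void write(RecordConsumer rc) {
            rc.startMessage();
            rc.startField("id", 0);
            rc.addInteger(34);
            rc.endField("id", 0);
            rc.endMessage();
          }
        });
    // 'file' now points to a temporary Parquet file that the test can read back.
  }
}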
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.scrooge;

import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

@@ -31,16 +32,24 @@

public class ScroogeRecordConverter<T extends ThriftStruct> extends ThriftRecordConverter<T> {


/**
* This is for compatibility only.
* @deprecated will be removed in 2.x
*/
@Deprecated
public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType) {
this(thriftClass, parquetSchema, thriftType, null);
}

public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType, Configuration conf) {
super(new ThriftReader<T>() {
@SuppressWarnings("unchecked")
ThriftStructCodec<T> codec = (ThriftStructCodec<T>) getCodec(thriftClass);
@Override
public T readOneRecord(TProtocol protocol) throws TException {
return codec.decode(protocol);
}
}, thriftClass.getSimpleName(), parquetSchema, thriftType);
}, thriftClass.getSimpleName(), parquetSchema, thriftType, conf);
}

private static ThriftStructCodec<?> getCodec(Class<?> klass) {
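A hedged sketch of how callers might migrate to the new constructor; MyScroogeStruct stands in for a Scrooge-generated ThriftStruct, and fileSchema (MessageType) and thriftType (StructType) are assumed to come from the surrounding read path:

Configuration conf = new Configuration();

// Deprecated constructor: internally falls back to a null Configuration.
ScroogeRecordConverter<MyScroogeStruct> oldStyle =
    new ScroogeRecordConverter<MyScroogeStruct>(MyScroogeStruct.class, fileSchema, thriftType);

// New constructor: the job Configuration is passed through to ThriftRecordConverter.
ScroogeRecordConverter<MyScroogeStruct> converter =
    new ScroogeRecordConverter<MyScroogeStruct>(MyScroogeStruct.class, fileSchema, thriftType, conf);
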
7 changes: 7 additions & 0 deletions parquet-thrift/pom.xml
@@ -121,6 +121,13 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>
