7 changes: 7 additions & 0 deletions parquet-avro/pom.xml
@@ -81,6 +81,13 @@
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
  </dependencies>

  <build>
@@ -20,7 +20,9 @@

import java.lang.reflect.Constructor;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
@@ -362,13 +364,14 @@ static boolean isElementType(Type repeatedType, Schema elementSchema) {
      // synthetic wrapper (must be a group with one field).
      return true;
    } else if (elementSchema != null &&
-        elementSchema.getType() == Schema.Type.RECORD &&
-        elementSchema.getFields().size() == 1 &&
-        elementSchema.getFields().get(0).name().equals(
-            repeatedType.asGroupType().getFieldName(0))) {
+        elementSchema.getType() == Schema.Type.RECORD) {
+      Set<String> fieldNames = new HashSet<String>();
+      for (Schema.Field field : elementSchema.getFields()) {
+        fieldNames.add(field.name());
+      }
      // The repeated type must be the element type because it matches the
      // structure of the Avro element's schema.
-      return true;
+      return fieldNames.contains(repeatedType.asGroupType().getFieldName(0));
    }
    return false;
  }
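
Note on the hunk above: the old check treated the repeated group as the element type only when the Avro element record had exactly one field whose name matched the group's first field; the new check accepts any element record whose field names contain that first field name. A minimal sketch of the case this widens, with hypothetical schema names (isElementType is package-private, so the call at the end is shown as a comment):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class IsElementTypeSketch {
  public static void main(String[] args) {
    // Avro element record with two fields; the old size() == 1 check
    // rejected this shape even when it matched the repeated group.
    Schema element = SchemaBuilder.record("Element").fields()
        .requiredLong("id")
        .requiredString("name")
        .endRecord();

    // Two-level list structure: the repeated group is itself the element
    // type, not a synthetic single-field wrapper around one.
    MessageType message = MessageTypeParser.parseMessageType(
        "message m { repeated group array {" +
        " required int64 id; required binary name (UTF8); } }");
    Type repeated = message.getType("array");

    // New logic: "id", the group's first field name, appears among the
    // element record's field names, so this is the element type.
    // AvroRecordConverter.isElementType(repeated, element) -> true
  }
}
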
@@ -18,12 +18,10 @@
 */
package org.apache.parquet.avro;

-import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -32,14 +30,9 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import static org.apache.parquet.avro.AvroTestUtil.array;
import static org.apache.parquet.avro.AvroTestUtil.field;
@@ -49,10 +42,7 @@
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;

-public class TestArrayCompatibility {
-
-  @Rule
-  public final TemporaryFolder tempDir = new TemporaryFolder();
+public class TestArrayCompatibility extends DirectWriterTest {

  public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();

@@ -909,68 +899,6 @@ public void write(RecordConsumer rc) {
    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
  }

-  private interface DirectWriter {
-    public void write(RecordConsumer consumer);
-  }
-
-  private static class DirectWriteSupport extends WriteSupport<Void> {
-    private RecordConsumer recordConsumer;
-    private final MessageType type;
-    private final DirectWriter writer;
-    private final Map<String, String> metadata;
-
-    private DirectWriteSupport(MessageType type, DirectWriter writer,
-                               Map<String, String> metadata) {
-      this.type = type;
-      this.writer = writer;
-      this.metadata = metadata;
-    }
-
-    @Override
-    public WriteContext init(Configuration configuration) {
-      return new WriteContext(type, metadata);
-    }
-
-    @Override
-    public void prepareForWrite(RecordConsumer recordConsumer) {
-      this.recordConsumer = recordConsumer;
-    }
-
-    @Override
-    public void write(Void record) {
-      writer.write(recordConsumer);
-    }
-  }
-
-  private Path writeDirect(String type, DirectWriter writer) throws IOException {
-    return writeDirect(MessageTypeParser.parseMessageType(type), writer);
-  }
-
-  private Path writeDirect(String type, DirectWriter writer,
-                           Map<String, String> metadata) throws IOException {
-    return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
-  }
-
-  private Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
-    return writeDirect(type, writer, new HashMap<String, String>());
-  }
-
-  private Path writeDirect(MessageType type, DirectWriter writer,
-                           Map<String, String> metadata) throws IOException {
-    File temp = tempDir.newFile(UUID.randomUUID().toString());
-    temp.deleteOnExit();
-    temp.delete();
-
-    Path path = new Path(temp.getPath());
-
-    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
-        path, new DirectWriteSupport(type, writer, metadata));
-    parquetWriter.write(null);
-    parquetWriter.close();
-
-    return path;
-  }
-
  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
      Path path) throws IOException {
    return new AvroParquetReader<T>(path);
11 changes: 11 additions & 0 deletions parquet-hadoop/pom.xml
@@ -103,6 +103,17 @@
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal> <!-- publish test-jar for other modules -->
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
    </plugins>
  </build>

102 changes: 102 additions & 0 deletions parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
@@ -0,0 +1,102 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class DirectWriterTest {

  @Rule
  public final TemporaryFolder tempDir = new TemporaryFolder();

  protected interface DirectWriter {
    public void write(RecordConsumer consumer);
  }

  protected Path writeDirect(String type, DirectWriter writer) throws IOException {
    return writeDirect(MessageTypeParser.parseMessageType(type), writer);
  }

  protected Path writeDirect(String type, DirectWriter writer,
                             Map<String, String> metadata) throws IOException {
    return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
  }

  protected Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
    return writeDirect(type, writer, new HashMap<String, String>());
  }

  protected Path writeDirect(MessageType type, DirectWriter writer,
                             Map<String, String> metadata) throws IOException {
    File temp = tempDir.newFile(UUID.randomUUID().toString());
    temp.deleteOnExit();
    temp.delete();

    Path path = new Path(temp.getPath());

    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
        path, new DirectWriteSupport(type, writer, metadata));
    parquetWriter.write(null);
    parquetWriter.close();

    return path;
  }

  protected static class DirectWriteSupport extends WriteSupport<Void> {
    private RecordConsumer recordConsumer;
    private final MessageType type;
    private final DirectWriter writer;
    private final Map<String, String> metadata;

    protected DirectWriteSupport(MessageType type, DirectWriter writer,
                                 Map<String, String> metadata) {
      this.type = type;
      this.writer = writer;
      this.metadata = metadata;
    }

    @Override
    public WriteContext init(Configuration configuration) {
      return new WriteContext(type, metadata);
    }

    @Override
    public void prepareForWrite(RecordConsumer recordConsumer) {
      this.recordConsumer = recordConsumer;
    }

    @Override
    public void write(Void record) {
      writer.write(recordConsumer);
    }
  }
}
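
For reference, a hedged sketch of how a test in another module can build on this class once the parquet-hadoop test-jar is on its test classpath; the class name, schema, and values are illustrative:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.io.api.RecordConsumer;
import org.junit.Test;

public class ExampleDirectWriteTest extends DirectWriterTest {
  @Test
  public void testWriteSingleIntField() throws IOException {
    // writeDirect parses the schema, opens a ParquetWriter on a temp
    // file, and hands its RecordConsumer to the callback below.
    Path file = writeDirect("message m { required int32 x; }",
        new DirectWriter() {
          @Override
          public void write(RecordConsumer rc) {
            rc.startMessage();
            rc.startField("x", 0);
            rc.addInteger(34);
            rc.endField("x", 0);
            rc.endMessage();
          }
        });
    // "file" now points at a Parquet file holding one record, {x: 34},
    // ready to be read back by whatever reader is under test.
  }
}
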
@@ -18,6 +18,7 @@
 */
package org.apache.parquet.scrooge;

+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

@@ -31,16 +32,24 @@

public class ScroogeRecordConverter<T extends ThriftStruct> extends ThriftRecordConverter<T> {

-
+  /**
+   * This is for compatibility only.
+   * @deprecated will be removed in 2.x
+   */
+  @Deprecated
  public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType) {
+    this(thriftClass, parquetSchema, thriftType, null);
+  }
+
+  public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType, Configuration conf) {
    super(new ThriftReader<T>() {
      @SuppressWarnings("unchecked")
      ThriftStructCodec<T> codec = (ThriftStructCodec<T>) getCodec(thriftClass);
      @Override
      public T readOneRecord(TProtocol protocol) throws TException {
        return codec.decode(protocol);
      }
-    }, thriftClass.getSimpleName(), parquetSchema, thriftType);
+    }, thriftClass.getSimpleName(), parquetSchema, thriftType, conf);
  }

  private static ThriftStructCodec<?> getCodec(Class<?> klass) {
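
Usage note: the three-argument constructor now delegates to the four-argument form with a null Configuration, so existing callers compile unchanged while new code can thread its Hadoop conf through to ThriftRecordConverter. A short sketch, assuming a generated Scrooge struct MyStruct and in-scope parquetSchema and thriftType values (all three are illustrative placeholders):

// Preferred path: pass the job's Configuration to the converter.
Configuration conf = new Configuration();
ScroogeRecordConverter<MyStruct> converter =
    new ScroogeRecordConverter<MyStruct>(MyStruct.class, parquetSchema, thriftType, conf);

// Deprecated compatibility path, equivalent to passing a null conf:
ScroogeRecordConverter<MyStruct> legacy =
    new ScroogeRecordConverter<MyStruct>(MyStruct.class, parquetSchema, thriftType);
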
7 changes: 7 additions & 0 deletions parquet-thrift/pom.xml
@@ -121,6 +121,13 @@
      <version>${thrift.version}</version>
      <scope>provided</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>

  </dependencies>
