Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
Expand Down Expand Up @@ -54,6 +55,8 @@
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageTypeParser;
Expand All @@ -74,16 +77,19 @@ public class TestReadWrite {
@Parameterized.Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{ false }, // use the new converters
{ true } }; // use the old converters
{ false, false }, // use the new converters
{ true, false }, // use the old converters
{ false, true } }; // use a local disk location
return Arrays.asList(data);
}

private final boolean compat;
private final boolean local;
private final Configuration testConf = new Configuration();

public TestReadWrite(boolean compat) {
public TestReadWrite(boolean compat, boolean local) {
this.compat = compat;
this.local = local;
this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
testConf.setBoolean("parquet.avro.add-list-element-records", false);
testConf.setBoolean("parquet.avro.write-old-list-structure", false);
Expand All @@ -92,24 +98,20 @@ public TestReadWrite(boolean compat) {
@Test
public void testEmptyArray() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("array.avsc").openStream());
Resources.getResource("array.avsc").openStream());

// Write a record with an empty array.
List<Integer> emptyArray = new ArrayList<>();

Path file = new Path(createTempFile().getPath());
String file = createTempFile().getPath();

try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
.withSchema(schema)
.withConf(testConf)
.build()) {
try(ParquetWriter<GenericRecord> writer = writer(file, schema)) {
GenericData.Record record = new GenericRecordBuilder(schema)
.set("myarray", emptyArray).build();
writer.write(record);
}

try (AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(testConf, file)) {
try (ParquetReader<GenericRecord> reader = reader(file)) {
GenericRecord nextRecord = reader.read();

assertNotNull(nextRecord);
Expand All @@ -120,24 +122,20 @@ public void testEmptyArray() throws Exception {
@Test
public void testEmptyMap() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());
Resources.getResource("map.avsc").openStream());

Path file = new Path(createTempFile().getPath());
String file = createTempFile().getPath();
ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String, Integer>().build();

try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
.withSchema(schema)
.withConf(testConf)
.build()) {
try (ParquetWriter<GenericRecord> writer = writer(file, schema)) {

// Write a record with an empty map.
GenericData.Record record = new GenericRecordBuilder(schema)
.set("mymap", emptyMap).build();
writer.write(record);
}

try(AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file)) {
try(ParquetReader<GenericRecord> reader = reader(file)) {
GenericRecord nextRecord = reader.read();

assertNotNull(nextRecord);
Expand Down Expand Up @@ -704,12 +702,10 @@ public void testDuplicatedValuesWithDictionary() throws Exception {
public void testNestedLists() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("nested_array.avsc").openStream());
Path file = new Path(createTempFile().getPath());
String file = createTempFile().getPath();

// Parquet writer
ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(schema)
.withConf(testConf)
.build();
ParquetWriter parquetWriter = writer(file, schema);

Schema innerRecordSchema = schema.getField("l1").schema().getTypes()
.get(1).getElementType().getTypes().get(1);
Expand All @@ -723,7 +719,7 @@ public void testNestedLists() throws Exception {
parquetWriter.write(record);
parquetWriter.close();

AvroParquetReader<GenericRecord> reader = new AvroParquetReader(testConf, file);
ParquetReader<GenericRecord> reader = reader(file);
GenericRecord nextRecord = reader.read();

assertNotNull(nextRecord);
Expand Down Expand Up @@ -867,6 +863,34 @@ private File createTempFile() throws IOException {
return tmp;
}

/**
 * Creates a Parquet writer for the given file path, choosing between the
 * {@link LocalOutputFile} implementation and the Hadoop {@code Path} API
 * according to the {@code local} test parameter.
 */
private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws IOException {
  AvroParquetWriter.Builder<GenericRecord> builder = local
      ? AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
      : AvroParquetWriter.<GenericRecord>builder(new Path(file));
  return builder
      .withSchema(schema)
      .withConf(testConf)
      .build();
}

/**
 * Creates a Parquet reader for the given file path, choosing between the
 * {@link LocalInputFile} implementation and the Hadoop {@code Path} API
 * according to the {@code local} test parameter.
 */
private ParquetReader<GenericRecord> reader(String file) throws IOException {
  if (local) {
    return AvroParquetReader
      .<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
      .withDataModel(GenericData.get())
      .withConf(testConf)
      .build();
  } else {
    // Parameterized type instead of the raw AvroParquetReader to avoid an
    // unchecked-conversion warning; runtime behavior is unchanged.
    return new AvroParquetReader<GenericRecord>(testConf, new Path(file));
  }
}

/**
* Return a String or Utf8 depending on whether compatibility is on
*/
Expand Down
102 changes: 102 additions & 0 deletions parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.io;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Path;

/**
 * {@code LocalInputFile} is an implementation needed by Parquet to read
 * from local data files using {@link SeekableInputStream} instances.
 */
public class LocalInputFile implements InputFile {

  private final Path path;
  private long length = -1;

  public LocalInputFile(Path file) {
    path = file;
  }

  @Override
  public long getLength() throws IOException {
    // Lazily resolve the file size on first call and cache it; -1 marks "not yet read".
    if (length == -1) {
      try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
        length = file.length();
      }
    }
    return length;
  }

  @Override
  public SeekableInputStream newStream() throws IOException {

    return new SeekableInputStream() {

      private final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r");

      @Override
      public int read() throws IOException {
        return randomAccessFile.read();
      }

      @Override
      public long getPos() throws IOException {
        return randomAccessFile.getFilePointer();
      }

      @Override
      public void seek(long newPos) throws IOException {
        randomAccessFile.seek(newPos);
      }

      @Override
      public void readFully(byte[] bytes) throws IOException {
        randomAccessFile.readFully(bytes);
      }

      @Override
      public void readFully(byte[] bytes, int start, int len) throws IOException {
        randomAccessFile.readFully(bytes, start, len);
      }

      @Override
      public int read(ByteBuffer buf) throws IOException {
        // Read into a scratch array sized to the remaining space, then copy
        // only the bytes actually read. The second argument of
        // ByteBuffer.put(byte[], int, int) is an offset into the SOURCE
        // array, so it must be 0 here; the previous code passed
        // buf.position() + buf.arrayOffset(), which copied from the wrong
        // region and misbehaved on short reads and EOF (code == -1).
        byte[] buffer = new byte[buf.remaining()];
        int code = read(buffer);
        if (code > 0) {
          buf.put(buffer, 0, code);
        }
        return code;
      }

      @Override
      public void readFully(ByteBuffer buf) throws IOException {
        // readFully guarantees the scratch array is completely filled, so
        // the whole array is transferred into the buffer.
        byte[] buffer = new byte[buf.remaining()];
        readFully(buffer);
        buf.put(buffer, 0, buffer.length);
      }

      @Override
      public void close() throws IOException {
        randomAccessFile.close();
      }
    };
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.io;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
* {@code LocalOutputFile} is an implementation needed by Parquet to write
* to local data files using {@link PositionOutputStream} instances.
*/
public class LocalOutputFile implements OutputFile {

private class LocalPositionOutputStream extends PositionOutputStream {

private final BufferedOutputStream stream;
private long pos = 0;

public LocalPositionOutputStream(int buffer, StandardOpenOption... openOption) throws IOException {
stream = new BufferedOutputStream(Files.newOutputStream(path, openOption), buffer);
}

@Override
public long getPos() {
return pos;
}

@Override
public void write(int data) throws IOException {
pos++;
stream.write(data);
}

@Override
public void write(byte[] data) throws IOException {
pos += data.length;
stream.write(data);
}

@Override
public void write(byte[] data, int off, int len) throws IOException {
pos += len;
stream.write(data, off, len);
}

@Override
public void flush() throws IOException {
stream.flush();
}

@Override
public void close() throws IOException {
stream.close();
}
}

private final Path path;

public LocalOutputFile(Path file) {
path = file;
}

@Override
public PositionOutputStream create(long buffer) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a test case for create and createOrOverwrite to make sure they are as expected?

return new LocalPositionOutputStream((int) buffer, StandardOpenOption.CREATE_NEW);
}

@Override
public PositionOutputStream createOrOverwrite(long buffer) throws IOException {
return new LocalPositionOutputStream((int) buffer, StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
}

@Override
public boolean supportsBlockSize() {
return true;
}

@Override
public long defaultBlockSize() {
return 512;
}

@Override
public String getPath() {
return path.toString();
}
}
Loading