diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 6484ab4a62..81e751aba5 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -27,6 +27,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -54,6 +55,8 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageTypeParser; @@ -74,16 +77,19 @@ public class TestReadWrite { @Parameterized.Parameters public static Collection data() { Object[][] data = new Object[][] { - { false }, // use the new converters - { true } }; // use the old converters + { false, false }, // use the new converters + { true, false }, // use the old converters + { false, true } }; // use a local disk location return Arrays.asList(data); } private final boolean compat; + private final boolean local; private final Configuration testConf = new Configuration(); - public TestReadWrite(boolean compat) { + public TestReadWrite(boolean compat, boolean local) { this.compat = compat; + this.local = local; this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); testConf.setBoolean("parquet.avro.add-list-element-records", false); testConf.setBoolean("parquet.avro.write-old-list-structure", false); @@ -92,24 +98,20 @@ public TestReadWrite(boolean compat) { @Test public void testEmptyArray() throws Exception { Schema schema = new Schema.Parser().parse( - Resources.getResource("array.avsc").openStream()); + Resources.getResource("array.avsc").openStream()); // Write a record with an empty array. List emptyArray = new ArrayList<>(); - Path file = new Path(createTempFile().getPath()); + String file = createTempFile().getPath(); - try(ParquetWriter writer = AvroParquetWriter - .builder(file) - .withSchema(schema) - .withConf(testConf) - .build()) { + try(ParquetWriter writer = writer(file, schema)) { GenericData.Record record = new GenericRecordBuilder(schema) .set("myarray", emptyArray).build(); writer.write(record); } - try (AvroParquetReader reader = new AvroParquetReader<>(testConf, file)) { + try (ParquetReader reader = reader(file)) { GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); @@ -120,16 +122,12 @@ public void testEmptyArray() throws Exception { @Test public void testEmptyMap() throws Exception { Schema schema = new Schema.Parser().parse( - Resources.getResource("map.avsc").openStream()); + Resources.getResource("map.avsc").openStream()); - Path file = new Path(createTempFile().getPath()); + String file = createTempFile().getPath(); ImmutableMap emptyMap = new ImmutableMap.Builder().build(); - try(ParquetWriter writer = AvroParquetWriter - .builder(file) - .withSchema(schema) - .withConf(testConf) - .build()) { + try (ParquetWriter writer = writer(file, schema)) { // Write a record with an empty map. GenericData.Record record = new GenericRecordBuilder(schema) @@ -137,7 +135,7 @@ public void testEmptyMap() throws Exception { writer.write(record); } - try(AvroParquetReader reader = new AvroParquetReader(testConf, file)) { + try(ParquetReader reader = reader(file)) { GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); @@ -704,12 +702,10 @@ public void testDuplicatedValuesWithDictionary() throws Exception { public void testNestedLists() throws Exception { Schema schema = new Schema.Parser().parse( Resources.getResource("nested_array.avsc").openStream()); - Path file = new Path(createTempFile().getPath()); + String file = createTempFile().getPath(); // Parquet writer - ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(schema) - .withConf(testConf) - .build(); + ParquetWriter parquetWriter = writer(file, schema); Schema innerRecordSchema = schema.getField("l1").schema().getTypes() .get(1).getElementType().getTypes().get(1); @@ -723,7 +719,7 @@ public void testNestedLists() throws Exception { parquetWriter.write(record); parquetWriter.close(); - AvroParquetReader reader = new AvroParquetReader(testConf, file); + ParquetReader reader = reader(file); GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); @@ -867,6 +863,34 @@ private File createTempFile() throws IOException { return tmp; } + private ParquetWriter writer(String file, Schema schema) throws IOException { + if (local) { + return AvroParquetWriter + .builder(new LocalOutputFile(Paths.get(file))) + .withSchema(schema) + .withConf(testConf) + .build(); + } else { + return AvroParquetWriter + .builder(new Path(file)) + .withSchema(schema) + .withConf(testConf) + .build(); + } + } + + private ParquetReader reader(String file) throws IOException { + if (local) { + return AvroParquetReader + .builder(new LocalInputFile(Paths.get(file))) + .withDataModel(GenericData.get()) + .withConf(testConf) + .build(); + } else { + return new AvroParquetReader(testConf, new Path(file)); + } + } + /** * Return a String or Utf8 depending on whether compatibility is on */ diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java new file mode 100644 index 0000000000..7174b42d5d --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.file.Path; + +/** + * {@code LocalInputFile} is an implementation needed by Parquet to read + * from local data files using {@link SeekableInputStream} instances. + */ +public class LocalInputFile implements InputFile { + + private final Path path; + private long length = -1; + + public LocalInputFile(Path file) { + path = file; + } + + @Override + public long getLength() throws IOException { + if (length == -1) { + try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) { + length = file.length(); + } + } + return length; + } + + @Override + public SeekableInputStream newStream() throws IOException { + + return new SeekableInputStream() { + + private final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r"); + + @Override + public int read() throws IOException { + return randomAccessFile.read(); + } + + @Override + public long getPos() throws IOException { + return randomAccessFile.getFilePointer(); + } + + @Override + public void seek(long newPos) throws IOException { + randomAccessFile.seek(newPos); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + randomAccessFile.readFully(bytes); + } + + @Override + public void readFully(byte[] bytes, int start, int len) throws IOException { + randomAccessFile.readFully(bytes, start, len); + } + + @Override + public int read(ByteBuffer buf) throws IOException { + byte[] buffer = new byte[buf.remaining()]; + int code = read(buffer); + buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); + return code; + } + + @Override + public void readFully(ByteBuffer buf) throws IOException { + byte[] buffer = new byte[buf.remaining()]; + readFully(buffer); + buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); + } + + @Override + public void close() throws IOException { + randomAccessFile.close(); + } + }; + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java new file mode 100644 index 0000000000..5925df9889 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * {@code LocalOutputFile} is an implementation needed by Parquet to write + * to local data files using {@link PositionOutputStream} instances. + */ +public class LocalOutputFile implements OutputFile { + + private class LocalPositionOutputStream extends PositionOutputStream { + + private final BufferedOutputStream stream; + private long pos = 0; + + public LocalPositionOutputStream(int buffer, StandardOpenOption... openOption) throws IOException { + stream = new BufferedOutputStream(Files.newOutputStream(path, openOption), buffer); + } + + @Override + public long getPos() { + return pos; + } + + @Override + public void write(int data) throws IOException { + pos++; + stream.write(data); + } + + @Override + public void write(byte[] data) throws IOException { + pos += data.length; + stream.write(data); + } + + @Override + public void write(byte[] data, int off, int len) throws IOException { + pos += len; + stream.write(data, off, len); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + @Override + public void close() throws IOException { + stream.close(); + } + } + + private final Path path; + + public LocalOutputFile(Path file) { + path = file; + } + + @Override + public PositionOutputStream create(long buffer) throws IOException { + return new LocalPositionOutputStream((int) buffer, StandardOpenOption.CREATE_NEW); + } + + @Override + public PositionOutputStream createOrOverwrite(long buffer) throws IOException { + return new LocalPositionOutputStream((int) buffer, StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING); + } + + @Override + public boolean supportsBlockSize() { + return true; + } + + @Override + public long defaultBlockSize() { + return 512; + } + + @Override + public String getPath() { + return path.toString(); + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java index 77331758bf..fede913611 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java @@ -21,16 +21,41 @@ import java.io.IOException; +/** + * {@code OutputFile} is an interface with the methods needed by Parquet to write + * data files using {@link PositionOutputStream} instances. + */ public interface OutputFile { + /** + * Opens a new {@link PositionOutputStream} for the data file to create. + * + * @return a new {@link PositionOutputStream} to write the file + * @throws IOException if the stream cannot be opened + */ PositionOutputStream create(long blockSizeHint) throws IOException; + /** + * Opens a new {@link PositionOutputStream} for the data file to create or overwrite. + * + * @return a new {@link PositionOutputStream} to write the file + * @throws IOException if the stream cannot be opened + */ PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException; + /** + * @return a flag indicating if block size is supported. + */ boolean supportsBlockSize(); + /** + * @return the default block size. + */ long defaultBlockSize(); + /** + * @return the path of the file, as a {@link String}. + */ default String getPath() { return null; } diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java new file mode 100644 index 0000000000..d13cc7f713 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class TestLocalInputOutput { + + @Test + public void outputFileOverwritesFile() throws IOException { + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(124); + } + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(124); + } + InputFile read = new LocalInputFile(path); + try (SeekableInputStream stream = read.newStream()) { + assertEquals(stream.read(), 124); + assertEquals(stream.read(), -1); + } + } + + @Test + public void outputFileCreateFailsAsFileAlreadyExists() throws IOException { + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); + write.create(512).close(); + assertThrows(FileAlreadyExistsException.class, () -> write.create(512).close()); + } + + @Test + public void outputFileCreatesFileWithOverwrite() throws IOException { + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(255); + } + InputFile read = new LocalInputFile(path); + try (SeekableInputStream stream = read.newStream()) { + assertEquals(stream.read(), 255); + assertEquals(stream.read(), -1); + } + } + + @Test + public void outputFileCreatesFile() throws IOException { + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(2); + } + InputFile read = new LocalInputFile(path); + try (SeekableInputStream stream = read.newStream()) { + assertEquals(stream.read(), 2); + assertEquals(stream.read(), -1); + } + } + + private File createTempFile() throws IOException { + File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + return tmp; + } +}