From 6cafabfbedff5f97a4063b78e21cc6b71cf3051e Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Mon, 12 Jun 2023 21:42:12 +0200 Subject: [PATCH 01/18] PARQUET-1822: Add nio Path wrappers Add disk InputFile and OutputFile implementations --- .../org/apache/parquet/io/DiskInputFile.java | 170 ++++++++++++++++ .../org/apache/parquet/io/DiskOutputFile.java | 182 ++++++++++++++++++ 2 files changed, 352 insertions(+) create mode 100644 parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java create mode 100644 parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java new file mode 100644 index 0000000000..9b12bb1582 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java @@ -0,0 +1,170 @@ +package org.apache.parquet.io; + +import java.io.EOFException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.file.Path; + +/** + * {@code DiskInputFile} is an implementation needed by Parquet to read + * data files from disk using {@link SeekableInputStream} instances. + */ +public class DiskInputFile implements InputFile { + + private final Path path; + + public DiskInputFile(Path file) { + path = file; + } + + /** + * @return the total length of the file, in bytes. + * @throws IOException if the length cannot be determined + */ + @Override + public long getLength() throws IOException { + RandomAccessFile file = new RandomAccessFile(path.toFile(), "r"); + long length = file.length(); + file.close(); + return length; + } + + /** + * Open a new {@link SeekableInputStream} for the underlying data file. + * + * @return a new {@link SeekableInputStream} to read the file + * @throws IOException if the stream cannot be opened + */ + @Override + public SeekableInputStream newStream() throws IOException { + + return new SeekableInputStream() { + + final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r"); + + /** + * Reads the next byte of data from the input stream. The value byte is + * returned as an {@code int} in the range {@code 0} to + * {@code 255}. If no byte is available because the end of the stream + * has been reached, the value {@code -1} is returned. This method + * blocks until input data is available, the end of the stream is detected, + * or an exception is thrown. + * + * @return the next byte of data, or {@code -1} if the end of the + * stream is reached. + * @throws IOException if an I/O error occurs. + */ + @Override + public int read() throws IOException { + return randomAccessFile.read(); + } + + /** + * Return the current position in the InputStream. + * + * @return current position in bytes from the start of the stream + */ + @Override + public long getPos() throws IOException { + return randomAccessFile.getFilePointer(); + } + + /** + * Seek to a new position in the InputStream. + * + * @param newPos the new position to seek to + * @throws IOException If the underlying stream throws IOException + */ + @Override + public void seek(long newPos) throws IOException { + randomAccessFile.seek(newPos); + } + + /** + * Read a byte array of data, from position 0 to the end of the array. + *

+ * This method is equivalent to {@code read(bytes, 0, bytes.length)}. + *

+ * This method will block until len bytes are available to copy into the + * array, or will throw {@link EOFException} if the stream ends before the + * array is full. + * + * @param bytes a byte array to fill with data from the stream + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer bytes left than are needed to + * fill the array, {@code bytes.length} + */ + @Override + public void readFully(byte[] bytes) throws IOException { + randomAccessFile.readFully(bytes); + } + + /** + * Read {@code len} bytes of data into an array, at position {@code start}. + *

+ * This method will block until len bytes are available to copy into the + * array, or will throw {@link EOFException} if the stream ends before the + * array is full. + * + * @param bytes a byte array to fill with data from the stream + * @param start the starting position in the byte array for data + * @param len the length of bytes to read into the byte array + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer than {@code len} bytes left + */ + @Override + public void readFully(byte[] bytes, int start, int len) throws IOException { + randomAccessFile.readFully(bytes, start, len); + } + + /** + * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. + *

+ * This method will copy available bytes into the buffer, reading at most + * {@code buf.remaining()} bytes. The number of bytes actually copied is + * returned by the method, or -1 is returned to signal that the end of the + * underlying stream has been reached. + * + * @param buf a byte buffer to fill with data from the stream + * @return the number of bytes read or -1 if the stream ended + * @throws IOException If the underlying stream throws IOException + */ + @Override + public int read(ByteBuffer buf) throws IOException { + byte[] buffer = new byte[buf.remaining()]; + int code = read(buffer); + buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); + return code; + } + + /** + * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. + *

+ * This method will block until {@code buf.remaining()} bytes are available + * to copy into the buffer, or will throw {@link EOFException} if the stream + * ends before the buffer is full. + * + * @param buf a byte buffer to fill with data from the stream + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer bytes left than are needed to + * fill the buffer, {@code buf.remaining()} + */ + @Override + public void readFully(ByteBuffer buf) throws IOException { + byte[] buffer = new byte[buf.remaining()]; + readFully(buffer); + buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); + } + + /** + * Closes the resource. + * @throws IOException if the underlying resource throws an IOException + */ + @Override + public void close() throws IOException { + randomAccessFile.close(); + } + }; + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java new file mode 100644 index 0000000000..c6364cc632 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java @@ -0,0 +1,182 @@ +package org.apache.parquet.io; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * {@code DiskOutputFile} is an implementation needed by Parquet to write + * data files to disk using {@link PositionOutputStream} instances. + */ +public class DiskOutputFile implements OutputFile { + + private final Path path; + + public DiskOutputFile(Path file) { + path = file; + } + + /** + * Opens a new {@link PositionOutputStream} for the data file to create. + * + * @return a new {@link PositionOutputStream} to write the file + * @throws IOException if the stream cannot be opened + */ + @Override + public PositionOutputStream create(long buffer) throws IOException { + return new PositionOutputStream() { + + private final BufferedOutputStream stream = + new BufferedOutputStream(Files.newOutputStream(path), (int) buffer); + private long pos = 0; + + /** + * Reports the current position of this output stream. + * + * @return a long, the current position in bytes starting from 0 + */ + @Override + public long getPos() { + return pos; + } + + /** + * Writes the specified byte to this output stream. The general + * contract for {@code write} is that one byte is written + * to the output stream. The byte to be written is the eight + * low-order bits of the argument {@code b}. The 24 + * high-order bits of {@code b} are ignored. + * + * @param data the {@code byte}. + * @throws IOException if an I/O error occurs. In particular, + * an {@code IOException} may be thrown if the + * output stream has been closed. + */ + @Override + public void write(int data) throws IOException { + pos++; + stream.write(data); + } + + /** + * Writes {@code data.length} bytes from the specified byte array + * to this output stream. The general contract for {@code write(data)} + * is that it should have exactly the same effect as the call + * {@code write(data, 0, data.length)}. + * + * @param data the data. + * @throws IOException if an I/O error occurs. + * @see java.io.OutputStream#write(byte[], int, int) + */ + @Override + public void write(byte[] data) throws IOException { + pos += data.length; + stream.write(data); + } + + /** + * Writes {@code len} bytes from the specified byte array + * starting at offset {@code off} to this output stream. + * The general contract for {@code write(b, off, len)} is that + * some of the bytes in the array {@code b} are written to the + * output stream in order; element {@code b[off]} is the first + * byte written and {@code b[off+len-1]} is the last byte written + * by this operation. + *

+ * The {@code write} method of {@code OutputStream} calls + * the write method of one argument on each of the bytes to be + * written out. Subclasses are encouraged to override this method and + * provide a more efficient implementation. + *

+ * If {@code b} is {@code null}, a + * {@code NullPointerException} is thrown. + *

+ * If {@code off} is negative, or {@code len} is negative, or + * {@code off+len} is greater than the length of the array + * {@code b}, then an {@code IndexOutOfBoundsException} is thrown. + * + * @param data the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + * @throws IOException if an I/O error occurs. In particular, + * an {@code IOException} is thrown if the output + * stream is closed. + */ + @Override + public void write(byte[] data, int off, int len) throws IOException { + pos += len; + stream.write(data, off, len); + } + + /** + * Flushes this output stream and forces any buffered output bytes + * to be written out. The general contract of {@code flush} is + * that calling it is an indication that, if any bytes previously + * written have been buffered by the implementation of the output + * stream, such bytes should immediately be written to their + * intended destination. + *

+ * If the intended destination of this stream is an abstraction provided by + * the underlying operating system, for example a file, then flushing the + * stream guarantees only that bytes previously written to the stream are + * passed to the operating system for writing; it does not guarantee that + * they are actually written to a physical device such as a disk drive. + * + * @throws IOException if an I/O error occurs. + */ + @Override + public void flush() throws IOException { + stream.flush(); + } + + /** + * Closes this output stream and releases any system resources + * associated with this stream. The general contract of {@code close} + * is that it closes the output stream. A closed stream cannot perform + * output operations and cannot be reopened. + * + * @throws IOException if an I/O error occurs. + */ + @Override + public void close() throws IOException { + stream.close(); + } + }; + } + + /** + * Opens a new {@link PositionOutputStream} for the data file to create or overwrite. + * + * @return a new {@link PositionOutputStream} to write the file + * @throws IOException if the stream cannot be opened + */ + @Override + public PositionOutputStream createOrOverwrite(long buffer) throws IOException { + return create(buffer); + } + + /** + * @return a flag indicating if block size is supported. + */ + @Override + public boolean supportsBlockSize() { + return true; + } + + /** + * @return the default block size. + */ + @Override + public long defaultBlockSize() { + return 512; + } + + /** + * @return the path of the file, as a {@link String}. + */ + @Override + public String getPath() { + return path.toString(); + } +} From f41fc157c2fd5a24ba9948617cd79dd14710d5f5 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Mon, 12 Jun 2023 21:53:41 +0200 Subject: [PATCH 02/18] Add more documentation Add some Javadoc to OutputFile --- .../org/apache/parquet/io/OutputFile.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java index 77331758bf..fede913611 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java @@ -21,16 +21,41 @@ import java.io.IOException; +/** + * {@code OutputFile} is an interface with the methods needed by Parquet to write + * data files using {@link PositionOutputStream} instances. + */ public interface OutputFile { + /** + * Opens a new {@link PositionOutputStream} for the data file to create. + * + * @return a new {@link PositionOutputStream} to write the file + * @throws IOException if the stream cannot be opened + */ PositionOutputStream create(long blockSizeHint) throws IOException; + /** + * Opens a new {@link PositionOutputStream} for the data file to create or overwrite. + * + * @return a new {@link PositionOutputStream} to write the file + * @throws IOException if the stream cannot be opened + */ PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException; + /** + * @return a flag indicating if block size is supported. + */ boolean supportsBlockSize(); + /** + * @return the default block size. + */ long defaultBlockSize(); + /** + * @return the path of the file, as a {@link String}. + */ default String getPath() { return null; } From 3a81d6bfe6eeca5b057758df42d00c183d688f32 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Mon, 12 Jun 2023 22:35:03 +0200 Subject: [PATCH 03/18] Add read/write tests with disk impl --- .../apache/parquet/avro/TestReadWrite.java | 130 ++++++++++++++++-- 1 file changed, 120 insertions(+), 10 deletions(-) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 6484ab4a62..cd98c92767 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -27,6 +27,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -54,6 +55,8 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.DiskInputFile; +import org.apache.parquet.io.DiskOutputFile; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageTypeParser; @@ -92,7 +95,7 @@ public TestReadWrite(boolean compat) { @Test public void testEmptyArray() throws Exception { Schema schema = new Schema.Parser().parse( - Resources.getResource("array.avsc").openStream()); + Resources.getResource("array.avsc").openStream()); // Write a record with an empty array. List emptyArray = new ArrayList<>(); @@ -100,10 +103,10 @@ public void testEmptyArray() throws Exception { Path file = new Path(createTempFile().getPath()); try(ParquetWriter writer = AvroParquetWriter - .builder(file) - .withSchema(schema) - .withConf(testConf) - .build()) { + .builder(file) + .withSchema(schema) + .withConf(testConf) + .build()) { GenericData.Record record = new GenericRecordBuilder(schema) .set("myarray", emptyArray).build(); writer.write(record); @@ -117,19 +120,52 @@ public void testEmptyArray() throws Exception { } } + @Test + public void testEmptyArrayDisk() throws Exception { + Schema schema = new Schema.Parser().parse( + Resources.getResource("array.avsc").openStream()); + + // Write a record with an empty array. + List emptyArray = new ArrayList<>(); + + java.nio.file.Path file = Paths.get(createTempFile().getPath()); + + try(ParquetWriter writer = AvroParquetWriter + .builder(new DiskOutputFile(file)) + .withSchema(schema) + .withConf(testConf) + .build()) { + GenericData.Record record = new GenericRecordBuilder(schema) + .set("myarray", emptyArray).build(); + writer.write(record); + } + + try (ParquetReader reader = AvroParquetReader + .builder(new DiskInputFile(file)) + .withDataModel(GenericData.get()) + .withConf(testConf) + .build()) { + + GenericRecord nextRecord = reader.read(); + + assertNotNull(nextRecord); + assertEquals(emptyArray, nextRecord.get("myarray")); + } + } + @Test public void testEmptyMap() throws Exception { Schema schema = new Schema.Parser().parse( - Resources.getResource("map.avsc").openStream()); + Resources.getResource("map.avsc").openStream()); Path file = new Path(createTempFile().getPath()); ImmutableMap emptyMap = new ImmutableMap.Builder().build(); try(ParquetWriter writer = AvroParquetWriter - .builder(file) - .withSchema(schema) - .withConf(testConf) - .build()) { + .builder(file) + .withSchema(schema) + .withConf(testConf) + .build()) { // Write a record with an empty map. GenericData.Record record = new GenericRecordBuilder(schema) @@ -145,6 +181,39 @@ public void testEmptyMap() throws Exception { } } + @Test + public void testEmptyMapDisk() throws Exception { + Schema schema = new Schema.Parser().parse( + Resources.getResource("map.avsc").openStream()); + + java.nio.file.Path file = Paths.get(createTempFile().getPath()); + ImmutableMap emptyMap = new ImmutableMap.Builder().build(); + + try(ParquetWriter writer = AvroParquetWriter + .builder(new DiskOutputFile(file)) + .withSchema(schema) + .withConf(testConf) + .build()) { + + // Write a record with an empty map. + GenericData.Record record = new GenericRecordBuilder(schema) + .set("mymap", emptyMap).build(); + writer.write(record); + } + + try (ParquetReader reader = AvroParquetReader + .builder(new DiskInputFile(file)) + .withDataModel(GenericData.get()) + .withConf(testConf) + .build()) { + + GenericRecord nextRecord = reader.read(); + + assertNotNull(nextRecord); + assertEquals(emptyMap, nextRecord.get("mymap")); + } + } + @Test public void testMapWithNulls() throws Exception { Schema schema = new Schema.Parser().parse( @@ -734,6 +803,47 @@ public void testNestedLists() throws Exception { assertEquals(str("hello"), l2List.get(0)); } + @Test + public void testNestedListsDisk() throws Exception { + Schema schema = new Schema.Parser().parse( + Resources.getResource("nested_array.avsc").openStream()); + java.nio.file.Path file = Paths.get(createTempFile().getPath()); + + // Parquet writer + try (ParquetWriter parquetWriter = AvroParquetWriter.builder(new DiskOutputFile(file)) + .withSchema(schema) + .withConf(testConf) + .build()) { + + Schema innerRecordSchema = schema.getField("l1").schema().getTypes() + .get(1).getElementType().getTypes().get(1); + + GenericRecord record = new GenericRecordBuilder(schema) + .set("l1", Collections.singletonList( + new GenericRecordBuilder(innerRecordSchema).set("l2", Collections.singletonList("hello")).build() + )) + .build(); + + parquetWriter.write(record); + } + + try (ParquetReader reader = AvroParquetReader + .builder(new DiskInputFile(file)) + .withDataModel(GenericData.get()) + .withConf(testConf) + .build()) { + + GenericRecord nextRecord = reader.read(); + + assertNotNull(nextRecord); + assertNotNull(nextRecord.get("l1")); + List l1List = (List) nextRecord.get("l1"); + assertNotNull(l1List.get(0)); + List l2List = (List) ((GenericRecord) l1List.get(0)).get("l2"); + assertEquals(str("hello"), l2List.get(0)); + } + } + /** * A test demonstrating the most simple way to write and read Parquet files * using Avro {@link GenericRecord}. From 0cc76f1031fbd8bb6e00b04284a926b4d1fbc1e6 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Tue, 13 Jun 2023 07:45:16 +0200 Subject: [PATCH 04/18] Add license headers --- .../org/apache/parquet/io/DiskInputFile.java | 18 ++++++++++++++++++ .../org/apache/parquet/io/DiskOutputFile.java | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java index 9b12bb1582..62bc5cd016 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.parquet.io; import java.io.EOFException; diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java index c6364cc632..987250cc80 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.parquet.io; import java.io.BufferedOutputStream; From 26ceb052b1e2457a60133be864970a451a313fed Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Tue, 13 Jun 2023 10:18:41 +0200 Subject: [PATCH 05/18] Fix reader test --- .../src/test/java/org/apache/parquet/avro/TestReadWrite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index cd98c92767..8203735bde 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -840,7 +840,7 @@ public void testNestedListsDisk() throws Exception { List l1List = (List) nextRecord.get("l1"); assertNotNull(l1List.get(0)); List l2List = (List) ((GenericRecord) l1List.get(0)).get("l2"); - assertEquals(str("hello"), l2List.get(0)); + assertEquals(new Utf8("hello"), l2List.get(0)); } } From 710f0dde1999f18f14a13441a578ffe9f983a83d Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 14:29:42 +0200 Subject: [PATCH 06/18] Remove Javadoc See comments made by @gszadovszky under #1111 --- .../org/apache/parquet/io/DiskInputFile.java | 89 ---------------- .../org/apache/parquet/io/DiskOutputFile.java | 100 ------------------ 2 files changed, 189 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java index 62bc5cd016..39e23ca852 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java @@ -18,7 +18,6 @@ */ package org.apache.parquet.io; -import java.io.EOFException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; @@ -36,10 +35,6 @@ public DiskInputFile(Path file) { path = file; } - /** - * @return the total length of the file, in bytes. - * @throws IOException if the length cannot be determined - */ @Override public long getLength() throws IOException { RandomAccessFile file = new RandomAccessFile(path.toFile(), "r"); @@ -48,12 +43,6 @@ public long getLength() throws IOException { return length; } - /** - * Open a new {@link SeekableInputStream} for the underlying data file. - * - * @return a new {@link SeekableInputStream} to read the file - * @throws IOException if the stream cannot be opened - */ @Override public SeekableInputStream newStream() throws IOException { @@ -61,93 +50,31 @@ public SeekableInputStream newStream() throws IOException { final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r"); - /** - * Reads the next byte of data from the input stream. The value byte is - * returned as an {@code int} in the range {@code 0} to - * {@code 255}. If no byte is available because the end of the stream - * has been reached, the value {@code -1} is returned. This method - * blocks until input data is available, the end of the stream is detected, - * or an exception is thrown. - * - * @return the next byte of data, or {@code -1} if the end of the - * stream is reached. - * @throws IOException if an I/O error occurs. - */ @Override public int read() throws IOException { return randomAccessFile.read(); } - /** - * Return the current position in the InputStream. - * - * @return current position in bytes from the start of the stream - */ @Override public long getPos() throws IOException { return randomAccessFile.getFilePointer(); } - /** - * Seek to a new position in the InputStream. - * - * @param newPos the new position to seek to - * @throws IOException If the underlying stream throws IOException - */ @Override public void seek(long newPos) throws IOException { randomAccessFile.seek(newPos); } - /** - * Read a byte array of data, from position 0 to the end of the array. - *

- * This method is equivalent to {@code read(bytes, 0, bytes.length)}. - *

- * This method will block until len bytes are available to copy into the - * array, or will throw {@link EOFException} if the stream ends before the - * array is full. - * - * @param bytes a byte array to fill with data from the stream - * @throws IOException If the underlying stream throws IOException - * @throws EOFException If the stream has fewer bytes left than are needed to - * fill the array, {@code bytes.length} - */ @Override public void readFully(byte[] bytes) throws IOException { randomAccessFile.readFully(bytes); } - /** - * Read {@code len} bytes of data into an array, at position {@code start}. - *

- * This method will block until len bytes are available to copy into the - * array, or will throw {@link EOFException} if the stream ends before the - * array is full. - * - * @param bytes a byte array to fill with data from the stream - * @param start the starting position in the byte array for data - * @param len the length of bytes to read into the byte array - * @throws IOException If the underlying stream throws IOException - * @throws EOFException If the stream has fewer than {@code len} bytes left - */ @Override public void readFully(byte[] bytes, int start, int len) throws IOException { randomAccessFile.readFully(bytes, start, len); } - /** - * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. - *

- * This method will copy available bytes into the buffer, reading at most - * {@code buf.remaining()} bytes. The number of bytes actually copied is - * returned by the method, or -1 is returned to signal that the end of the - * underlying stream has been reached. - * - * @param buf a byte buffer to fill with data from the stream - * @return the number of bytes read or -1 if the stream ended - * @throws IOException If the underlying stream throws IOException - */ @Override public int read(ByteBuffer buf) throws IOException { byte[] buffer = new byte[buf.remaining()]; @@ -156,18 +83,6 @@ public int read(ByteBuffer buf) throws IOException { return code; } - /** - * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. - *

- * This method will block until {@code buf.remaining()} bytes are available - * to copy into the buffer, or will throw {@link EOFException} if the stream - * ends before the buffer is full. - * - * @param buf a byte buffer to fill with data from the stream - * @throws IOException If the underlying stream throws IOException - * @throws EOFException If the stream has fewer bytes left than are needed to - * fill the buffer, {@code buf.remaining()} - */ @Override public void readFully(ByteBuffer buf) throws IOException { byte[] buffer = new byte[buf.remaining()]; @@ -175,10 +90,6 @@ public void readFully(ByteBuffer buf) throws IOException { buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); } - /** - * Closes the resource. - * @throws IOException if the underlying resource throws an IOException - */ @Override public void close() throws IOException { randomAccessFile.close(); diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java index 987250cc80..139cbcde51 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java @@ -35,12 +35,6 @@ public DiskOutputFile(Path file) { path = file; } - /** - * Opens a new {@link PositionOutputStream} for the data file to create. - * - * @return a new {@link PositionOutputStream} to write the file - * @throws IOException if the stream cannot be opened - */ @Override public PositionOutputStream create(long buffer) throws IOException { return new PositionOutputStream() { @@ -49,113 +43,34 @@ public PositionOutputStream create(long buffer) throws IOException { new BufferedOutputStream(Files.newOutputStream(path), (int) buffer); private long pos = 0; - /** - * Reports the current position of this output stream. - * - * @return a long, the current position in bytes starting from 0 - */ @Override public long getPos() { return pos; } - /** - * Writes the specified byte to this output stream. The general - * contract for {@code write} is that one byte is written - * to the output stream. The byte to be written is the eight - * low-order bits of the argument {@code b}. The 24 - * high-order bits of {@code b} are ignored. - * - * @param data the {@code byte}. - * @throws IOException if an I/O error occurs. In particular, - * an {@code IOException} may be thrown if the - * output stream has been closed. - */ @Override public void write(int data) throws IOException { pos++; stream.write(data); } - /** - * Writes {@code data.length} bytes from the specified byte array - * to this output stream. The general contract for {@code write(data)} - * is that it should have exactly the same effect as the call - * {@code write(data, 0, data.length)}. - * - * @param data the data. - * @throws IOException if an I/O error occurs. - * @see java.io.OutputStream#write(byte[], int, int) - */ @Override public void write(byte[] data) throws IOException { pos += data.length; stream.write(data); } - /** - * Writes {@code len} bytes from the specified byte array - * starting at offset {@code off} to this output stream. - * The general contract for {@code write(b, off, len)} is that - * some of the bytes in the array {@code b} are written to the - * output stream in order; element {@code b[off]} is the first - * byte written and {@code b[off+len-1]} is the last byte written - * by this operation. - *

- * The {@code write} method of {@code OutputStream} calls - * the write method of one argument on each of the bytes to be - * written out. Subclasses are encouraged to override this method and - * provide a more efficient implementation. - *

- * If {@code b} is {@code null}, a - * {@code NullPointerException} is thrown. - *

- * If {@code off} is negative, or {@code len} is negative, or - * {@code off+len} is greater than the length of the array - * {@code b}, then an {@code IndexOutOfBoundsException} is thrown. - * - * @param data the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - * @throws IOException if an I/O error occurs. In particular, - * an {@code IOException} is thrown if the output - * stream is closed. - */ @Override public void write(byte[] data, int off, int len) throws IOException { pos += len; stream.write(data, off, len); } - /** - * Flushes this output stream and forces any buffered output bytes - * to be written out. The general contract of {@code flush} is - * that calling it is an indication that, if any bytes previously - * written have been buffered by the implementation of the output - * stream, such bytes should immediately be written to their - * intended destination. - *

- * If the intended destination of this stream is an abstraction provided by - * the underlying operating system, for example a file, then flushing the - * stream guarantees only that bytes previously written to the stream are - * passed to the operating system for writing; it does not guarantee that - * they are actually written to a physical device such as a disk drive. - * - * @throws IOException if an I/O error occurs. - */ @Override public void flush() throws IOException { stream.flush(); } - /** - * Closes this output stream and releases any system resources - * associated with this stream. The general contract of {@code close} - * is that it closes the output stream. A closed stream cannot perform - * output operations and cannot be reopened. - * - * @throws IOException if an I/O error occurs. - */ @Override public void close() throws IOException { stream.close(); @@ -163,36 +78,21 @@ public void close() throws IOException { }; } - /** - * Opens a new {@link PositionOutputStream} for the data file to create or overwrite. - * - * @return a new {@link PositionOutputStream} to write the file - * @throws IOException if the stream cannot be opened - */ @Override public PositionOutputStream createOrOverwrite(long buffer) throws IOException { return create(buffer); } - /** - * @return a flag indicating if block size is supported. - */ @Override public boolean supportsBlockSize() { return true; } - /** - * @return the default block size. - */ @Override public long defaultBlockSize() { return 512; } - /** - * @return the path of the file, as a {@link String}. - */ @Override public String getPath() { return path.toString(); From a859954a3c65029f203c63c710f224e912986c39 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 15:16:08 +0200 Subject: [PATCH 07/18] Rename Disk -> Local See comments made by @gszadovszky under #1111 --- .../org/apache/parquet/avro/TestReadWrite.java | 16 ++++++++-------- .../{DiskInputFile.java => LocalInputFile.java} | 4 ++-- ...{DiskOutputFile.java => LocalOutputFile.java} | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) rename parquet-common/src/main/java/org/apache/parquet/io/{DiskInputFile.java => LocalInputFile.java} (97%) rename parquet-common/src/main/java/org/apache/parquet/io/{DiskOutputFile.java => LocalOutputFile.java} (96%) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 8203735bde..7d096ccf31 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -55,8 +55,8 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.io.DiskInputFile; -import org.apache.parquet.io.DiskOutputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageTypeParser; @@ -131,7 +131,7 @@ public void testEmptyArrayDisk() throws Exception { java.nio.file.Path file = Paths.get(createTempFile().getPath()); try(ParquetWriter writer = AvroParquetWriter - .builder(new DiskOutputFile(file)) + .builder(new LocalOutputFile(file)) .withSchema(schema) .withConf(testConf) .build()) { @@ -141,7 +141,7 @@ public void testEmptyArrayDisk() throws Exception { } try (ParquetReader reader = AvroParquetReader - .builder(new DiskInputFile(file)) + .builder(new LocalInputFile(file)) .withDataModel(GenericData.get()) .withConf(testConf) .build()) { @@ -190,7 +190,7 @@ public void testEmptyMapDisk() throws Exception { ImmutableMap emptyMap = new ImmutableMap.Builder().build(); try(ParquetWriter writer = AvroParquetWriter - .builder(new DiskOutputFile(file)) + .builder(new LocalOutputFile(file)) .withSchema(schema) .withConf(testConf) .build()) { @@ -202,7 +202,7 @@ public void testEmptyMapDisk() throws Exception { } try (ParquetReader reader = AvroParquetReader - .builder(new DiskInputFile(file)) + .builder(new LocalInputFile(file)) .withDataModel(GenericData.get()) .withConf(testConf) .build()) { @@ -810,7 +810,7 @@ public void testNestedListsDisk() throws Exception { java.nio.file.Path file = Paths.get(createTempFile().getPath()); // Parquet writer - try (ParquetWriter parquetWriter = AvroParquetWriter.builder(new DiskOutputFile(file)) + try (ParquetWriter parquetWriter = AvroParquetWriter.builder(new LocalOutputFile(file)) .withSchema(schema) .withConf(testConf) .build()) { @@ -828,7 +828,7 @@ public void testNestedListsDisk() throws Exception { } try (ParquetReader reader = AvroParquetReader - .builder(new DiskInputFile(file)) + .builder(new LocalInputFile(file)) .withDataModel(GenericData.get()) .withConf(testConf) .build()) { diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java similarity index 97% rename from parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java rename to parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java index 39e23ca852..19a74f0256 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/DiskInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java @@ -27,11 +27,11 @@ * {@code DiskInputFile} is an implementation needed by Parquet to read * data files from disk using {@link SeekableInputStream} instances. */ -public class DiskInputFile implements InputFile { +public class LocalInputFile implements InputFile { private final Path path; - public DiskInputFile(Path file) { + public LocalInputFile(Path file) { path = file; } diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java similarity index 96% rename from parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java rename to parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java index 139cbcde51..2fe945ca60 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/DiskOutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java @@ -27,11 +27,11 @@ * {@code DiskOutputFile} is an implementation needed by Parquet to write * data files to disk using {@link PositionOutputStream} instances. */ -public class DiskOutputFile implements OutputFile { +public class LocalOutputFile implements OutputFile { private final Path path; - public DiskOutputFile(Path file) { + public LocalOutputFile(Path file) { path = file; } From 5a9a4de0a381baef1e9966cda7b3f62fa08ab02e Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 16:09:54 +0200 Subject: [PATCH 08/18] Update Javadoc See comments made by @gszadovszky under #1111 --- .../src/main/java/org/apache/parquet/io/LocalInputFile.java | 2 +- .../src/main/java/org/apache/parquet/io/LocalOutputFile.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java index 19a74f0256..4badf45287 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java @@ -25,7 +25,7 @@ /** * {@code DiskInputFile} is an implementation needed by Parquet to read - * data files from disk using {@link SeekableInputStream} instances. + * from local data files using {@link SeekableInputStream} instances. */ public class LocalInputFile implements InputFile { diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java index 2fe945ca60..5c096d49d1 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java @@ -25,7 +25,7 @@ /** * {@code DiskOutputFile} is an implementation needed by Parquet to write - * data files to disk using {@link PositionOutputStream} instances. + * to local data files using {@link PositionOutputStream} instances. */ public class LocalOutputFile implements OutputFile { From 0775c493f8f73484a8bb2adecd004da4ccdef075 Mon Sep 17 00:00:00 2001 From: Atour <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 17:09:39 +0200 Subject: [PATCH 09/18] Update parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java Co-authored-by: Gang Wu --- .../src/main/java/org/apache/parquet/io/LocalInputFile.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java index 4badf45287..44801d8e7c 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java @@ -24,7 +24,7 @@ import java.nio.file.Path; /** - * {@code DiskInputFile} is an implementation needed by Parquet to read + * {@code LocalInputFile} is an implementation needed by Parquet to read * from local data files using {@link SeekableInputStream} instances. */ public class LocalInputFile implements InputFile { From a2002848d2c1e89666280ed28240fa78df39fd3f Mon Sep 17 00:00:00 2001 From: Atour <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 17:09:48 +0200 Subject: [PATCH 10/18] Update parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java Co-authored-by: Gang Wu --- .../src/main/java/org/apache/parquet/io/LocalOutputFile.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java index 5c096d49d1..07fdb44bf2 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java @@ -24,7 +24,7 @@ import java.nio.file.Path; /** - * {@code DiskOutputFile} is an implementation needed by Parquet to write + * {@code LocalOutputFile} is an implementation needed by Parquet to write * to local data files using {@link PositionOutputStream} instances. */ public class LocalOutputFile implements OutputFile { From 7643868f9a36ea57e7456707f2b4d804e3e7c1c6 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 17:15:49 +0200 Subject: [PATCH 11/18] Update tests disk -> local --- .../test/java/org/apache/parquet/avro/TestReadWrite.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 7d096ccf31..5cda8b5ef3 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -121,7 +121,7 @@ public void testEmptyArray() throws Exception { } @Test - public void testEmptyArrayDisk() throws Exception { + public void testEmptyArrayLocal() throws Exception { Schema schema = new Schema.Parser().parse( Resources.getResource("array.avsc").openStream()); @@ -182,7 +182,7 @@ public void testEmptyMap() throws Exception { } @Test - public void testEmptyMapDisk() throws Exception { + public void testEmptyMapLocal() throws Exception { Schema schema = new Schema.Parser().parse( Resources.getResource("map.avsc").openStream()); @@ -804,7 +804,7 @@ public void testNestedLists() throws Exception { } @Test - public void testNestedListsDisk() throws Exception { + public void testNestedListsLocal() throws Exception { Schema schema = new Schema.Parser().parse( Resources.getResource("nested_array.avsc").openStream()); java.nio.file.Path file = Paths.get(createTempFile().getPath()); From a5d54440fc0cc6afb448ad0370baeff4fbba7b66 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 17:25:15 +0200 Subject: [PATCH 12/18] Resolves issues regarding create/overwrite See comments made by @wgtmac under #1111 --- .../org/apache/parquet/io/LocalInputFile.java | 11 ++- .../apache/parquet/io/LocalOutputFile.java | 87 ++++++++++--------- 2 files changed, 54 insertions(+), 44 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java index 44801d8e7c..7174b42d5d 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java @@ -30,6 +30,7 @@ public class LocalInputFile implements InputFile { private final Path path; + private long length = -1; public LocalInputFile(Path file) { path = file; @@ -37,9 +38,11 @@ public LocalInputFile(Path file) { @Override public long getLength() throws IOException { - RandomAccessFile file = new RandomAccessFile(path.toFile(), "r"); - long length = file.length(); - file.close(); + if (length == -1) { + try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) { + length = file.length(); + } + } return length; } @@ -48,7 +51,7 @@ public SeekableInputStream newStream() throws IOException { return new SeekableInputStream() { - final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r"); + private final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r"); @Override public int read() throws IOException { diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java index 07fdb44bf2..5925df9889 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; /** * {@code LocalOutputFile} is an implementation needed by Parquet to write @@ -29,6 +30,49 @@ */ public class LocalOutputFile implements OutputFile { + private class LocalPositionOutputStream extends PositionOutputStream { + + private final BufferedOutputStream stream; + private long pos = 0; + + public LocalPositionOutputStream(int buffer, StandardOpenOption... openOption) throws IOException { + stream = new BufferedOutputStream(Files.newOutputStream(path, openOption), buffer); + } + + @Override + public long getPos() { + return pos; + } + + @Override + public void write(int data) throws IOException { + pos++; + stream.write(data); + } + + @Override + public void write(byte[] data) throws IOException { + pos += data.length; + stream.write(data); + } + + @Override + public void write(byte[] data, int off, int len) throws IOException { + pos += len; + stream.write(data, off, len); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + @Override + public void close() throws IOException { + stream.close(); + } + } + private final Path path; public LocalOutputFile(Path file) { @@ -37,50 +81,13 @@ public LocalOutputFile(Path file) { @Override public PositionOutputStream create(long buffer) throws IOException { - return new PositionOutputStream() { - - private final BufferedOutputStream stream = - new BufferedOutputStream(Files.newOutputStream(path), (int) buffer); - private long pos = 0; - - @Override - public long getPos() { - return pos; - } - - @Override - public void write(int data) throws IOException { - pos++; - stream.write(data); - } - - @Override - public void write(byte[] data) throws IOException { - pos += data.length; - stream.write(data); - } - - @Override - public void write(byte[] data, int off, int len) throws IOException { - pos += len; - stream.write(data, off, len); - } - - @Override - public void flush() throws IOException { - stream.flush(); - } - - @Override - public void close() throws IOException { - stream.close(); - } - }; + return new LocalPositionOutputStream((int) buffer, StandardOpenOption.CREATE_NEW); } @Override public PositionOutputStream createOrOverwrite(long buffer) throws IOException { - return create(buffer); + return new LocalPositionOutputStream((int) buffer, StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING); } @Override From e5ac8b682f08dad1e178dcaf4174e5a5aeb7ac01 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 18:13:48 +0200 Subject: [PATCH 13/18] Adds tests for create/overwrite --- .../parquet/io/TestLocalInputOutput.java | 65 +++++++++++++++++++ .../disk_output_file_create_overwrite.parquet | 1 + 2 files changed, 66 insertions(+) create mode 100644 parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java create mode 100644 parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java new file mode 100644 index 0000000000..1e4f59b866 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -0,0 +1,65 @@ +package org.apache.parquet.io; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class TestLocalInputOutput { + + Path pathFileExists = Paths.get("src/test/resources/disk_output_file_create_overwrite.parquet"); + Path pathNewFile = Paths.get("src/test/resources/disk_output_file_create.parquet"); + + @Test + public void outputFileOverwritesFile() throws IOException { + OutputFile write = new LocalOutputFile(pathFileExists); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(124); + } + InputFile read = new LocalInputFile(pathFileExists); + try (SeekableInputStream stream = read.newStream()) { + assertEquals(stream.read(), 124); + assertEquals(stream.read(), -1); + } + } + + @Test + public void outputFileCreateFailsAsFileAlreadyExists() { + OutputFile write = new LocalOutputFile(pathFileExists); + assertThrows(FileAlreadyExistsException.class, () -> write.create(512)); + } + + @Test + public void outputFileCreatesFileWithOverwrite() throws IOException { + OutputFile write = new LocalOutputFile(pathNewFile); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(255); + } + InputFile read = new LocalInputFile(pathNewFile); + try (SeekableInputStream stream = read.newStream()) { + assertEquals(stream.read(), 255); + assertEquals(stream.read(), -1); + } + Files.delete(pathNewFile); + } + + @Test + public void outputFileCreatesFile() throws IOException { + OutputFile write = new LocalOutputFile(pathNewFile); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(2); + } + InputFile read = new LocalInputFile(pathNewFile); + try (SeekableInputStream stream = read.newStream()) { + assertEquals(stream.read(), 2); + assertEquals(stream.read(), -1); + } + Files.delete(pathNewFile); + } +} diff --git a/parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet b/parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet new file mode 100644 index 0000000000..948cf947f8 --- /dev/null +++ b/parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet @@ -0,0 +1 @@ +| From b38c84abb8d324ae3d60d1dd84e334305b1b3934 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Thu, 15 Jun 2023 18:21:10 +0200 Subject: [PATCH 14/18] Adds license header to test file --- .../parquet/io/TestLocalInputOutput.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java index 1e4f59b866..3e511b4a7f 100644 --- a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.parquet.io; import org.junit.Test; From 9f309a2ca0f253eba7e43116c1b3a57d13fa262d Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Sat, 24 Jun 2023 20:37:50 +0200 Subject: [PATCH 15/18] Use temp files for tests --- .../parquet/io/TestLocalInputOutput.java | 40 +++++++++++-------- .../disk_output_file_create_overwrite.parquet | 1 - 2 files changed, 24 insertions(+), 17 deletions(-) delete mode 100644 parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java index 3e511b4a7f..c600249255 100644 --- a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -18,29 +18,29 @@ */ package org.apache.parquet.io; -import org.junit.Test; - +import java.io.File; import java.io.IOException; import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; public class TestLocalInputOutput { - Path pathFileExists = Paths.get("src/test/resources/disk_output_file_create_overwrite.parquet"); - Path pathNewFile = Paths.get("src/test/resources/disk_output_file_create.parquet"); - @Test public void outputFileOverwritesFile() throws IOException { - OutputFile write = new LocalOutputFile(pathFileExists); + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); try (PositionOutputStream stream = write.createOrOverwrite(512)) { stream.write(124); } - InputFile read = new LocalInputFile(pathFileExists); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(124); + } + InputFile read = new LocalInputFile(path); try (SeekableInputStream stream = read.newStream()) { assertEquals(stream.read(), 124); assertEquals(stream.read(), -1); @@ -48,36 +48,44 @@ public void outputFileOverwritesFile() throws IOException { } @Test - public void outputFileCreateFailsAsFileAlreadyExists() { - OutputFile write = new LocalOutputFile(pathFileExists); + public void outputFileCreateFailsAsFileAlreadyExists() throws IOException { + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); assertThrows(FileAlreadyExistsException.class, () -> write.create(512)); } @Test public void outputFileCreatesFileWithOverwrite() throws IOException { - OutputFile write = new LocalOutputFile(pathNewFile); + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); try (PositionOutputStream stream = write.createOrOverwrite(512)) { stream.write(255); } - InputFile read = new LocalInputFile(pathNewFile); + InputFile read = new LocalInputFile(path); try (SeekableInputStream stream = read.newStream()) { assertEquals(stream.read(), 255); assertEquals(stream.read(), -1); } - Files.delete(pathNewFile); } @Test public void outputFileCreatesFile() throws IOException { - OutputFile write = new LocalOutputFile(pathNewFile); + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); try (PositionOutputStream stream = write.createOrOverwrite(512)) { stream.write(2); } - InputFile read = new LocalInputFile(pathNewFile); + InputFile read = new LocalInputFile(path); try (SeekableInputStream stream = read.newStream()) { assertEquals(stream.read(), 2); assertEquals(stream.read(), -1); } - Files.delete(pathNewFile); + } + + private File createTempFile() throws IOException { + File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + return tmp; } } diff --git a/parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet b/parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet deleted file mode 100644 index 948cf947f8..0000000000 --- a/parquet-common/src/test/resources/disk_output_file_create_overwrite.parquet +++ /dev/null @@ -1 +0,0 @@ -| From 223e7ddc21f9991a652ee2cf7a1a5999c568bfab Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Sat, 24 Jun 2023 21:03:10 +0200 Subject: [PATCH 16/18] Parameterizes tests --- .../apache/parquet/avro/TestReadWrite.java | 172 +++++------------- 1 file changed, 43 insertions(+), 129 deletions(-) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 5cda8b5ef3..0a08d22100 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -77,16 +77,19 @@ public class TestReadWrite { @Parameterized.Parameters public static Collection data() { Object[][] data = new Object[][] { - { false }, // use the new converters - { true } }; // use the old converters + { false, false }, // use the new converters + { true, false }, // use the old converters + { true, true } }; // use a local disk location return Arrays.asList(data); } private final boolean compat; + private final boolean local; private final Configuration testConf = new Configuration(); - public TestReadWrite(boolean compat) { + public TestReadWrite(boolean compat, boolean local) { this.compat = compat; + this.local = local; this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); testConf.setBoolean("parquet.avro.add-list-element-records", false); testConf.setBoolean("parquet.avro.write-old-list-structure", false); @@ -100,52 +103,15 @@ public void testEmptyArray() throws Exception { // Write a record with an empty array. List emptyArray = new ArrayList<>(); - Path file = new Path(createTempFile().getPath()); - - try(ParquetWriter writer = AvroParquetWriter - .builder(file) - .withSchema(schema) - .withConf(testConf) - .build()) { - GenericData.Record record = new GenericRecordBuilder(schema) - .set("myarray", emptyArray).build(); - writer.write(record); - } - - try (AvroParquetReader reader = new AvroParquetReader<>(testConf, file)) { - GenericRecord nextRecord = reader.read(); - - assertNotNull(nextRecord); - assertEquals(emptyArray, nextRecord.get("myarray")); - } - } - - @Test - public void testEmptyArrayLocal() throws Exception { - Schema schema = new Schema.Parser().parse( - Resources.getResource("array.avsc").openStream()); - - // Write a record with an empty array. - List emptyArray = new ArrayList<>(); + String file = createTempFile().getPath(); - java.nio.file.Path file = Paths.get(createTempFile().getPath()); - - try(ParquetWriter writer = AvroParquetWriter - .builder(new LocalOutputFile(file)) - .withSchema(schema) - .withConf(testConf) - .build()) { + try(ParquetWriter writer = writer(file, schema)) { GenericData.Record record = new GenericRecordBuilder(schema) .set("myarray", emptyArray).build(); writer.write(record); } - try (ParquetReader reader = AvroParquetReader - .builder(new LocalInputFile(file)) - .withDataModel(GenericData.get()) - .withConf(testConf) - .build()) { - + try (ParquetReader reader = reader(file)) { GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); @@ -158,14 +124,10 @@ public void testEmptyMap() throws Exception { Schema schema = new Schema.Parser().parse( Resources.getResource("map.avsc").openStream()); - Path file = new Path(createTempFile().getPath()); + String file = createTempFile().getPath(); ImmutableMap emptyMap = new ImmutableMap.Builder().build(); - try(ParquetWriter writer = AvroParquetWriter - .builder(file) - .withSchema(schema) - .withConf(testConf) - .build()) { + try (ParquetWriter writer = writer(file, schema)) { // Write a record with an empty map. GenericData.Record record = new GenericRecordBuilder(schema) @@ -173,40 +135,7 @@ public void testEmptyMap() throws Exception { writer.write(record); } - try(AvroParquetReader reader = new AvroParquetReader(testConf, file)) { - GenericRecord nextRecord = reader.read(); - - assertNotNull(nextRecord); - assertEquals(emptyMap, nextRecord.get("mymap")); - } - } - - @Test - public void testEmptyMapLocal() throws Exception { - Schema schema = new Schema.Parser().parse( - Resources.getResource("map.avsc").openStream()); - - java.nio.file.Path file = Paths.get(createTempFile().getPath()); - ImmutableMap emptyMap = new ImmutableMap.Builder().build(); - - try(ParquetWriter writer = AvroParquetWriter - .builder(new LocalOutputFile(file)) - .withSchema(schema) - .withConf(testConf) - .build()) { - - // Write a record with an empty map. - GenericData.Record record = new GenericRecordBuilder(schema) - .set("mymap", emptyMap).build(); - writer.write(record); - } - - try (ParquetReader reader = AvroParquetReader - .builder(new LocalInputFile(file)) - .withDataModel(GenericData.get()) - .withConf(testConf) - .build()) { - + try(ParquetReader reader = reader(file)) { GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); @@ -773,12 +702,10 @@ public void testDuplicatedValuesWithDictionary() throws Exception { public void testNestedLists() throws Exception { Schema schema = new Schema.Parser().parse( Resources.getResource("nested_array.avsc").openStream()); - Path file = new Path(createTempFile().getPath()); + String file = createTempFile().getPath(); // Parquet writer - ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(schema) - .withConf(testConf) - .build(); + ParquetWriter parquetWriter = writer(file, schema); Schema innerRecordSchema = schema.getField("l1").schema().getTypes() .get(1).getElementType().getTypes().get(1); @@ -792,7 +719,7 @@ public void testNestedLists() throws Exception { parquetWriter.write(record); parquetWriter.close(); - AvroParquetReader reader = new AvroParquetReader(testConf, file); + ParquetReader reader = reader(file); GenericRecord nextRecord = reader.read(); assertNotNull(nextRecord); @@ -803,47 +730,6 @@ public void testNestedLists() throws Exception { assertEquals(str("hello"), l2List.get(0)); } - @Test - public void testNestedListsLocal() throws Exception { - Schema schema = new Schema.Parser().parse( - Resources.getResource("nested_array.avsc").openStream()); - java.nio.file.Path file = Paths.get(createTempFile().getPath()); - - // Parquet writer - try (ParquetWriter parquetWriter = AvroParquetWriter.builder(new LocalOutputFile(file)) - .withSchema(schema) - .withConf(testConf) - .build()) { - - Schema innerRecordSchema = schema.getField("l1").schema().getTypes() - .get(1).getElementType().getTypes().get(1); - - GenericRecord record = new GenericRecordBuilder(schema) - .set("l1", Collections.singletonList( - new GenericRecordBuilder(innerRecordSchema).set("l2", Collections.singletonList("hello")).build() - )) - .build(); - - parquetWriter.write(record); - } - - try (ParquetReader reader = AvroParquetReader - .builder(new LocalInputFile(file)) - .withDataModel(GenericData.get()) - .withConf(testConf) - .build()) { - - GenericRecord nextRecord = reader.read(); - - assertNotNull(nextRecord); - assertNotNull(nextRecord.get("l1")); - List l1List = (List) nextRecord.get("l1"); - assertNotNull(l1List.get(0)); - List l2List = (List) ((GenericRecord) l1List.get(0)).get("l2"); - assertEquals(new Utf8("hello"), l2List.get(0)); - } - } - /** * A test demonstrating the most simple way to write and read Parquet files * using Avro {@link GenericRecord}. @@ -977,6 +863,34 @@ private File createTempFile() throws IOException { return tmp; } + private ParquetWriter writer(String file, Schema schema) throws IOException { + if (local) { + return AvroParquetWriter + .builder(new LocalOutputFile(Paths.get(file))) + .withSchema(schema) + .withConf(testConf) + .build(); + } else { + return AvroParquetWriter + .builder(new Path(file)) + .withSchema(schema) + .withConf(testConf) + .build(); + } + } + + private ParquetReader reader(String file) throws IOException { + if (local) { + return AvroParquetReader + .builder(new LocalInputFile(Paths.get(file))) + .withDataModel(GenericData.get()) + .withConf(testConf) + .build(); + } else { + return new AvroParquetReader(testConf, new Path(file)); + } + } + /** * Return a String or Utf8 depending on whether compatibility is on */ From 912547d6807c23a4ace67d643105228a4c0cb4bc Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Sat, 24 Jun 2023 21:27:31 +0200 Subject: [PATCH 17/18] Update test --- .../test/java/org/apache/parquet/io/TestLocalInputOutput.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java index c600249255..d13cc7f713 100644 --- a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -51,7 +51,8 @@ public void outputFileOverwritesFile() throws IOException { public void outputFileCreateFailsAsFileAlreadyExists() throws IOException { Path path = Paths.get(createTempFile().getPath()); OutputFile write = new LocalOutputFile(path); - assertThrows(FileAlreadyExistsException.class, () -> write.create(512)); + write.create(512).close(); + assertThrows(FileAlreadyExistsException.class, () -> write.create(512).close()); } @Test From 2ed43c6c81a83e028aeda93e35cea12bbc6be743 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Sat, 24 Jun 2023 23:14:03 +0200 Subject: [PATCH 18/18] Fix parameterization --- .../test/java/org/apache/parquet/avro/TestReadWrite.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 0a08d22100..81e751aba5 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -77,9 +77,9 @@ public class TestReadWrite { @Parameterized.Parameters public static Collection data() { Object[][] data = new Object[][] { - { false, false }, // use the new converters - { true, false }, // use the old converters - { true, true } }; // use a local disk location + { false, false }, // use the new converters + { true, false }, // use the old converters + { false, true } }; // use a local disk location return Arrays.asList(data); }