From 29c49d819ec1516ee72126774fb43fe24791e48a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 3 Jun 2022 07:32:25 -0700 Subject: [PATCH 1/3] Optionally disable `validate_decimal_precision` check in `DecimalBuilder.append_value` for interop test (#1767) * Remove precision check * Remove test * Fix clippy * Move decimal precision check to ArrayData full validation and add test * Increase precision to pass existing test * Fix clippy * Fix tests * Add disable_value_validation to optionally disable value validation at DecimalBuilder * Simplify validation code * Update test --- arrow/src/array/array_binary.rs | 28 +++++++++++------ arrow/src/array/builder.rs | 28 ++++++++++++++--- arrow/src/array/data.rs | 54 ++++++++++++++++++++++++++++++-- arrow/src/array/transform/mod.rs | 3 ++ arrow/src/csv/reader.rs | 4 +-- arrow/src/ffi.rs | 1 + arrow/src/ipc/reader.rs | 2 ++ arrow/src/ipc/writer.rs | 2 ++ integration-testing/src/lib.rs | 4 +++ 9 files changed, 108 insertions(+), 18 deletions(-) diff --git a/arrow/src/array/array_binary.rs b/arrow/src/array/array_binary.rs index a3ab4aeaa115..af55854a5319 100644 --- a/arrow/src/array/array_binary.rs +++ b/arrow/src/array/array_binary.rs @@ -1480,7 +1480,7 @@ mod tests { 192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 36, 75, 238, 253, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, ]; - let array_data = ArrayData::builder(DataType::Decimal(23, 6)) + let array_data = ArrayData::builder(DataType::Decimal(38, 6)) .len(2) .add_buffer(Buffer::from(&values[..])) .build() @@ -1492,6 +1492,7 @@ mod tests { } #[test] + #[cfg(not(feature = "force_validate"))] fn test_decimal_append_error_value() { let mut decimal_builder = DecimalBuilder::new(10, 5, 3); let mut result = decimal_builder.append_value(123456); @@ -1500,9 +1501,15 @@ mod tests { "Invalid argument error: 123456 is too large to store in a Decimal of precision 5. Max is 99999", error.to_string() ); + + unsafe { + decimal_builder.disable_value_validation(); + } + result = decimal_builder.append_value(123456); + assert!(result.is_ok()); decimal_builder.append_value(12345).unwrap(); let arr = decimal_builder.finish(); - assert_eq!("12.345", arr.value_as_string(0)); + assert_eq!("12.345", arr.value_as_string(1)); decimal_builder = DecimalBuilder::new(10, 2, 1); result = decimal_builder.append_value(100); @@ -1511,18 +1518,21 @@ mod tests { "Invalid argument error: 100 is too large to store in a Decimal of precision 2. Max is 99", error.to_string() ); + + unsafe { + decimal_builder.disable_value_validation(); + } + result = decimal_builder.append_value(100); + assert!(result.is_ok()); decimal_builder.append_value(99).unwrap(); result = decimal_builder.append_value(-100); - error = result.unwrap_err(); - assert_eq!( - "Invalid argument error: -100 is too small to store in a Decimal of precision 2. Min is -99", - error.to_string() - ); + assert!(result.is_ok()); decimal_builder.append_value(-99).unwrap(); let arr = decimal_builder.finish(); - assert_eq!("9.9", arr.value_as_string(0)); - assert_eq!("-9.9", arr.value_as_string(1)); + assert_eq!("9.9", arr.value_as_string(1)); + assert_eq!("-9.9", arr.value_as_string(3)); } + #[test] fn test_decimal_from_iter_values() { let array = DecimalArray::from_iter_values(vec![-100, 0, 101].into_iter()); diff --git a/arrow/src/array/builder.rs b/arrow/src/array/builder.rs index e22a6f81ed8f..041b7a92c33f 100644 --- a/arrow/src/array/builder.rs +++ b/arrow/src/array/builder.rs @@ -1165,6 +1165,10 @@ pub struct DecimalBuilder { builder: FixedSizeListBuilder, precision: usize, scale: usize, + + /// Should i128 values be validated for compatibility with scale and precision? + /// defaults to true + value_validation: bool, } impl ArrayBuilder for GenericBinaryBuilder { @@ -1455,16 +1459,32 @@ impl DecimalBuilder { builder: FixedSizeListBuilder::new(values_builder, byte_width), precision, scale, + value_validation: true, } } + /// Disable validation + /// + /// # Safety + /// + /// After disabling validation, caller must ensure that appended values are compatible + /// for the specified precision and scale. + pub unsafe fn disable_value_validation(&mut self) { + self.value_validation = false; + } + /// Appends a byte slice into the builder. /// /// Automatically calls the `append` method to delimit the slice appended in as a /// distinct array element. #[inline] pub fn append_value(&mut self, value: i128) -> Result<()> { - let value = validate_decimal_precision(value, self.precision)?; + let value = if self.value_validation { + validate_decimal_precision(value, self.precision)? + } else { + value + }; + let value_as_bytes = Self::from_i128_to_fixed_size_bytes( value, self.builder.value_length() as usize, @@ -1480,7 +1500,7 @@ impl DecimalBuilder { self.builder.append(true) } - fn from_i128_to_fixed_size_bytes(v: i128, size: usize) -> Result> { + pub(crate) fn from_i128_to_fixed_size_bytes(v: i128, size: usize) -> Result> { if size > 16 { return Err(ArrowError::InvalidArgumentError( "DecimalBuilder only supports values up to 16 bytes.".to_string(), @@ -3420,14 +3440,14 @@ mod tests { #[test] fn test_decimal_builder() { - let mut builder = DecimalBuilder::new(30, 23, 6); + let mut builder = DecimalBuilder::new(30, 38, 6); builder.append_value(8_887_000_000).unwrap(); builder.append_null().unwrap(); builder.append_value(-8_887_000_000).unwrap(); let decimal_array: DecimalArray = builder.finish(); - assert_eq!(&DataType::Decimal(23, 6), decimal_array.data_type()); + assert_eq!(&DataType::Decimal(38, 6), decimal_array.data_type()); assert_eq!(3, decimal_array.len()); assert_eq!(1, decimal_array.null_count()); assert_eq!(32, decimal_array.value_offset(2)); diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs index cb6b894a058d..fcf89f473fda 100644 --- a/arrow/src/array/data.rs +++ b/arrow/src/array/data.rs @@ -18,7 +18,7 @@ //! Contains `ArrayData`, a generic representation of Arrow array data which encapsulates //! common attributes and operations for Arrow array. -use crate::datatypes::{DataType, IntervalUnit, UnionMode}; +use crate::datatypes::{validate_decimal_precision, DataType, IntervalUnit, UnionMode}; use crate::error::{ArrowError, Result}; use crate::{bitmap::Bitmap, datatypes::ArrowNativeType}; use crate::{ @@ -999,6 +999,21 @@ impl ArrayData { pub fn validate_dictionary_offset(&self) -> Result<()> { match &self.data_type { + DataType::Decimal(p, _) => { + let values_buffer = &self.buffers[0]; + + for pos in 0..values_buffer.len() { + let raw_val = unsafe { + std::slice::from_raw_parts( + values_buffer.as_ptr().add(pos), + 16_usize, + ) + }; + let value = i128::from_le_bytes(raw_val.try_into().unwrap()); + validate_decimal_precision(value, *p)?; + } + Ok(()) + } DataType::Utf8 => self.validate_utf8::(), DataType::LargeUtf8 => self.validate_utf8::(), DataType::Binary => self.validate_offsets_full::(self.buffers[1].len()), @@ -1492,8 +1507,9 @@ mod tests { use std::ptr::NonNull; use crate::array::{ - make_array, Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, - StringArray, StructBuilder, UInt64Array, + make_array, Array, BooleanBuilder, DecimalBuilder, FixedSizeListBuilder, + Int32Array, Int32Builder, Int64Array, StringArray, StructBuilder, UInt64Array, + UInt8Builder, }; use crate::buffer::Buffer; use crate::datatypes::Field; @@ -2707,4 +2723,36 @@ mod tests { assert_eq!(array, &expected); } + + #[test] + #[cfg(not(feature = "force_validate"))] + fn test_decimal_full_validation() { + let values_builder = UInt8Builder::new(10); + let byte_width = 16; + let mut fixed_size_builder = + FixedSizeListBuilder::new(values_builder, byte_width); + let value_as_bytes = DecimalBuilder::from_i128_to_fixed_size_bytes( + 123456, + fixed_size_builder.value_length() as usize, + ) + .unwrap(); + fixed_size_builder + .values() + .append_slice(value_as_bytes.as_slice()) + .unwrap(); + fixed_size_builder.append(true).unwrap(); + let fixed_size_array = fixed_size_builder.finish(); + + // Build ArrayData for Decimal + let builder = ArrayData::builder(DataType::Decimal(5, 3)) + .len(fixed_size_array.len()) + .add_buffer(fixed_size_array.data_ref().child_data()[0].buffers()[0].clone()); + let array_data = unsafe { builder.build_unchecked() }; + let validation_result = array_data.validate_full(); + let error = validation_result.unwrap_err(); + assert_eq!( + "Invalid argument error: 123456 is too large to store in a Decimal of precision 5. Max is 99999", + error.to_string() + ); + } } diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs index 4d5575b47533..2cad8af0d9c2 100644 --- a/arrow/src/array/transform/mod.rs +++ b/arrow/src/array/transform/mod.rs @@ -693,6 +693,7 @@ mod tests { } #[test] + #[cfg(not(feature = "force_validate"))] fn test_decimal() { let decimal_array = create_decimal_array(&[Some(1), Some(2), None, Some(3)], 10, 3); @@ -706,6 +707,7 @@ mod tests { assert_eq!(array, expected); } #[test] + #[cfg(not(feature = "force_validate"))] fn test_decimal_offset() { let decimal_array = create_decimal_array(&[Some(1), Some(2), None, Some(3)], 10, 3); @@ -720,6 +722,7 @@ mod tests { } #[test] + #[cfg(not(feature = "force_validate"))] fn test_decimal_null_offset_nulls() { let decimal_array = create_decimal_array(&[Some(1), Some(2), None, Some(3)], 10, 3); diff --git a/arrow/src/csv/reader.rs b/arrow/src/csv/reader.rs index ae9f3dd229c5..21e107ee4c8e 100644 --- a/arrow/src/csv/reader.rs +++ b/arrow/src/csv/reader.rs @@ -1204,8 +1204,8 @@ mod tests { fn test_csv_reader_with_decimal() { let schema = Schema::new(vec![ Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Decimal(26, 6), false), - Field::new("lng", DataType::Decimal(26, 6), false), + Field::new("lat", DataType::Decimal(38, 6), false), + Field::new("lng", DataType::Decimal(38, 6), false), ]); let file = File::open("test/data/decimal_test.csv").unwrap(); diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index 4ab929829bfd..8eb28837ce57 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -907,6 +907,7 @@ mod tests { } #[test] + #[cfg(not(feature = "force_validate"))] fn test_decimal_round_trip() -> Result<()> { // create an array natively let original_array = [Some(12345_i128), Some(-12345_i128), None] diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 41c0c3293ac0..03a960c4c670 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -1154,6 +1154,7 @@ mod tests { use crate::{datatypes, util::integration_util::*}; #[test] + #[cfg(not(feature = "force_validate"))] fn read_generated_files_014() { let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; @@ -1274,6 +1275,7 @@ mod tests { } #[test] + #[cfg(not(feature = "force_validate"))] fn read_generated_streams_014() { let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index ffeeadc9d99c..c42c0fd97e7d 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -1078,6 +1078,7 @@ mod tests { } #[test] + #[cfg(not(feature = "force_validate"))] fn read_and_rewrite_generated_files_014() { let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; @@ -1130,6 +1131,7 @@ mod tests { } #[test] + #[cfg(not(feature = "force_validate"))] fn read_and_rewrite_generated_streams_014() { let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; diff --git a/integration-testing/src/lib.rs b/integration-testing/src/lib.rs index 90537242a11f..c7796ece4c73 100644 --- a/integration-testing/src/lib.rs +++ b/integration-testing/src/lib.rs @@ -593,6 +593,10 @@ fn array_from_json( } DataType::Decimal(precision, scale) => { let mut b = DecimalBuilder::new(json_col.count, *precision, *scale); + // C++ interop tests involve incompatible decimal values + unsafe { + b.disable_value_validation(); + } for (is_valid, value) in json_col .validity .as_ref() From e5e2cf394d473d66a768f3f674571f67e1e37fca Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 3 Jun 2022 15:35:59 +0100 Subject: [PATCH 2/3] Update to actions/cache@v3 and actions/setup-python@v3 (#1778) * Update to actions/cache@v3 * Update setup-python --- .github/workflows/integration.yml | 8 +++---- .github/workflows/rust.yml | 36 +++++++++++++++---------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 41b1dcbe8eb9..7eed6b8e94c9 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -39,7 +39,7 @@ jobs: path: rust fetch-depth: 0 - name: Setup Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v3 with: python-version: 3.8 - name: Setup Archery @@ -64,17 +64,17 @@ jobs: rustup default ${{ matrix.rust }} rustup component add rustfmt clippy - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /home/runner/.cargo key: cargo-maturin-cache- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /home/runner/target # this key is not equal because maturin uses different compilation flags. key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}- - - uses: actions/setup-python@v2 + - uses: actions/setup-python@v3 with: python-version: '3.7' - name: Upgrade pip and setuptools diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a0456b365297..7f5996290577 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -41,14 +41,14 @@ jobs: steps: - uses: actions/checkout@v2 - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: # these represent dependencies downloaded by cargo # and thus do not depend on the OS, arch nor rust version. path: /github/home/.cargo key: cargo-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: # these represent compiled steps of both dependencies and arrow # and thus are specific for a particular OS, arch and rust version. @@ -86,13 +86,13 @@ jobs: with: submodules: true - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/.cargo # this key equals the ones on `linux-build-lib` for re-use key: cargo-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/target # this key equals the ones on `linux-build-lib` for re-use @@ -154,12 +154,12 @@ jobs: with: submodules: true - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/.cargo key: cargo-nightly-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/target key: ${{ runner.os }}-${{ matrix.arch }}-target-nightly-cache3-${{ matrix.rust }} @@ -226,13 +226,13 @@ jobs: with: submodules: true - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/.cargo # this key equals the ones on `linux-build-lib` for re-use key: cargo-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/target # this key equals the ones on `linux-build-lib` for re-use @@ -268,13 +268,13 @@ jobs: with: submodules: true - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/.cargo # this key equals the ones on `linux-build-lib` for re-use key: cargo-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/target # this key equals the ones on `linux-build-lib` for re-use @@ -321,13 +321,13 @@ jobs: rustup default ${{ matrix.rust }} rustup component add rustfmt clippy - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /home/runner/.cargo # this key is not equal because the user is different than on a container (runner vs github) key: cargo-coverage-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /home/runner/target # this key is not equal because coverage uses different compilation flags. @@ -369,12 +369,12 @@ jobs: with: submodules: true - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/.cargo key: cargo-wasm32-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/target key: ${{ runner.os }}-${{ matrix.arch }}-target-wasm32-cache3-${{ matrix.rust }} @@ -416,12 +416,12 @@ jobs: apt update apt install -y libpython3.9-dev - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/.cargo key: cargo-nightly-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/target key: ${{ runner.os }}-${{ matrix.arch }}-target-nightly-cache3-${{ matrix.rust }} @@ -453,13 +453,13 @@ jobs: steps: - uses: actions/checkout@v2 - name: Cache Cargo - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/.cargo # this key equals the ones on `linux-build-lib` for re-use key: cargo-cache3- - name: Cache Rust dependencies - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /github/home/target # this key equals the ones on `linux-build-lib` for re-use From eb706b76f1dbbe017c7081ba284979d19e2cc70b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 3 Jun 2022 15:37:22 +0100 Subject: [PATCH 3/3] Implement ChunkReader for Bytes (#1775) Deprecate SliceableCursor --- parquet/src/arrow/arrow_reader.rs | 4 ++-- parquet/src/arrow/arrow_writer.rs | 3 ++- parquet/src/file/footer.rs | 10 ++++------ parquet/src/file/serialized_reader.rs | 27 ++++++++++++++++++++++++++- parquet/src/file/writer.rs | 9 ++++----- parquet/src/util/cursor.rs | 7 +++++++ 6 files changed, 45 insertions(+), 15 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 23e2398e0f88..4cc7294f675d 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -248,6 +248,7 @@ impl ParquetRecordBatchReader { #[cfg(test)] mod tests { + use bytes::Bytes; use std::cmp::min; use std::convert::TryFrom; use std::fs::File; @@ -285,7 +286,6 @@ mod tests { use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; - use crate::util::cursor::SliceableCursor; use crate::util::test_common::RandGen; #[test] @@ -1162,7 +1162,7 @@ mod tests { 114, 111, 119, 0, 130, 0, 0, 0, 80, 65, 82, 49, ]; - let file = SliceableCursor::new(data); + let file = Bytes::from(data); let file_reader = SerializedFileReader::new(file).unwrap(); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 530dfe2ad090..ceeddfef5d18 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -689,6 +689,7 @@ fn get_fsb_array_slice( mod tests { use super::*; + use bytes::Bytes; use std::fs::File; use std::sync::Arc; @@ -750,7 +751,7 @@ mod tests { writer.close().unwrap(); } - let cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); + let cursor = Bytes::from(buffer); let reader = SerializedFileReader::new(cursor).unwrap(); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index db8a23d8ebca..76461358681f 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -160,11 +160,11 @@ fn parse_column_orders( #[cfg(test)] mod tests { use super::*; + use bytes::Bytes; use crate::basic::SortOrder; use crate::basic::Type; use crate::schema::types::Type as SchemaType; - use crate::util::cursor::SliceableCursor; use parquet_format::TypeDefinedOrder; #[test] @@ -180,7 +180,7 @@ mod tests { #[test] fn test_parse_metadata_corrupt_footer() { - let data = SliceableCursor::new(Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8])); + let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]); let reader_result = parse_metadata(&data); assert!(reader_result.is_err()); assert_eq!( @@ -191,8 +191,7 @@ mod tests { #[test] fn test_parse_metadata_invalid_length() { - let test_file = - SliceableCursor::new(Arc::new(vec![0, 0, 0, 255, b'P', b'A', b'R', b'1'])); + let test_file = Bytes::from(vec![0, 0, 0, 255, b'P', b'A', b'R', b'1']); let reader_result = parse_metadata(&test_file); assert!(reader_result.is_err()); assert_eq!( @@ -205,8 +204,7 @@ mod tests { #[test] fn test_parse_metadata_invalid_start() { - let test_file = - SliceableCursor::new(Arc::new(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1'])); + let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']); let reader_result = parse_metadata(&test_file); assert!(reader_result.is_err()); assert_eq!( diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 1dd374ef85c3..22f6f4a73694 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -18,6 +18,7 @@ //! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM) +use bytes::{Buf, Bytes}; use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc}; use parquet_format::{PageHeader, PageType}; @@ -36,6 +37,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr}; // export `SliceableCursor` and `FileSource` publically so clients can // re-use the logic in their own ParquetFileWriter wrappers +#[allow(deprecated)] pub use crate::util::{cursor::SliceableCursor, io::FileSource}; // ---------------------------------------------------------------------- @@ -61,12 +63,35 @@ impl ChunkReader for File { } } +impl Length for Bytes { + fn len(&self) -> u64 { + self.len() as u64 + } +} + +impl TryClone for Bytes { + fn try_clone(&self) -> std::io::Result { + Ok(self.clone()) + } +} + +impl ChunkReader for Bytes { + type T = bytes::buf::Reader; + + fn get_read(&self, start: u64, length: usize) -> Result { + let start = start as usize; + Ok(self.slice(start..start + length).reader()) + } +} + +#[allow(deprecated)] impl Length for SliceableCursor { fn len(&self) -> u64 { SliceableCursor::len(self) } } +#[allow(deprecated)] impl ChunkReader for SliceableCursor { type T = SliceableCursor; @@ -521,7 +546,7 @@ mod tests { get_test_file("alltypes_plain.parquet") .read_to_end(&mut buf) .unwrap(); - let cursor = SliceableCursor::new(buf); + let cursor = Bytes::from(buf); let read_from_cursor = SerializedFileReader::new(cursor).unwrap(); let test_file = get_test_file("alltypes_plain.parquet"); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 646550dcb6be..3108baddefa9 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -541,6 +541,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> { mod tests { use super::*; + use bytes::Bytes; use std::{fs::File, io::Cursor}; use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type}; @@ -1054,7 +1055,7 @@ mod tests { } fn test_bytes_roundtrip(data: Vec>) { - let mut cursor = Cursor::new(vec![]); + let mut buffer = vec![]; let schema = Arc::new( types::Type::group_type_builder("schema") @@ -1072,7 +1073,7 @@ mod tests { { let props = Arc::new(WriterProperties::builder().build()); let mut writer = - SerializedFileWriter::new(&mut cursor, schema, props).unwrap(); + SerializedFileWriter::new(&mut buffer, schema, props).unwrap(); for subset in &data { let mut row_group_writer = writer.next_row_group().unwrap(); @@ -1089,9 +1090,7 @@ mod tests { writer.close().unwrap(); } - let buffer = cursor.into_inner(); - - let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); + let reading_cursor = Bytes::from(buffer); let reader = SerializedFileReader::new(reading_cursor).unwrap(); assert_eq!(reader.num_row_groups(), data.len()); diff --git a/parquet/src/util/cursor.rs b/parquet/src/util/cursor.rs index ff7067fcbcad..706724dbf52a 100644 --- a/parquet/src/util/cursor.rs +++ b/parquet/src/util/cursor.rs @@ -26,6 +26,7 @@ use std::{cmp, fmt}; /// because the lack of Generic Associated Type implies that you would require complex lifetime propagation when /// returning such a cursor. #[allow(clippy::rc_buffer)] +#[deprecated = "use bytes::Bytes instead"] pub struct SliceableCursor { inner: Arc>, start: u64, @@ -33,6 +34,7 @@ pub struct SliceableCursor { pos: u64, } +#[allow(deprecated)] impl fmt::Debug for SliceableCursor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SliceableCursor") @@ -44,6 +46,7 @@ impl fmt::Debug for SliceableCursor { } } +#[allow(deprecated)] impl SliceableCursor { pub fn new(content: impl Into>>) -> Self { let inner = content.into(); @@ -90,6 +93,7 @@ impl SliceableCursor { } /// Implementation inspired by std::io::Cursor +#[allow(deprecated)] impl Read for SliceableCursor { fn read(&mut self, buf: &mut [u8]) -> io::Result { let n = Read::read(&mut self.remaining_slice(), buf)?; @@ -98,6 +102,7 @@ impl Read for SliceableCursor { } } +#[allow(deprecated)] impl Seek for SliceableCursor { fn seek(&mut self, pos: SeekFrom) -> io::Result { let new_pos = match pos { @@ -204,12 +209,14 @@ mod tests { use super::*; /// Create a SliceableCursor of all u8 values in ascending order + #[allow(deprecated)] fn get_u8_range() -> SliceableCursor { let data: Vec = (0u8..=255).collect(); SliceableCursor::new(data) } /// Reads all the bytes in the slice and checks that it matches the u8 range from start to end_included + #[allow(deprecated)] fn check_read_all(mut cursor: SliceableCursor, start: u8, end_included: u8) { let mut target = vec![]; let cursor_res = cursor.read_to_end(&mut target);