diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index da56c23b5cd9..d3b2526740fa 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -67,8 +67,8 @@ jobs: run: cargo test -p arrow-data --all-features - name: Test arrow-schema with all features run: cargo test -p arrow-schema --all-features - - name: Test arrow-array with all features except SIMD - run: cargo test -p arrow-array + - name: Test arrow-array with all features + run: cargo test -p arrow-array --all-features - name: Test arrow-select with all features run: cargo test -p arrow-select --all-features - name: Test arrow-cast with all features @@ -85,15 +85,15 @@ jobs: run: cargo test -p arrow-string --all-features - name: Test arrow-ord with all features run: cargo test -p arrow-ord --all-features - - name: Test arrow-arith with all features except SIMD - run: cargo test -p arrow-arith + - name: Test arrow-arith with all features + run: cargo test -p arrow-arith --all-features - name: Test arrow-row with all features run: cargo test -p arrow-row --all-features - name: Test arrow-integration-test with all features run: cargo test -p arrow-integration-test --all-features - name: Test arrow with default features run: cargo test -p arrow - - name: Test arrow with all features apart from simd + - name: Test arrow with all features except pyarrow run: cargo test -p arrow --features=force_validate,prettyprint,ipc_compression,ffi,chrono-tz - name: Run examples run: | @@ -132,29 +132,6 @@ jobs: - name: Check compilation --no-default-features --all-targets --features chrono-tz run: cargo check -p arrow --no-default-features --all-targets --features chrono-tz - # test the --features "simd" of the arrow crate. This requires nightly Rust. - linux-test-simd: - name: Test SIMD on AMD64 Rust ${{ matrix.rust }} - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - with: - rust-version: nightly - - name: Test arrow-array with SIMD - run: cargo test -p arrow-array --features simd - - name: Test arrow-arith with SIMD - run: cargo test -p arrow-arith --features simd - - name: Test arrow with SIMD - run: cargo test -p arrow --features simd - - name: Check compilation --features simd --all-targets - run: cargo check -p arrow --features simd --all-targets - # test the arrow crate builds against wasm32 in nightly rust wasm32-build: @@ -169,12 +146,11 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: nightly target: wasm32-unknown-unknown,wasm32-wasi - name: Build wasm32-unknown-unknown - run: cargo build -p arrow --no-default-features --features=json,csv,ipc,simd,ffi --target wasm32-unknown-unknown + run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-unknown-unknown - name: Build wasm32-wasi - run: cargo build -p arrow --no-default-features --features=json,csv,ipc,simd,ffi --target wasm32-wasi + run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-wasi clippy: name: Clippy @@ -193,8 +169,8 @@ jobs: run: cargo clippy -p arrow-data --all-targets --all-features -- -D warnings - name: Clippy arrow-schema with all features run: cargo clippy -p arrow-schema --all-targets --all-features -- -D warnings - - name: Clippy arrow-array with all features except SIMD - run: cargo clippy -p arrow-array --all-targets -- -D warnings + - name: Clippy arrow-array 
with all features + run: cargo clippy -p arrow-array --all-targets --all-features -- -D warnings - name: Clippy arrow-select with all features run: cargo clippy -p arrow-select --all-targets --all-features -- -D warnings - name: Clippy arrow-cast with all features @@ -211,12 +187,12 @@ jobs: run: cargo clippy -p arrow-string --all-targets --all-features -- -D warnings - name: Clippy arrow-ord with all features run: cargo clippy -p arrow-ord --all-targets --all-features -- -D warnings - - name: Clippy arrow-arith with all features except SIMD - run: cargo clippy -p arrow-arith --all-targets -- -D warnings + - name: Clippy arrow-arith with all features + run: cargo clippy -p arrow-arith --all-targets --all-features -- -D warnings - name: Clippy arrow-row with all features run: cargo clippy -p arrow-row --all-targets --all-features -- -D warnings - - name: Clippy arrow with all features except SIMD - run: cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression,chrono-tz --all-targets -- -D warnings + - name: Clippy arrow with all features + run: cargo clippy -p arrow --all-features --all-targets -- -D warnings - name: Clippy arrow-integration-test with all features run: cargo clippy -p arrow-integration-test --all-targets --all-features -- -D warnings - name: Clippy arrow-integration-testing with all features diff --git a/.github/workflows/miri.sh b/.github/workflows/miri.sh index ec8712660c74..5057c876b952 100755 --- a/.github/workflows/miri.sh +++ b/.github/workflows/miri.sh @@ -14,5 +14,5 @@ cargo miri test -p arrow-buffer cargo miri test -p arrow-data --features ffi cargo miri test -p arrow-schema --features ffi cargo miri test -p arrow-array -cargo miri test -p arrow-arith --features simd +cargo miri test -p arrow-arith cargo miri test -p arrow-ord diff --git a/arrow-arith/Cargo.toml b/arrow-arith/Cargo.toml index 57dc033e9645..d2ee0b9e2c72 100644 --- a/arrow-arith/Cargo.toml +++ b/arrow-arith/Cargo.toml @@ -43,6 +43,3 @@ half = { version = "2.1", default-features = false } num = { version = "0.4", default-features = false, features = ["std"] } [dev-dependencies] - -[features] -simd = ["arrow-array/simd"] diff --git a/arrow-array/Cargo.toml b/arrow-array/Cargo.toml index 4f7ab24f9708..04eec8df6379 100644 --- a/arrow-array/Cargo.toml +++ b/arrow-array/Cargo.toml @@ -49,10 +49,6 @@ chrono-tz = { version = "0.8", optional = true } num = { version = "0.4.1", default-features = false, features = ["std"] } half = { version = "2.1", default-features = false, features = ["num-traits"] } hashbrown = { version = "0.14", default-features = false } -packed_simd = { version = "0.3.9", default-features = false, optional = true } - -[features] -simd = ["packed_simd"] [dev-dependencies] rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } diff --git a/arrow-array/src/array/boolean_array.rs b/arrow-array/src/array/boolean_array.rs index a778dc92ea35..fe374d965714 100644 --- a/arrow-array/src/array/boolean_array.rs +++ b/arrow-array/src/array/boolean_array.rs @@ -254,6 +254,11 @@ impl BooleanArray { }); Self::new(values, nulls) } + + /// Deconstruct this array into its constituent parts + pub fn into_parts(self) -> (BooleanBuffer, Option) { + (self.values, self.nulls) + } } impl Array for BooleanArray { @@ -618,4 +623,21 @@ mod tests { assert_eq!(b.false_count(), expected_false); } } + + #[test] + fn test_into_parts() { + let boolean_array = [Some(true), None, Some(false)] + .into_iter() + .collect::(); + let (values, nulls) = 
boolean_array.into_parts(); + assert_eq!(values.values(), &[0b0000_0001]); + assert!(nulls.is_some()); + assert_eq!(nulls.unwrap().buffer().as_slice(), &[0b0000_0101]); + + let boolean_array = + BooleanArray::from(vec![false, false, false, false, false, false, false, true]); + let (values, nulls) = boolean_array.into_parts(); + assert_eq!(values.values(), &[0b1000_0000]); + assert!(nulls.is_none()); + } } diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index 1112acacfcd9..2296cebd4681 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -352,12 +352,18 @@ pub type Time64MicrosecondArray = PrimitiveArray; pub type Time64NanosecondArray = PrimitiveArray; /// A [`PrimitiveArray`] of “calendar” intervals in months +/// +/// See [`IntervalYearMonthType`] for details on representation and caveats. pub type IntervalYearMonthArray = PrimitiveArray; /// A [`PrimitiveArray`] of “calendar” intervals in days and milliseconds +/// +/// See [`IntervalDayTimeType`] for details on representation and caveats. pub type IntervalDayTimeArray = PrimitiveArray; -/// A [`PrimitiveArray`] of “calendar” intervals in months, days, and nanoseconds +/// A [`PrimitiveArray`] of “calendar” intervals in months, days, and nanoseconds. +/// +/// See [`IntervalMonthDayNanoType`] for details on representation and caveats. pub type IntervalMonthDayNanoArray = PrimitiveArray; /// A [`PrimitiveArray`] of elapsed durations in seconds diff --git a/arrow-array/src/numeric.rs b/arrow-array/src/numeric.rs index b5e474ba696a..a3cd7bde5d36 100644 --- a/arrow-array/src/numeric.rs +++ b/arrow-array/src/numeric.rs @@ -15,621 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::types::*; use crate::ArrowPrimitiveType; -#[cfg(feature = "simd")] -use packed_simd::*; -#[cfg(feature = "simd")] -use std::ops::{Add, BitAnd, BitAndAssign, BitOr, BitOrAssign, Div, Mul, Not, Rem, Sub}; /// A subtype of primitive type that represents numeric values. -/// -/// SIMD operations are defined in this trait if available on the target system. -#[cfg(feature = "simd")] -pub trait ArrowNumericType: ArrowPrimitiveType -where - Self::Simd: Add - + Sub - + Mul - + Div - + Rem - + Copy, - Self::SimdMask: BitAnd - + BitOr - + BitAndAssign - + BitOrAssign - + Not - + Copy, -{ - /// Defines the SIMD type that should be used for this numeric type - type Simd; - - /// Defines the SIMD Mask type that should be used for this numeric type - type SimdMask; - - /// The number of SIMD lanes available - fn lanes() -> usize; - - /// Initializes a SIMD register to a constant value - fn init(value: Self::Native) -> Self::Simd; - - /// Loads a slice into a SIMD register - fn load(slice: &[Self::Native]) -> Self::Simd; - - /// Creates a new SIMD mask for this SIMD type filling it with `value` - fn mask_init(value: bool) -> Self::SimdMask; - - /// Creates a new SIMD mask for this SIMD type from the lower-most bits of the given `mask`. - /// The number of bits used corresponds to the number of lanes of this type - fn mask_from_u64(mask: u64) -> Self::SimdMask; - - /// Creates a bitmask from the given SIMD mask. - /// Each bit corresponds to one vector lane, starting with the least-significant bit. 
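The `BooleanArray::into_parts` API added above deconstructs an array into its backing `BooleanBuffer` and optional `NullBuffer` without copying. A minimal usage sketch, mirroring the test added in this diff:

```rust
use arrow_array::BooleanArray;

// Collect a nullable boolean array, then take ownership of its parts.
let array: BooleanArray = [Some(true), None, Some(false)].into_iter().collect();
let (values, nulls) = array.into_parts();

// Values are bit-packed, least-significant bit first: only slot 0 is `true`.
assert_eq!(values.values(), &[0b0000_0001]);
// The null mask marks slots 0 and 2 as valid.
assert_eq!(nulls.unwrap().buffer().as_slice(), &[0b0000_0101]);
```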
- fn mask_to_u64(mask: &Self::SimdMask) -> u64; - - /// Gets the value of a single lane in a SIMD mask - fn mask_get(mask: &Self::SimdMask, idx: usize) -> bool; - - /// Sets the value of a single lane of a SIMD mask - fn mask_set(mask: Self::SimdMask, idx: usize, value: bool) -> Self::SimdMask; - - /// Selects elements of `a` and `b` using `mask` - fn mask_select(mask: Self::SimdMask, a: Self::Simd, b: Self::Simd) -> Self::Simd; - - /// Returns `true` if any of the lanes in the mask are `true` - fn mask_any(mask: Self::SimdMask) -> bool; - - /// Performs a SIMD binary operation - fn bin_op Self::Simd>( - left: Self::Simd, - right: Self::Simd, - op: F, - ) -> Self::Simd; - - /// SIMD version of equal - fn eq(left: Self::Simd, right: Self::Simd) -> Self::SimdMask; - - /// SIMD version of not equal - fn ne(left: Self::Simd, right: Self::Simd) -> Self::SimdMask; - - /// SIMD version of less than - fn lt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask; - - /// SIMD version of less than or equal to - fn le(left: Self::Simd, right: Self::Simd) -> Self::SimdMask; - - /// SIMD version of greater than - fn gt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask; - - /// SIMD version of greater than or equal to - fn ge(left: Self::Simd, right: Self::Simd) -> Self::SimdMask; - - /// Writes a SIMD result back to a slice - fn write(simd_result: Self::Simd, slice: &mut [Self::Native]); - - /// Performs a SIMD unary operation - fn unary_op Self::Simd>(a: Self::Simd, op: F) -> Self::Simd; -} - -/// A subtype of primitive type that represents numeric values. -#[cfg(not(feature = "simd"))] pub trait ArrowNumericType: ArrowPrimitiveType {} -macro_rules! make_numeric_type { - ($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident) => { - #[cfg(feature = "simd")] - impl ArrowNumericType for $impl_ty { - type Simd = $simd_ty; - - type SimdMask = $simd_mask_ty; - - #[inline] - fn lanes() -> usize { - Self::Simd::lanes() - } - - #[inline] - fn init(value: Self::Native) -> Self::Simd { - Self::Simd::splat(value) - } - - #[inline] - fn load(slice: &[Self::Native]) -> Self::Simd { - unsafe { Self::Simd::from_slice_unaligned_unchecked(slice) } - } - - #[inline] - fn mask_init(value: bool) -> Self::SimdMask { - Self::SimdMask::splat(value) - } - - #[inline] - fn mask_from_u64(mask: u64) -> Self::SimdMask { - // this match will get removed by the compiler since the number of lanes is known at - // compile-time for each concrete numeric type - match Self::lanes() { - 4 => { - // the bit position in each lane indicates the index of that lane - let vecidx = i128x4::new(1, 2, 4, 8); - - // broadcast the lowermost 8 bits of mask to each lane - let vecmask = i128x4::splat((mask & 0x0F) as i128); - // compute whether the bit corresponding to each lanes index is set - let vecmask = (vecidx & vecmask).eq(vecidx); - - // transmute is necessary because the different match arms return different - // mask types, at runtime only one of those expressions will exist per type, - // with the type being equal to `SimdMask`. 
- unsafe { std::mem::transmute(vecmask) } - } - 8 => { - // the bit position in each lane indicates the index of that lane - let vecidx = i64x8::new(1, 2, 4, 8, 16, 32, 64, 128); - - // broadcast the lowermost 8 bits of mask to each lane - let vecmask = i64x8::splat((mask & 0xFF) as i64); - // compute whether the bit corresponding to each lanes index is set - let vecmask = (vecidx & vecmask).eq(vecidx); - - // transmute is necessary because the different match arms return different - // mask types, at runtime only one of those expressions will exist per type, - // with the type being equal to `SimdMask`. - unsafe { std::mem::transmute(vecmask) } - } - 16 => { - // same general logic as for 8 lanes, extended to 16 bits - let vecidx = i32x16::new( - 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, - 32768, - ); - - let vecmask = i32x16::splat((mask & 0xFFFF) as i32); - let vecmask = (vecidx & vecmask).eq(vecidx); - - unsafe { std::mem::transmute(vecmask) } - } - 32 => { - // compute two separate m32x16 vector masks from from the lower-most 32 bits of `mask` - // and then combine them into one m16x32 vector mask by writing and reading a temporary - let tmp = &mut [0_i16; 32]; - - let vecidx = i32x16::new( - 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, - 32768, - ); - - let vecmask = i32x16::splat((mask & 0xFFFF) as i32); - let vecmask = (vecidx & vecmask).eq(vecidx); - - i16x16::from_cast(vecmask).write_to_slice_unaligned(&mut tmp[0..16]); - - let vecmask = i32x16::splat(((mask >> 16) & 0xFFFF) as i32); - let vecmask = (vecidx & vecmask).eq(vecidx); - - i16x16::from_cast(vecmask).write_to_slice_unaligned(&mut tmp[16..32]); - - unsafe { std::mem::transmute(i16x32::from_slice_unaligned(tmp)) } - } - 64 => { - // compute four m32x16 vector masks from from all 64 bits of `mask` - // and convert them into one m8x64 vector mask by writing and reading a temporary - let tmp = &mut [0_i8; 64]; - - let vecidx = i32x16::new( - 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, - 32768, - ); - - let vecmask = i32x16::splat((mask & 0xFFFF) as i32); - let vecmask = (vecidx & vecmask).eq(vecidx); - - i8x16::from_cast(vecmask).write_to_slice_unaligned(&mut tmp[0..16]); - - let vecmask = i32x16::splat(((mask >> 16) & 0xFFFF) as i32); - let vecmask = (vecidx & vecmask).eq(vecidx); - - i8x16::from_cast(vecmask).write_to_slice_unaligned(&mut tmp[16..32]); - - let vecmask = i32x16::splat(((mask >> 32) & 0xFFFF) as i32); - let vecmask = (vecidx & vecmask).eq(vecidx); - - i8x16::from_cast(vecmask).write_to_slice_unaligned(&mut tmp[32..48]); - - let vecmask = i32x16::splat(((mask >> 48) & 0xFFFF) as i32); - let vecmask = (vecidx & vecmask).eq(vecidx); - - i8x16::from_cast(vecmask).write_to_slice_unaligned(&mut tmp[48..64]); - - unsafe { std::mem::transmute(i8x64::from_slice_unaligned(tmp)) } - } - _ => panic!("Invalid number of vector lanes"), - } - } - - #[inline] - fn mask_to_u64(mask: &Self::SimdMask) -> u64 { - mask.bitmask() as u64 - } - - #[inline] - fn mask_get(mask: &Self::SimdMask, idx: usize) -> bool { - unsafe { mask.extract_unchecked(idx) } - } - - #[inline] - fn mask_set(mask: Self::SimdMask, idx: usize, value: bool) -> Self::SimdMask { - unsafe { mask.replace_unchecked(idx, value) } - } - - /// Selects elements of `a` and `b` using `mask` - #[inline] - fn mask_select(mask: Self::SimdMask, a: Self::Simd, b: Self::Simd) -> Self::Simd { - mask.select(a, b) - } - - #[inline] - fn mask_any(mask: Self::SimdMask) -> bool { - mask.any() - } - - 
#[inline] - fn bin_op Self::Simd>( - left: Self::Simd, - right: Self::Simd, - op: F, - ) -> Self::Simd { - op(left, right) - } - - #[inline] - fn eq(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.eq(right) - } - - #[inline] - fn ne(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.ne(right) - } - - #[inline] - fn lt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.lt(right) - } - - #[inline] - fn le(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.le(right) - } - - #[inline] - fn gt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.gt(right) - } - - #[inline] - fn ge(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.ge(right) - } - - #[inline] - fn write(simd_result: Self::Simd, slice: &mut [Self::Native]) { - unsafe { simd_result.write_to_slice_unaligned_unchecked(slice) }; - } - - #[inline] - fn unary_op Self::Simd>(a: Self::Simd, op: F) -> Self::Simd { - op(a) - } - } - - #[cfg(not(feature = "simd"))] - impl ArrowNumericType for $impl_ty {} - }; -} - -make_numeric_type!(Int8Type, i8, i8x64, m8x64); -make_numeric_type!(Int16Type, i16, i16x32, m16x32); -make_numeric_type!(Int32Type, i32, i32x16, m32x16); -make_numeric_type!(Int64Type, i64, i64x8, m64x8); -make_numeric_type!(UInt8Type, u8, u8x64, m8x64); -make_numeric_type!(UInt16Type, u16, u16x32, m16x32); -make_numeric_type!(UInt32Type, u32, u32x16, m32x16); -make_numeric_type!(UInt64Type, u64, u64x8, m64x8); -make_numeric_type!(Float32Type, f32, f32x16, m32x16); -make_numeric_type!(Float64Type, f64, f64x8, m64x8); - -make_numeric_type!(TimestampSecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampMillisecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampMicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampNanosecondType, i64, i64x8, m64x8); -make_numeric_type!(Date32Type, i32, i32x16, m32x16); -make_numeric_type!(Date64Type, i64, i64x8, m64x8); -make_numeric_type!(Time32SecondType, i32, i32x16, m32x16); -make_numeric_type!(Time32MillisecondType, i32, i32x16, m32x16); -make_numeric_type!(Time64MicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(Time64NanosecondType, i64, i64x8, m64x8); -make_numeric_type!(IntervalYearMonthType, i32, i32x16, m32x16); -make_numeric_type!(IntervalDayTimeType, i64, i64x8, m64x8); -make_numeric_type!(IntervalMonthDayNanoType, i128, i128x4, m128x4); -make_numeric_type!(DurationSecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationMillisecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationMicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationNanosecondType, i64, i64x8, m64x8); -make_numeric_type!(Decimal128Type, i128, i128x4, m128x4); - -#[cfg(not(feature = "simd"))] -impl ArrowNumericType for Float16Type {} - -#[cfg(feature = "simd")] -impl ArrowNumericType for Float16Type { - type Simd = ::Simd; - type SimdMask = ::SimdMask; - - fn lanes() -> usize { - Float32Type::lanes() - } - - fn init(value: Self::Native) -> Self::Simd { - Float32Type::init(value.to_f32()) - } - - fn load(slice: &[Self::Native]) -> Self::Simd { - let mut s = [0_f32; Self::Simd::lanes()]; - s.iter_mut().zip(slice).for_each(|(o, a)| *o = a.to_f32()); - Float32Type::load(&s) - } - - fn mask_init(value: bool) -> Self::SimdMask { - Float32Type::mask_init(value) - } - - fn mask_from_u64(mask: u64) -> Self::SimdMask { - Float32Type::mask_from_u64(mask) - } - - fn mask_to_u64(mask: &Self::SimdMask) -> u64 { - Float32Type::mask_to_u64(mask) - } - - fn mask_get(mask: &Self::SimdMask, idx: usize) -> bool { 
- Float32Type::mask_get(mask, idx) - } - - fn mask_set(mask: Self::SimdMask, idx: usize, value: bool) -> Self::SimdMask { - Float32Type::mask_set(mask, idx, value) - } - - fn mask_select(mask: Self::SimdMask, a: Self::Simd, b: Self::Simd) -> Self::Simd { - Float32Type::mask_select(mask, a, b) - } - - fn mask_any(mask: Self::SimdMask) -> bool { - Float32Type::mask_any(mask) - } - - fn bin_op Self::Simd>( - left: Self::Simd, - right: Self::Simd, - op: F, - ) -> Self::Simd { - op(left, right) - } - - fn eq(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - Float32Type::eq(left, right) - } - - fn ne(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - Float32Type::ne(left, right) - } - - fn lt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - Float32Type::lt(left, right) - } - - fn le(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - Float32Type::le(left, right) - } - - fn gt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - Float32Type::gt(left, right) - } - - fn ge(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - Float32Type::ge(left, right) - } - - fn write(simd_result: Self::Simd, slice: &mut [Self::Native]) { - let mut s = [0_f32; Self::Simd::lanes()]; - Float32Type::write(simd_result, &mut s); - slice - .iter_mut() - .zip(s) - .for_each(|(o, i)| *o = half::f16::from_f32(i)) - } - - fn unary_op Self::Simd>(a: Self::Simd, op: F) -> Self::Simd { - Float32Type::unary_op(a, op) - } -} - -#[cfg(not(feature = "simd"))] -impl ArrowNumericType for Decimal256Type {} - -#[cfg(feature = "simd")] -impl ArrowNumericType for Decimal256Type { - type Simd = arrow_buffer::i256; - type SimdMask = bool; - - fn lanes() -> usize { - 1 - } - - fn init(value: Self::Native) -> Self::Simd { - value - } - - fn load(slice: &[Self::Native]) -> Self::Simd { - slice[0] - } - - fn mask_init(value: bool) -> Self::SimdMask { - value - } - - fn mask_from_u64(mask: u64) -> Self::SimdMask { - mask != 0 - } - - fn mask_to_u64(mask: &Self::SimdMask) -> u64 { - *mask as u64 - } - - fn mask_get(mask: &Self::SimdMask, _idx: usize) -> bool { - *mask - } - - fn mask_set(_mask: Self::SimdMask, _idx: usize, value: bool) -> Self::SimdMask { - value - } - - fn mask_select(mask: Self::SimdMask, a: Self::Simd, b: Self::Simd) -> Self::Simd { - match mask { - true => a, - false => b, - } - } - - fn mask_any(mask: Self::SimdMask) -> bool { - mask - } - - fn bin_op Self::Simd>( - left: Self::Simd, - right: Self::Simd, - op: F, - ) -> Self::Simd { - op(left, right) - } - - fn eq(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.eq(&right) - } - - fn ne(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.ne(&right) - } - - fn lt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.lt(&right) - } - - fn le(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.le(&right) - } - - fn gt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.gt(&right) - } - - fn ge(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { - left.ge(&right) - } - - fn write(simd_result: Self::Simd, slice: &mut [Self::Native]) { - slice[0] = simd_result - } - - fn unary_op Self::Simd>(a: Self::Simd, op: F) -> Self::Simd { - op(a) - } -} - -#[cfg(all(test, feature = "simd"))] -mod tests { - use super::*; - use FromCast; - - /// calculate the expected mask by iterating over all bits - macro_rules! 
expected_mask { - ($T:ty, $MASK:expr) => {{ - let mask = $MASK; - // simd width of all types is currently 64 bytes -> 512 bits - let lanes = 64 / std::mem::size_of::<$T>(); - // translate each set bit into a value of all ones (-1) of the correct type - (0..lanes) - .map(|i| (if (mask & (1 << i)) != 0 { -1 } else { 0 })) - .collect::>() - }}; - } - - #[test] - fn test_mask_i128() { - let mask = 0b1101; - let actual = IntervalMonthDayNanoType::mask_from_u64(mask); - let expected = expected_mask!(i128, mask); - let expected = m128x4::from_cast(i128x4::from_slice_unaligned(expected.as_slice())); - - assert_eq!(expected, actual); - } - - #[test] - fn test_mask_f64() { - let mask = 0b10101010; - let actual = Float64Type::mask_from_u64(mask); - let expected = expected_mask!(i64, mask); - let expected = m64x8::from_cast(i64x8::from_slice_unaligned(expected.as_slice())); - - assert_eq!(expected, actual); - } - - #[test] - fn test_mask_u64() { - let mask = 0b01010101; - let actual = Int64Type::mask_from_u64(mask); - let expected = expected_mask!(i64, mask); - let expected = m64x8::from_cast(i64x8::from_slice_unaligned(expected.as_slice())); - - assert_eq!(expected, actual); - } - - #[test] - fn test_mask_f32() { - let mask = 0b10101010_10101010; - let actual = Float32Type::mask_from_u64(mask); - let expected = expected_mask!(i32, mask); - let expected = m32x16::from_cast(i32x16::from_slice_unaligned(expected.as_slice())); - - assert_eq!(expected, actual); - } - - #[test] - fn test_mask_i32() { - let mask = 0b01010101_01010101; - let actual = Int32Type::mask_from_u64(mask); - let expected = expected_mask!(i32, mask); - let expected = m32x16::from_cast(i32x16::from_slice_unaligned(expected.as_slice())); - - assert_eq!(expected, actual); - } - - #[test] - fn test_mask_u16() { - let mask = 0b01010101_01010101_10101010_10101010; - let actual = UInt16Type::mask_from_u64(mask); - let expected = expected_mask!(i16, mask); - let expected = m16x32::from_cast(i16x32::from_slice_unaligned(expected.as_slice())); - - assert_eq!(expected, actual); - } - - #[test] - fn test_mask_i8() { - let mask = 0b01010101_01010101_10101010_10101010_01010101_01010101_10101010_10101010; - let actual = Int8Type::mask_from_u64(mask); - let expected = expected_mask!(i8, mask); - let expected = m8x64::from_cast(i8x64::from_slice_unaligned(expected.as_slice())); - - assert_eq!(expected, actual); - } -} +impl ArrowNumericType for T {} diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index 16d0e822d052..6e177838c4f5 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -213,19 +213,84 @@ make_type!( IntervalYearMonthType, i32, DataType::Interval(IntervalUnit::YearMonth), - "A “calendar” interval type in months." + "A “calendar” interval stored as the number of whole months." ); make_type!( IntervalDayTimeType, i64, DataType::Interval(IntervalUnit::DayTime), - "A “calendar” interval type in days and milliseconds." + r#"A “calendar” interval type in days and milliseconds. + +## Representation +This type is stored as a single 64 bit integer, interpreted as two i32 fields: +1. the number of elapsed days +2. 
The number of milliseconds (no leap seconds). + +```text + ┌──────────────┬──────────────┐ + │ Days │ Milliseconds │ + │ (32 bits) │ (32 bits) │ + └──────────────┴──────────────┘ + 0 31 63 bit offset +``` +Please see the [Arrow Spec](https://github.com/apache/arrow/blob/081b4022fe6f659d8765efc82b3f4787c5039e3c/format/Schema.fbs#L406-L408) for more details. + +## Note on Comparing and Ordering for Calendar Types + +Values of `IntervalDayTimeType` are compared using their binary representation, +which can lead to surprising results. Please see the description of ordering on +[`IntervalMonthDayNanoType`] for more details. +"# ); make_type!( IntervalMonthDayNanoType, i128, DataType::Interval(IntervalUnit::MonthDayNano), - "A “calendar” interval type in months, days, and nanoseconds." + r#"A “calendar” interval type in months, days, and nanoseconds. + +## Representation +This type is stored as a single 128 bit integer, +interpreted as three different signed integral fields: + +1. The number of months (32 bits) +2. The number of days (32 bits) +3. The number of nanoseconds (64 bits). + +The nanoseconds field does not allow for leap seconds. +Each field is independent (e.g. there is no constraint that the quantity of +nanoseconds represents less than a day's worth of time). + +```text +┌──────────────────────────────┬─────────────┬──────────────┐ +│ Nanos │ Days │ Months │ +│ (64 bits) │ (32 bits) │ (32 bits) │ +└──────────────────────────────┴─────────────┴──────────────┘ + 0 63 95 127 bit offset +``` +Please see the [Arrow Spec](https://github.com/apache/arrow/blob/081b4022fe6f659d8765efc82b3f4787c5039e3c/format/Schema.fbs#L409-L415) for more details. + +## Note on Comparing and Ordering for Calendar Types +Values of `IntervalMonthDayNanoType` are compared using their binary representation, +which can lead to surprising results. + +Spans of time measured in calendar units are not fixed in absolute size (e.g. +number of seconds), which makes defining comparisons and ordering non-trivial. +For example, `1 month` is 28 days in February but 31 days +in December. + +This makes the seemingly simple operation of comparing two intervals +complicated in practice. For example, is `1 month` more or less than `30 days`? The +answer depends on which month you are talking about. + +This crate defines comparisons for calendar types using their binary +representation, which is fast and efficient, but leads +to potentially surprising results. + +For example, an +`IntervalMonthDayNano` of `1 month` will compare as **greater** than an +`IntervalMonthDayNano` of `100 days` because the binary representation of `1 month` +is larger than the binary representation of `100 days`. +"# ); make_type!( DurationSecondType, diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 05530eed9b08..9db8732f3611 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use crate::alloc::{Allocation, Deallocation, ALIGNMENT}; use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; +use crate::BufferBuilder; use crate::{bytes::Bytes, native::ArrowNativeType}; use super::ops::bitwise_unary_op_helper; @@ -99,7 +100,7 @@ impl Buffer { /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
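To make the ordering caveat documented above concrete, here is a hedged sketch (not part of the diff) showing that comparisons follow the packed `i128` representation rather than calendar duration:

```rust
use arrow_array::types::IntervalMonthDayNanoType;

// `make_value(months, days, nanos)` packs the three fields into one i128,
// with months occupying the most significant 32 bits.
let one_month = IntervalMonthDayNanoType::make_value(1, 0, 0);
let hundred_days = IntervalMonthDayNanoType::make_value(0, 100, 0);

// `1 month` compares as greater than `100 days` because its packed binary
// representation is numerically larger, exactly as the docs above warn.
assert!(one_month > hundred_days);
```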
- #[deprecated(note = "Use From>")] + #[deprecated(note = "Use Buffer::from_vec")] pub unsafe fn from_raw_parts(ptr: NonNull, len: usize, capacity: usize) -> Self { assert!(len <= capacity); let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap(); @@ -371,6 +372,12 @@ impl From for Buffer { } } +impl From> for Buffer { + fn from(mut value: BufferBuilder) -> Self { + value.finish() + } +} + impl Buffer { /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length. /// Prefer this to `collect` whenever possible, as it is ~60% faster. diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs index 3b75d5384046..f1c2ae785720 100644 --- a/arrow-buffer/src/buffer/scalar.rs +++ b/arrow-buffer/src/buffer/scalar.rs @@ -18,7 +18,7 @@ use crate::alloc::Deallocation; use crate::buffer::Buffer; use crate::native::ArrowNativeType; -use crate::MutableBuffer; +use crate::{BufferBuilder, MutableBuffer, OffsetBuffer}; use std::fmt::Formatter; use std::marker::PhantomData; use std::ops::Deref; @@ -145,6 +145,12 @@ impl From for ScalarBuffer { } } +impl From> for ScalarBuffer { + fn from(value: OffsetBuffer) -> Self { + value.into_inner() + } +} + impl From> for ScalarBuffer { fn from(value: Vec) -> Self { Self { @@ -154,6 +160,13 @@ impl From> for ScalarBuffer { } } +impl From> for ScalarBuffer { + fn from(mut value: BufferBuilder) -> Self { + let len = value.len(); + Self::new(value.finish(), 0, len) + } +} + impl FromIterator for ScalarBuffer { fn from_iter>(iter: I) -> Self { iter.into_iter().collect::>().into() @@ -263,4 +276,12 @@ mod tests { let buffer = Buffer::from_iter([0_i32, 1, 2]); ScalarBuffer::::new(buffer, 0, usize::MAX / 4 + 1); } + + #[test] + fn convert_from_buffer_builder() { + let input = vec![1, 2, 3, 4]; + let buffer_builder = BufferBuilder::from(input.clone()); + let scalar_buffer = ScalarBuffer::from(buffer_builder); + assert_eq!(scalar_buffer.as_ref(), input); + } } diff --git a/arrow-buffer/src/util/bit_util.rs b/arrow-buffer/src/util/bit_util.rs index b27931f4cc85..d2dbf3c84882 100644 --- a/arrow-buffer/src/util/bit_util.rs +++ b/arrow-buffer/src/util/bit_util.rs @@ -17,9 +17,6 @@ //! Utils for working with bits -#[cfg(feature = "simd")] -use packed_simd::u8x64; - const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; const UNSET_BIT_MASK: [u8; 8] = [ 255 - 1, @@ -104,31 +101,13 @@ pub fn ceil(value: usize, divisor: usize) -> usize { value / divisor + (0 != value % divisor) as usize } -/// Performs SIMD bitwise binary operations. -/// -/// # Safety -/// -/// Note that each slice should be 64 bytes and it is the callers responsibility to ensure -/// that this is the case. If passed slices larger than 64 bytes the operation will only -/// be performed on the first 64 bytes. Slices less than 64 bytes will panic. 
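The new `From` impls above let a `BufferBuilder` be converted into a `Buffer` or `ScalarBuffer` by finishing the builder, avoiding an extra copy. A usage sketch mirroring the `convert_from_buffer_builder` test in this diff:

```rust
use arrow_buffer::{BufferBuilder, ScalarBuffer};

// Populate a typed builder, then convert it directly into a ScalarBuffer.
let builder = BufferBuilder::from(vec![1_i32, 2, 3, 4]);
let scalar: ScalarBuffer<i32> = builder.into();
assert_eq!(scalar.as_ref(), &[1, 2, 3, 4]);
```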
-#[cfg(feature = "simd")] -pub unsafe fn bitwise_bin_op_simd(left: &[u8], right: &[u8], result: &mut [u8], op: F) -where - F: Fn(u8x64, u8x64) -> u8x64, -{ - let left_simd = u8x64::from_slice_unaligned_unchecked(left); - let right_simd = u8x64::from_slice_unaligned_unchecked(right); - let simd_result = op(left_simd, right_simd); - simd_result.write_to_slice_unaligned_unchecked(result); -} - -#[cfg(all(test, feature = "test_utils"))] +#[cfg(test)] mod tests { use std::collections::HashSet; use super::*; - use crate::util::test_util::seedable_rng; - use rand::Rng; + use rand::rngs::StdRng; + use rand::{Rng, SeedableRng}; #[test] fn test_round_upto_multiple_of_64() { @@ -167,10 +146,14 @@ mod tests { assert!(!get_bit(&[0b01001001, 0b01010010], 15)); } + pub fn seedable_rng() -> StdRng { + StdRng::seed_from_u64(42) + } + #[test] fn test_get_bit_raw() { const NUM_BYTE: usize = 10; - let mut buf = vec![0; NUM_BYTE]; + let mut buf = [0; NUM_BYTE]; let mut expected = vec![]; let mut rng = seedable_rng(); for i in 0..8 * NUM_BYTE { @@ -278,7 +261,6 @@ mod tests { } #[test] - #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] fn test_ceil() { assert_eq!(ceil(0, 1), 0); assert_eq!(ceil(1, 1), 1); @@ -292,28 +274,4 @@ mod tests { assert_eq!(ceil(10, 10000000000), 1); assert_eq!(ceil(10000000000, 1000000000), 10); } - - #[test] - #[cfg(feature = "simd")] - fn test_bitwise_and_simd() { - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a & b) }; - for i in buf3.iter() { - assert_eq!(&0b00110000u8, i); - } - } - - #[test] - #[cfg(feature = "simd")] - fn test_bitwise_or_simd() { - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a | b) }; - for i in buf3.iter() { - assert_eq!(&0b11110011u8, i); - } - } } diff --git a/arrow-cast/src/cast.rs b/arrow-cast/src/cast.rs index 51acd36c3fe4..a75354cf9b35 100644 --- a/arrow-cast/src/cast.rs +++ b/arrow-cast/src/cast.rs @@ -2444,10 +2444,16 @@ fn cast_string_to_timestamp_impl( +fn cast_string_to_interval( array: &dyn Array, cast_options: &CastOptions, -) -> Result { + parse_function: F, +) -> Result +where + Offset: OffsetSizeTrait, + ArrowType: ArrowPrimitiveType, + F: Fn(&str) -> Result + Copy, +{ let string_array = array .as_any() .downcast_ref::>() @@ -2455,92 +2461,59 @@ fn cast_string_to_year_month_interval( let interval_array = if cast_options.safe { let iter = string_array .iter() - .map(|v| v.and_then(|v| parse_interval_year_month(v).ok())); + .map(|v| v.and_then(|v| parse_function(v).ok())); // Benefit: // 20% performance improvement // Soundness: // The iterator is trustedLen because it comes from an `StringArray`. - unsafe { IntervalYearMonthArray::from_trusted_len_iter(iter) } + unsafe { PrimitiveArray::::from_trusted_len_iter(iter) } } else { let vec = string_array .iter() - .map(|v| v.map(parse_interval_year_month).transpose()) + .map(|v| v.map(parse_function).transpose()) .collect::, ArrowError>>()?; // Benefit: // 20% performance improvement // Soundness: // The iterator is trustedLen because it comes from an `StringArray`. 
- unsafe { IntervalYearMonthArray::from_trusted_len_iter(vec) } + unsafe { PrimitiveArray::::from_trusted_len_iter(vec) } }; Ok(Arc::new(interval_array) as ArrayRef) } -fn cast_string_to_day_time_interval( +fn cast_string_to_year_month_interval( array: &dyn Array, cast_options: &CastOptions, ) -> Result { - let string_array = array - .as_any() - .downcast_ref::>() - .unwrap(); - let interval_array = if cast_options.safe { - let iter = string_array - .iter() - .map(|v| v.and_then(|v| parse_interval_day_time(v).ok())); - - // Benefit: - // 20% performance improvement - // Soundness: - // The iterator is trustedLen because it comes from an `StringArray`. - unsafe { IntervalDayTimeArray::from_trusted_len_iter(iter) } - } else { - let vec = string_array - .iter() - .map(|v| v.map(parse_interval_day_time).transpose()) - .collect::, ArrowError>>()?; + cast_string_to_interval::( + array, + cast_options, + parse_interval_year_month, + ) +} - // Benefit: - // 20% performance improvement - // Soundness: - // The iterator is trustedLen because it comes from an `StringArray`. - unsafe { IntervalDayTimeArray::from_trusted_len_iter(vec) } - }; - Ok(Arc::new(interval_array) as ArrayRef) +fn cast_string_to_day_time_interval( + array: &dyn Array, + cast_options: &CastOptions, +) -> Result { + cast_string_to_interval::( + array, + cast_options, + parse_interval_day_time, + ) } fn cast_string_to_month_day_nano_interval( array: &dyn Array, cast_options: &CastOptions, ) -> Result { - let string_array = array - .as_any() - .downcast_ref::>() - .unwrap(); - let interval_array = if cast_options.safe { - let iter = string_array - .iter() - .map(|v| v.and_then(|v| parse_interval_month_day_nano(v).ok())); - - // Benefit: - // 20% performance improvement - // Soundness: - // The iterator is trustedLen because it comes from an `StringArray`. - unsafe { IntervalMonthDayNanoArray::from_trusted_len_iter(iter) } - } else { - let vec = string_array - .iter() - .map(|v| v.map(parse_interval_month_day_nano).transpose()) - .collect::, ArrowError>>()?; - - // Benefit: - // 20% performance improvement - // Soundness: - // The iterator is trustedLen because it comes from an `StringArray`. 
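The interval cast functions here (and the month-day-nano one just below) now delegate to the single generic `cast_string_to_interval` helper, parameterized over offset size, target primitive type, and parse function. A hedged usage sketch through the public `cast` API; the interval literal syntax is an assumption about what `parse_interval_year_month` accepts:

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, StringArray};
use arrow_cast::cast;
use arrow_schema::{DataType, IntervalUnit};

// Cast utf8 strings to year-month intervals; with the default (safe)
// CastOptions, unparseable values become nulls instead of errors.
let strings: ArrayRef = Arc::new(StringArray::from(vec!["1 year 2 months"]));
let intervals = cast(&strings, &DataType::Interval(IntervalUnit::YearMonth)).unwrap();
assert_eq!(intervals.data_type(), &DataType::Interval(IntervalUnit::YearMonth));
```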
- unsafe { IntervalMonthDayNanoArray::from_trusted_len_iter(vec) } - }; - Ok(Arc::new(interval_array) as ArrayRef) + cast_string_to_interval::( + array, + cast_options, + parse_interval_month_day_nano, + ) } fn adjust_timestamp_to_timezone( @@ -9449,4 +9422,29 @@ mod tests { let r: Vec<_> = a.as_string::().iter().map(|x| x.unwrap()).collect(); assert_eq!(r, &["[0, 1, 2]", "[0, null, 2]"]); } + #[test] + fn test_cast_string_to_timestamp_invalid_tz() { + // content after Z should be ignored + let bad_timestamp = "2023-12-05T21:58:10.45ZZTOP"; + let array = StringArray::from(vec![Some(bad_timestamp)]); + + let data_types = [ + DataType::Timestamp(TimeUnit::Second, None), + DataType::Timestamp(TimeUnit::Millisecond, None), + DataType::Timestamp(TimeUnit::Microsecond, None), + DataType::Timestamp(TimeUnit::Nanosecond, None), + ]; + + let cast_options = CastOptions { + safe: false, + ..Default::default() + }; + + for dt in data_types { + assert_eq!( + cast_with_options(&array, &dt, &cast_options).unwrap_err().to_string(), + "Parser error: Invalid timezone \"ZZTOP\": only offset based timezones supported without chrono-tz feature" + ); + } + } } diff --git a/arrow-cast/src/parse.rs b/arrow-cast/src/parse.rs index 750f38006d33..3d2e47ed95a4 100644 --- a/arrow-cast/src/parse.rs +++ b/arrow-cast/src/parse.rs @@ -210,7 +210,7 @@ pub fn string_to_datetime(timezone: &T, s: &str) -> Result, /// Optional datetime format for datetime arrays @@ -209,14 +215,17 @@ pub struct WriterBuilder { impl Default for WriterBuilder { fn default() -> Self { - Self { - has_header: true, + WriterBuilder { delimiter: b',', + has_header: true, + quote: b'"', + escape: b'\\', + double_quote: true, date_format: None, datetime_format: None, - time_format: None, timestamp_format: None, timestamp_tz_format: None, + time_format: None, null_value: None, } } @@ -277,6 +286,51 @@ impl WriterBuilder { self.delimiter } + /// Set the CSV file's quote character as a byte character + pub fn with_quote(mut self, quote: u8) -> Self { + self.quote = quote; + self + } + + /// Get the CSV file's quote character as a byte character + pub fn quote(&self) -> u8 { + self.quote + } + + /// Set the CSV file's escape character as a byte character + /// + /// In some variants of CSV, quotes are escaped using a special escape + /// character like `\` (instead of escaping quotes by doubling them). + /// + /// By default, writing these idiosyncratic escapes is disabled, and is + /// only used when `double_quote` is disabled. + pub fn with_escape(mut self, escape: u8) -> Self { + self.escape = escape; + self + } + + /// Get the CSV file's escape character as a byte character + pub fn escape(&self) -> u8 { + self.escape + } + + /// Set whether to enable double quote escapes + /// + /// When enabled (which is the default), quotes are escaped by doubling + /// them. e.g., `"` escapes to `""`. + /// + /// When disabled, quotes are escaped with the escape character (which + /// is `\\` by default). 
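Taken together with `with_quote`, the escape options here allow writing CSV variants that do not double quotes. A hedged sketch based on the test added later in this diff (the `$` escape byte is purely illustrative):

```rust
use arrow_csv::WriterBuilder;

// Disable double-quote escaping and escape embedded quotes with `$`.
let builder = WriterBuilder::new()
    .with_header(true)
    .with_double_quote(false)
    .with_escape(b'$');

// Any `Write` sink works; a Vec<u8> keeps the sketch self-contained.
let mut sink = Vec::new();
let _writer = builder.build(&mut sink);
// Writing a batch would now emit embedded `"` characters as `$"`.
```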
+ pub fn with_double_quote(mut self, double_quote: bool) -> Self { + self.double_quote = double_quote; + self + } + + /// Get whether double quote escapes are enabled + pub fn double_quote(&self) -> bool { + self.double_quote + } + /// Set the CSV file's date format pub fn with_date_format(mut self, format: String) -> Self { self.date_format = Some(format); @@ -346,7 +400,12 @@ impl WriterBuilder { /// Create a new `Writer` pub fn build(self, writer: W) -> Writer { let mut builder = csv::WriterBuilder::new(); - let writer = builder.delimiter(self.delimiter).from_writer(writer); + let writer = builder + .delimiter(self.delimiter) + .quote(self.quote) + .double_quote(self.double_quote) + .escape(self.escape) + .from_writer(writer); Writer { writer, beginning: true, @@ -499,8 +558,8 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo ]); let c1 = StringArray::from(vec![ - "Lorem ipsum dolor sit amet", - "consectetur adipiscing elit", + "Lorem ipsum \ndolor sit amet", + "consectetur \"adipiscing\" elit", "sed do eiusmod tempor", ]); let c2 = @@ -526,6 +585,7 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo let builder = WriterBuilder::new() .with_header(false) .with_delimiter(b'|') + .with_quote(b'\'') .with_null("NULL".to_string()) .with_time_format("%r".to_string()); let mut writer = builder.build(&mut file); @@ -541,10 +601,33 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo file.read_to_end(&mut buffer).unwrap(); assert_eq!( - "Lorem ipsum dolor sit amet|123.564532|3|true|12:20:34 AM\nconsectetur adipiscing elit|NULL|2|false|06:51:20 AM\nsed do eiusmod tempor|-556132.25|1|NULL|11:46:03 PM\n" + "'Lorem ipsum \ndolor sit amet'|123.564532|3|true|12:20:34 AM\nconsectetur \"adipiscing\" elit|NULL|2|false|06:51:20 AM\nsed do eiusmod tempor|-556132.25|1|NULL|11:46:03 PM\n" .to_string(), String::from_utf8(buffer).unwrap() ); + + let mut file = tempfile::tempfile().unwrap(); + + let builder = WriterBuilder::new() + .with_header(true) + .with_double_quote(false) + .with_escape(b'$'); + let mut writer = builder.build(&mut file); + let batches = vec![&batch]; + for batch in batches { + writer.write(batch).unwrap(); + } + drop(writer); + + file.rewind().unwrap(); + let mut buffer: Vec = vec![]; + file.read_to_end(&mut buffer).unwrap(); + + assert_eq!( + "c1,c2,c3,c4,c6\n\"Lorem ipsum \ndolor sit amet\",123.564532,3,true,00:20:34\n\"consectetur $\"adipiscing$\" elit\",,2,false,06:51:20\nsed do eiusmod tempor,-556132.25,1,,23:46:03\n" + .to_string(), + String::from_utf8(buffer).unwrap() + ); } #[test] diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 6f2cb30a1629..06e53505fc22 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -27,12 +27,12 @@ use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use arrow_array::*; -use arrow_buffer::{Buffer, MutableBuffer}; +use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; use arrow_data::ArrayData; use arrow_schema::*; use crate::compression::CompressionCodec; -use crate::{FieldNode, MetadataVersion, CONTINUATION_MARKER}; +use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER}; use DataType::*; /// Read a buffer based on offset and length @@ -498,10 +498,34 @@ pub fn read_dictionary( Ok(()) } +/// Read the data for a given block +fn read_block(mut reader: R, block: &Block) -> Result { + reader.seek(SeekFrom::Start(block.offset() as u64))?; + let body_len = block.bodyLength().to_usize().unwrap(); + let 
metadata_len = block.metaDataLength().to_usize().unwrap(); + let total_len = body_len.checked_add(metadata_len).unwrap(); + + let mut buf = MutableBuffer::from_len_zeroed(total_len); + reader.read_exact(&mut buf)?; + Ok(buf.into()) +} + +/// Parse an encapsulated message +/// +/// +fn parse_message(buf: &[u8]) -> Result { + let buf = match buf[..4] == CONTINUATION_MARKER { + true => &buf[8..], + false => &buf[4..], + }; + crate::root_as_message(buf) + .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))) +} + /// Arrow File reader pub struct FileReader { /// Buffered file reader that supports reading and seeking - reader: BufReader, + reader: R, /// The schema that is read from the file header schema: SchemaRef, @@ -535,7 +559,6 @@ pub struct FileReader { impl fmt::Debug for FileReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { f.debug_struct("FileReader") - .field("reader", &"BufReader<..>") .field("schema", &self.schema) .field("blocks", &self.blocks) .field("current_block", &self.current_block) @@ -543,37 +566,28 @@ impl fmt::Debug for FileReader { .field("dictionaries_by_id", &self.dictionaries_by_id) .field("metadata_version", &self.metadata_version) .field("projection", &self.projection) - .finish() + .finish_non_exhaustive() } } impl FileReader { /// Try to create a new file reader /// - /// Returns errors if the file does not meet the Arrow Format header and footer - /// requirements - pub fn try_new(reader: R, projection: Option>) -> Result { - let mut reader = BufReader::new(reader); - // check if header and footer contain correct magic bytes - let mut magic_buffer: [u8; 6] = [0; 6]; - reader.read_exact(&mut magic_buffer)?; - if magic_buffer != super::ARROW_MAGIC { - return Err(ArrowError::ParseError( - "Arrow file does not contain correct header".to_string(), - )); - } - reader.seek(SeekFrom::End(-6))?; - reader.read_exact(&mut magic_buffer)?; - if magic_buffer != super::ARROW_MAGIC { + /// Returns errors if the file does not meet the Arrow Format footer requirements + pub fn try_new(mut reader: R, projection: Option>) -> Result { + // Space for ARROW_MAGIC (6 bytes) and length (4 bytes) + let mut buffer = [0; 10]; + reader.seek(SeekFrom::End(-10))?; + reader.read_exact(&mut buffer)?; + + if buffer[4..] 
!= super::ARROW_MAGIC { return Err(ArrowError::ParseError( "Arrow file does not contain correct footer".to_string(), )); } + // read footer length - let mut footer_size: [u8; 4] = [0; 4]; - reader.seek(SeekFrom::End(-10))?; - reader.read_exact(&mut footer_size)?; - let footer_len = i32::from_le_bytes(footer_size); + let footer_len = i32::from_le_bytes(buffer[..4].try_into().unwrap()); // read footer let mut footer_data = vec![0; footer_len as usize]; @@ -607,35 +621,14 @@ impl FileReader { let mut dictionaries_by_id = HashMap::new(); if let Some(dictionaries) = footer.dictionaries() { for block in dictionaries { - // read length from end of offset - let mut message_size: [u8; 4] = [0; 4]; - reader.seek(SeekFrom::Start(block.offset() as u64))?; - reader.read_exact(&mut message_size)?; - if message_size == CONTINUATION_MARKER { - reader.read_exact(&mut message_size)?; - } - let footer_len = i32::from_le_bytes(message_size); - let mut block_data = vec![0; footer_len as usize]; - - reader.read_exact(&mut block_data)?; - - let message = crate::root_as_message(&block_data[..]).map_err(|err| { - ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) - })?; + let buf = read_block(&mut reader, block)?; + let message = parse_message(&buf)?; match message.header_type() { crate::MessageHeader::DictionaryBatch => { let batch = message.header_as_dictionary_batch().unwrap(); - - // read the block that makes up the dictionary batch into a buffer - let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); - reader.seek(SeekFrom::Start( - block.offset() as u64 + block.metaDataLength() as u64, - ))?; - reader.read_exact(&mut buf)?; - read_dictionary( - &buf.into(), + &buf.slice(block.metaDataLength() as _), batch, &schema, &mut dictionaries_by_id, @@ -702,27 +695,15 @@ impl FileReader { } fn maybe_next(&mut self) -> Result, ArrowError> { - let block = self.blocks[self.current_block]; + let block = &self.blocks[self.current_block]; self.current_block += 1; // read length - self.reader.seek(SeekFrom::Start(block.offset() as u64))?; - let mut meta_buf = [0; 4]; - self.reader.read_exact(&mut meta_buf)?; - if meta_buf == CONTINUATION_MARKER { - // continuation marker encountered, read message next - self.reader.read_exact(&mut meta_buf)?; - } - let meta_len = i32::from_le_bytes(meta_buf); - - let mut block_data = vec![0; meta_len as usize]; - self.reader.read_exact(&mut block_data)?; - let message = crate::root_as_message(&block_data[..]).map_err(|err| { - ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")) - })?; + let buffer = read_block(&mut self.reader, block)?; + let message = parse_message(&buffer)?; // some old test data's footer metadata is not set, so we account for that - if self.metadata_version != crate::MetadataVersion::V1 + if self.metadata_version != MetadataVersion::V1 && message.version() != self.metadata_version { return Err(ArrowError::IpcError( @@ -739,14 +720,8 @@ impl FileReader { ArrowError::IpcError("Unable to read IPC message as record batch".to_string()) })?; // read the block that makes up the record batch into a buffer - let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); - self.reader.seek(SeekFrom::Start( - block.offset() as u64 + block.metaDataLength() as u64, - ))?; - self.reader.read_exact(&mut buf)?; - read_record_batch( - &buf.into(), + &buffer.slice(block.metaDataLength() as _), batch, self.schema(), &self.dictionaries_by_id, @@ -766,14 +741,14 @@ impl FileReader { /// /// It is inadvisable to 
directly read from the underlying reader. pub fn get_ref(&self) -> &R { - self.reader.get_ref() + &self.reader } /// Gets a mutable reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. pub fn get_mut(&mut self) -> &mut R { - self.reader.get_mut() + &mut self.reader } } diff --git a/arrow-ord/src/comparison.rs b/arrow-ord/src/comparison.rs index 021ecdf0e658..4d552b038a7d 100644 --- a/arrow-ord/src/comparison.rs +++ b/arrow-ord/src/comparison.rs @@ -243,64 +243,6 @@ fn make_utf8_scalar(d: &DataType, scalar: &str) -> Result } } -/// Helper function to perform boolean lambda function on values from two array accessors, this -/// version does not attempt to use SIMD. -fn compare_op( - left: T, - right: S, - op: F, -) -> Result -where - F: Fn(T::Item, S::Item) -> bool, -{ - if left.len() != right.len() { - return Err(ArrowError::ComputeError( - "Cannot perform comparison operation on arrays of different length".to_string(), - )); - } - - Ok(BooleanArray::from_binary(left, right, op)) -} - -/// Helper function to perform boolean lambda function on values from array accessor, this -/// version does not attempt to use SIMD. -fn compare_op_scalar(left: T, op: F) -> Result -where - F: Fn(T::Item) -> bool, -{ - Ok(BooleanArray::from_unary(left, op)) -} - -/// Evaluate `op(left, right)` for [`PrimitiveArray`]s using a specified -/// comparison function. -#[deprecated(note = "Use BooleanArray::from_binary")] -pub fn no_simd_compare_op( - left: &PrimitiveArray, - right: &PrimitiveArray, - op: F, -) -> Result -where - T: ArrowPrimitiveType, - F: Fn(T::Native, T::Native) -> bool, -{ - compare_op(left, right, op) -} - -/// Evaluate `op(left, right)` for [`PrimitiveArray`] and scalar using -/// a specified comparison function. -#[deprecated(note = "Use BooleanArray::from_unary")] -pub fn no_simd_compare_op_scalar( - left: &PrimitiveArray, - right: T::Native, - op: F, -) -> Result -where - T: ArrowPrimitiveType, - F: Fn(T::Native, T::Native) -> bool, -{ - compare_op_scalar(left, |l| op(l, right)) -} - /// Perform `left == right` operation on [`StringArray`] / [`LargeStringArray`]. #[deprecated(note = "Use arrow_ord::cmp::eq")] pub fn eq_utf8( @@ -610,7 +552,6 @@ pub fn gt_eq_utf8_scalar( /// Perform `left == right` operation on an array and a numeric scalar /// value. Supports PrimitiveArrays, and DictionaryArrays that have primitive values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -628,7 +569,6 @@ where /// Perform `left < right` operation on an array and a numeric scalar /// value. Supports PrimitiveArrays, and DictionaryArrays that have primitive values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -646,7 +586,6 @@ where /// Perform `left <= right` operation on an array and a numeric scalar /// value. Supports PrimitiveArrays, and DictionaryArrays that have primitive values. 
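Because `FileReader` now uses its reader directly instead of wrapping it in a `BufReader`, callers who want buffered IO must add it themselves. A hedged usage sketch; "data.arrow" is a placeholder path:

```rust
use std::fs::File;
use std::io::BufReader;
use arrow_ipc::reader::FileReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Wrap the file explicitly, since the reader no longer buffers internally.
    let file = File::open("data.arrow")?;
    let reader = FileReader::try_new(BufReader::new(file), None)?;
    for batch in reader {
        println!("read a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}
```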
/// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -664,7 +603,6 @@ where /// Perform `left > right` operation on an array and a numeric scalar /// value. Supports PrimitiveArrays, and DictionaryArrays that have primitive values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -682,7 +620,6 @@ where /// Perform `left >= right` operation on an array and a numeric scalar /// value. Supports PrimitiveArrays, and DictionaryArrays that have primitive values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -700,7 +637,6 @@ where /// Perform `left != right` operation on an array and a numeric scalar /// value. Supports PrimitiveArrays, and DictionaryArrays that have primitive values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -1015,7 +951,6 @@ pub fn gt_eq_dyn(left: &dyn Array, right: &dyn Array) -> Result(left: &PrimitiveArray, op: F) -> Result where T: ArrowNumericType, F: Fn(T::Native) -> bool, { - compare_op_scalar(left, op) + Ok(BooleanArray::from_unary(left, op)) } /// Perform `left != right` operation on two [`PrimitiveArray`]s. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -1081,7 +1016,6 @@ where /// Perform `left != right` operation on a [`PrimitiveArray`] and a scalar value. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. If it is necessary @@ -1100,7 +1034,6 @@ where /// Perform `left < right` operation on two [`PrimitiveArray`]s. Null values are less than non-null /// values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance to /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros are different. 
If it is necessary @@ -1121,7 +1054,6 @@ where /// Perform `left < right` operation on a [`PrimitiveArray`] and a scalar value. /// Null values are less than non-null values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance with /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros as different. If it is necessary @@ -1140,7 +1072,6 @@ where /// Perform `left <= right` operation on two [`PrimitiveArray`]s. Null values are less than non-null /// values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance with /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros as different. If it is necessary @@ -1161,7 +1092,6 @@ where /// Perform `left <= right` operation on a [`PrimitiveArray`] and a scalar value. /// Null values are less than non-null values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance with /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros as different. If it is necessary @@ -1183,7 +1113,6 @@ where /// Perform `left > right` operation on two [`PrimitiveArray`]s. Non-null values are greater than null /// values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance with /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros as different. If it is necessary @@ -1204,7 +1133,6 @@ where /// Perform `left > right` operation on a [`PrimitiveArray`] and a scalar value. /// Non-null values are greater than null values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance with /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros as different. If it is necessary @@ -1223,7 +1151,6 @@ where /// Perform `left >= right` operation on two [`PrimitiveArray`]s. Non-null values are greater than null /// values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance with /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros as different. If it is necessary @@ -1244,7 +1171,6 @@ where /// Perform `left >= right` operation on a [`PrimitiveArray`] and a scalar value. /// Non-null values are greater than null values. /// -/// If `simd` feature flag is not enabled: /// For floating values like f32 and f64, this comparison produces an ordering in accordance with /// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard. /// Note that totalOrder treats positive and negative zeros as different.
If it is necessary @@ -1481,6 +1407,48 @@ mod tests { vec![6, 7, 8, 9, 10, 6, 7, 8, 9, 10], vec![false, false, true, false, false, false, false, true, false, false] ); + + cmp_vec!( + eq, + eq_dyn, + IntervalYearMonthArray, + vec![ + IntervalYearMonthType::make_value(1, 2), + IntervalYearMonthType::make_value(2, 1), + // 1 year + IntervalYearMonthType::make_value(1, 0), + ], + vec![ + IntervalYearMonthType::make_value(1, 2), + IntervalYearMonthType::make_value(1, 2), + // NB 12 months is treated as equal to a year (as the underlying + // type stores the number of months) + IntervalYearMonthType::make_value(0, 12), + ], + vec![true, false, true] + ); + + cmp_vec!( + eq, + eq_dyn, + IntervalMonthDayNanoArray, + vec![ + IntervalMonthDayNanoType::make_value(1, 2, 3), + IntervalMonthDayNanoType::make_value(3, 2, 1), + // 1 month + IntervalMonthDayNanoType::make_value(1, 0, 0), + IntervalMonthDayNanoType::make_value(1, 0, 0), + ], + vec![ + IntervalMonthDayNanoType::make_value(1, 2, 3), + IntervalMonthDayNanoType::make_value(1, 2, 3), + // 30 days is not treated as a month + IntervalMonthDayNanoType::make_value(0, 30, 0), + // 100 days + IntervalMonthDayNanoType::make_value(0, 100, 0), + ], + vec![true, false, false, false] + ); } #[test] @@ -1734,6 +1702,77 @@ mod tests { vec![6, 7, 8, 9, 10, 6, 7, 8, 9, 10], vec![false, false, false, true, true, false, false, false, true, true] ); + + cmp_vec!( + lt, + lt_dyn, + IntervalDayTimeArray, + vec![ + IntervalDayTimeType::make_value(1, 0), + IntervalDayTimeType::make_value(0, 1000), + IntervalDayTimeType::make_value(1, 1000), + IntervalDayTimeType::make_value(1, 3000), + // 90M milliseconds + IntervalDayTimeType::make_value(0, 90_000_000), + ], + vec![ + IntervalDayTimeType::make_value(0, 1000), + IntervalDayTimeType::make_value(1, 0), + IntervalDayTimeType::make_value(10, 0), + IntervalDayTimeType::make_value(2, 1), + // NB even though 1 day is shorter than 90M milliseconds, + // it compares as greater because the underlying type stores + // days and milliseconds as different fields + IntervalDayTimeType::make_value(0, 12), + ], + vec![false, true, true, true, false] + ); + + cmp_vec!( + lt, + lt_dyn, + IntervalYearMonthArray, + vec![ + IntervalYearMonthType::make_value(1, 2), + IntervalYearMonthType::make_value(2, 1), + IntervalYearMonthType::make_value(1, 2), + // 1 year + IntervalYearMonthType::make_value(1, 0), + ], + vec![ + IntervalYearMonthType::make_value(1, 2), + IntervalYearMonthType::make_value(1, 2), + IntervalYearMonthType::make_value(2, 1), + // NB 12 months is treated as equal to a year (as the underlying + // type stores the number of months) + IntervalYearMonthType::make_value(0, 12), + ], + vec![false, false, true, false] + ); + + cmp_vec!( + lt, + lt_dyn, + IntervalMonthDayNanoArray, + vec![ + IntervalMonthDayNanoType::make_value(1, 2, 3), + IntervalMonthDayNanoType::make_value(3, 2, 1), + // 1 month + IntervalMonthDayNanoType::make_value(1, 0, 0), + IntervalMonthDayNanoType::make_value(1, 2, 0), + IntervalMonthDayNanoType::make_value(1, 0, 0), + ], + vec![ + IntervalMonthDayNanoType::make_value(1, 2, 3), + IntervalMonthDayNanoType::make_value(1, 2, 3), + IntervalMonthDayNanoType::make_value(2, 0, 0), + // 30 days is not treated as a month + IntervalMonthDayNanoType::make_value(0, 30, 0), + // 100 days (note: treated as greater than 1 month due to the underlying integer representation) + IntervalMonthDayNanoType::make_value(0, 100, 0), + ], + vec![false, false, true, false, false] + ); } #[test] diff --git a/arrow-ord/src/ord.rs
b/arrow-ord/src/ord.rs index 28ca07cce260..f6bd39c9cd5d 100644 --- a/arrow-ord/src/ord.rs +++ b/arrow-ord/src/ord.rs @@ -216,6 +216,73 @@ pub mod tests { assert_eq!(Ordering::Greater, cmp(1, 0)); } + #[test] + fn test_interval_day_time() { + let array = IntervalDayTimeArray::from(vec![ + // 0 days, 1 second + IntervalDayTimeType::make_value(0, 1000), + // 1 day, 2 milliseconds + IntervalDayTimeType::make_value(1, 2), + // 90M milliseconds (which is more than there are in 1 day) + IntervalDayTimeType::make_value(0, 90_000_000), + ]); + + let cmp = build_compare(&array, &array).unwrap(); + + assert_eq!(Ordering::Less, cmp(0, 1)); + assert_eq!(Ordering::Greater, cmp(1, 0)); + + // somewhat confusingly, while 90M milliseconds is more than 1 day, + // it will compare less as the comparison is done on the underlying + // values, not field by field + assert_eq!(Ordering::Greater, cmp(1, 2)); + assert_eq!(Ordering::Less, cmp(2, 1)); + } + + #[test] + fn test_interval_year_month() { + let array = IntervalYearMonthArray::from(vec![ + // 1 year, 0 months + IntervalYearMonthType::make_value(1, 0), + // 0 years, 13 months + IntervalYearMonthType::make_value(0, 13), + // 1 year, 1 month + IntervalYearMonthType::make_value(1, 1), + ]); + + let cmp = build_compare(&array, &array).unwrap(); + + assert_eq!(Ordering::Less, cmp(0, 1)); + assert_eq!(Ordering::Greater, cmp(1, 0)); + + // the underlying representation is months, so both quantities are the same + assert_eq!(Ordering::Equal, cmp(1, 2)); + assert_eq!(Ordering::Equal, cmp(2, 1)); + } + + #[test] + fn test_interval_month_day_nano() { + let array = IntervalMonthDayNanoArray::from(vec![ + // 100 days + IntervalMonthDayNanoType::make_value(0, 100, 0), + // 1 month + IntervalMonthDayNanoType::make_value(1, 0, 0), + // 100 days, 2 nanoseconds + IntervalMonthDayNanoType::make_value(0, 100, 2), + ]); + + let cmp = build_compare(&array, &array).unwrap(); + + assert_eq!(Ordering::Less, cmp(0, 1)); + assert_eq!(Ordering::Greater, cmp(1, 0)); + + // somewhat confusingly, while 100 days is more than 1 month in all cases, + // it will compare less as the comparison is done on the underlying + // values, not field by field + assert_eq!(Ordering::Greater, cmp(1, 2)); + assert_eq!(Ordering::Less, cmp(2, 1)); + } + #[test] fn test_decimal() { let array = vec![Some(5_i128), Some(2_i128), Some(3_i128)] diff --git a/arrow/CONTRIBUTING.md b/arrow/CONTRIBUTING.md index 5b84bc2d3bdb..0c795d6b9cbd 100644 --- a/arrow/CONTRIBUTING.md +++ b/arrow/CONTRIBUTING.md @@ -67,18 +67,6 @@ the impossibility of the compiler to derive the invariants (such as lifetime, nu The arrow format declares an IPC protocol, which this crate supports. IPC is equivalent to an FFI in that the rust compiler can't reason about the contract's invariants. -#### SIMD - -The API provided by the [packed_simd_2](https://docs.rs/packed_simd_2/latest/packed_simd_2/) crate is currently `unsafe`. However, -SIMD offers a significant performance improvement over non-SIMD operations. A related crate in development is -[portable-simd](https://rust-lang.github.io/portable-simd/core_simd/) which has a nice -[beginners guide](https://github.com/rust-lang/portable-simd/blob/master/beginners-guide.md). These crates provide the ability -for code on x86 and ARM architectures to use some of the available parallel register operations. As an example, if two arrays
As an example if two arrays -of numbers are added, [1,2,3,4] + [5,6,7,8], rather than using four instructions to add each of the elements of the arrays, -one instruction can be used to all all four elements at the same time, which leads to improved time to solution. SIMD instructions -are typically most effective when data is aligned to allow a single load instruction to bring multiple consecutive data elements -to the registers, before use of a SIMD instruction. - #### Performance Some operations are significantly faster when `unsafe` is used. diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 6ca218f5f658..a6b4ddf51dfb 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -65,7 +65,6 @@ ipc_compression = ["ipc", "arrow-ipc/lz4", "arrow-ipc/zstd"] csv = ["arrow-csv"] ipc = ["arrow-ipc"] json = ["arrow-json"] -simd = ["arrow-array/simd", "arrow-arith/simd"] prettyprint = ["arrow-cast/prettyprint"] # The test utils feature enables code used in benchmarks and tests but # not the core arrow code itself. Be aware that `rand` must be kept as diff --git a/arrow/README.md b/arrow/README.md index 6a91bc951cc1..bc95b91a9a4a 100644 --- a/arrow/README.md +++ b/arrow/README.md @@ -48,9 +48,7 @@ The `arrow` crate provides the following features which may be enabled in your ` - `ipc` (default) - support for reading [Arrow IPC Format](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc), also used as the wire protocol in [arrow-flight](https://crates.io/crates/arrow-flight) - `ipc_compression` - Enables reading and writing compressed IPC streams (also enables `ipc`) - `prettyprint` - support for formatting record batches as textual columns -- `simd` - (_Requires Nightly Rust_) Use alternate hand optimized implementations of some [compute](https://github.com/apache/arrow-rs/tree/master/arrow/src/compute/kernels) - kernels using explicit SIMD instructions via [packed_simd_2](https://docs.rs/packed_simd_2/latest/packed_simd_2/). - `chrono-tz` - support of parsing timezone using [chrono-tz](https://docs.rs/chrono-tz/0.6.0/chrono_tz/) - `ffi` - bindings for the Arrow C [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) - `pyarrow` - bindings for pyo3 to call arrow-rs from python @@ -75,7 +73,6 @@ In particular there are a number of scenarios where `unsafe` is largely unavoida - Invariants that cannot be statically verified by the compiler and unlock non-trivial performance wins, e.g. values in a StringArray are UTF-8, [TrustedLen](https://doc.rust-lang.org/std/iter/trait.TrustedLen.html) iterators, etc... - FFI -- SIMD Additionally, this crate exposes a number of `unsafe` APIs, allowing downstream crates to explicitly opt-out of potentially expensive invariant checking where appropriate. 
@@ -95,7 +92,7 @@ In order to compile Arrow for `wasm32-unknown-unknown` you will need to disable ```toml [dependencies] -arrow = { version = "5.0", default-features = false, features = ["csv", "ipc", "simd"] } +arrow = { version = "5.0", default-features = false, features = ["csv", "ipc"] } ``` ## Examples diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index b49f56c91574..d867f7c30d1f 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -468,13 +468,13 @@ mod tests { use arrow_array::builder::UnionBuilder; use arrow_array::cast::AsArray; use arrow_array::types::{Float64Type, Int32Type}; - use arrow_array::{StructArray, UnionArray}; + use arrow_array::*; use crate::array::{ - make_array, Array, ArrayData, BooleanArray, Decimal128Array, DictionaryArray, - DurationSecondArray, FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, - GenericListArray, GenericStringArray, Int32Array, MapArray, OffsetSizeTrait, - Time32MillisecondArray, TimestampMillisecondArray, UInt32Array, + make_array, Array, ArrayData, BooleanArray, DictionaryArray, DurationSecondArray, + FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray, + GenericStringArray, Int32Array, MapArray, OffsetSizeTrait, Time32MillisecondArray, + TimestampMillisecondArray, UInt32Array, }; use crate::compute::kernels; use crate::datatypes::{Field, Int8Type}; diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs index 8302f8741b60..9a13cfa493e9 100644 --- a/arrow/src/pyarrow.rs +++ b/arrow/src/pyarrow.rs @@ -71,7 +71,7 @@ use crate::datatypes::{DataType, Field, Schema}; use crate::error::ArrowError; use crate::ffi; use crate::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; -use crate::ffi_stream::{export_reader_into_raw, ArrowArrayStreamReader, FFI_ArrowArrayStream}; +use crate::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; use crate::record_batch::RecordBatch; import_exception!(pyarrow, ArrowException); @@ -377,7 +377,7 @@ impl FromPyArrow for RecordBatch { impl ToPyArrow for RecordBatch { fn to_pyarrow(&self, py: Python) -> PyResult { // Workaround apache/arrow#37669 by returning RecordBatchIterator - let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema().clone()); + let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema()); let reader: Box = Box::new(reader); let py_reader = reader.into_pyarrow(py)?; py_reader.call_method0(py, "read_next_batch") diff --git a/arrow/tests/array_transform.rs b/arrow/tests/array_transform.rs index 74e2a212736a..6f5b245b8e3b 100644 --- a/arrow/tests/array_transform.rs +++ b/arrow/tests/array_transform.rs @@ -28,6 +28,7 @@ use arrow_data::ArrayData; use arrow_schema::{DataType, Field, Fields}; use std::sync::Arc; +#[allow(unused)] fn create_decimal_array(array: Vec>, precision: u8, scale: i8) -> Decimal128Array { array .into_iter() diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index ae092edac095..2baf586127c6 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -213,7 +213,10 @@ impl Default for ClientOptions { http2_keep_alive_interval: None, http2_keep_alive_timeout: None, http2_keep_alive_while_idle: Default::default(), - http1_only: Default::default(), + // HTTP2 is known to be significantly slower than HTTP1, so we default + // to HTTP1 for now. 
+ // https://github.com/apache/arrow-rs/issues/5194 + http1_only: true.into(), http2_only: Default::default(), } } @@ -350,17 +353,28 @@ impl ClientOptions { } /// Only use http1 connections + /// + /// This is on by default, since http2 is known to be significantly slower than http1. pub fn with_http1_only(mut self) -> Self { + self.http2_only = false.into(); self.http1_only = true.into(); self } /// Only use http2 connections pub fn with_http2_only(mut self) -> Self { + self.http1_only = false.into(); self.http2_only = true.into(); self } + /// Use http2 if supported, otherwise use http1. + pub fn with_allow_http2(mut self) -> Self { + self.http1_only = false.into(); + self.http2_only = false.into(); + self + } + /// Set a proxy URL to use for requests pub fn with_proxy_url(mut self, proxy_url: impl Into) -> Self { self.proxy_url = Some(proxy_url.into()); diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 11fa68310a2e..8633abbfb4dc 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -29,6 +29,13 @@ //! to abort the upload and drop those unneeded parts. In addition, you may wish to //! consider implementing automatic clean up of unused parts that are older than one //! week. +//! +//! ## Using HTTP/2 +//! +//! Google Cloud Storage supports both HTTP/2 and HTTP/1. HTTP/1 is used by default +//! because it allows much higher throughput in our benchmarks (see +//! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be +//! enabled by setting [crate::ClientConfigKey::Http1Only] to false. use std::sync::Arc; use crate::client::CredentialProvider; diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 01666c0af4e6..debe0d6109eb 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -19,7 +19,6 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; use crate::arrow::buffer::bit_util::sign_extend_be; use crate::arrow::buffer::offset_buffer::OffsetBuffer; use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; -use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{ConvertedType, Encoding}; @@ -77,7 +76,7 @@ pub fn make_byte_array_reader( } /// An [`ArrayReader`] for variable length byte arrays -struct ByteArrayReader { +struct ByteArrayReader { data_type: ArrowType, pages: Box, def_levels_buffer: Option, @@ -85,14 +84,11 @@ struct ByteArrayReader { record_reader: GenericRecordReader, ByteArrayColumnValueDecoder>, } -impl ByteArrayReader { +impl ByteArrayReader { fn new( pages: Box, data_type: ArrowType, - record_reader: GenericRecordReader< - OffsetBuffer, - ByteArrayColumnValueDecoder, - >, + record_reader: GenericRecordReader, ByteArrayColumnValueDecoder>, ) -> Self { Self { data_type, @@ -104,7 +100,7 @@ impl ByteArrayReader { } } -impl ArrayReader for ByteArrayReader { +impl ArrayReader for ByteArrayReader { fn as_any(&self) -> &dyn Any { self } @@ -167,15 +163,13 @@ impl ArrayReader for ByteArrayReader { } /// A [`ColumnValueDecoder`] for variable length byte arrays -struct ByteArrayColumnValueDecoder { +struct ByteArrayColumnValueDecoder { dict: Option>, decoder: Option, validate_utf8: bool, } -impl ColumnValueDecoder - for ByteArrayColumnValueDecoder -{ +impl ColumnValueDecoder for ByteArrayColumnValueDecoder { type Slice = OffsetBuffer; fn new(desc: 
&ColumnDescPtr) -> Self { @@ -275,17 +269,15 @@ impl ByteArrayDecoder { num_values, validate_utf8, )), - Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { - ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new( - data, num_levels, num_values, - )) - } + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary( + ByteArrayDecoderDictionary::new(data, num_levels, num_values), + ), Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength( ByteArrayDecoderDeltaLength::new(data, validate_utf8)?, ), - Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray( - ByteArrayDecoderDelta::new(data, validate_utf8)?, - ), + Encoding::DELTA_BYTE_ARRAY => { + ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data, validate_utf8)?) + } _ => { return Err(general_err!( "unsupported encoding for byte array: {}", @@ -298,7 +290,7 @@ impl ByteArrayDecoder { } /// Read up to `len` values to `out` with the optional dictionary - pub fn read( + pub fn read( &mut self, out: &mut OffsetBuffer, len: usize, @@ -307,8 +299,8 @@ impl ByteArrayDecoder { match self { ByteArrayDecoder::Plain(d) => d.read(out, len), ByteArrayDecoder::Dictionary(d) => { - let dict = dict - .ok_or_else(|| general_err!("missing dictionary page for column"))?; + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; d.read(out, dict, len) } @@ -318,7 +310,7 @@ impl ByteArrayDecoder { } /// Skip `len` values - pub fn skip( + pub fn skip( &mut self, len: usize, dict: Option<&OffsetBuffer>, @@ -326,8 +318,8 @@ impl ByteArrayDecoder { match self { ByteArrayDecoder::Plain(d) => d.skip(len), ByteArrayDecoder::Dictionary(d) => { - let dict = dict - .ok_or_else(|| general_err!("missing dictionary page for column"))?; + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; d.skip(dict, len) } @@ -363,7 +355,7 @@ impl ByteArrayDecoderPlain { } } - pub fn read( + pub fn read( &mut self, output: &mut OffsetBuffer, len: usize, @@ -392,8 +384,7 @@ impl ByteArrayDecoderPlain { if self.offset + 4 > buf.len() { return Err(ParquetError::EOF("eof decoding byte array".into())); } - let len_bytes: [u8; 4] = - buf[self.offset..self.offset + 4].try_into().unwrap(); + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); let len = u32::from_le_bytes(len_bytes); let start_offset = self.offset + 4; @@ -424,8 +415,7 @@ impl ByteArrayDecoderPlain { if self.offset + 4 > buf.len() { return Err(ParquetError::EOF("eof decoding byte array".into())); } - let len_bytes: [u8; 4] = - buf[self.offset..self.offset + 4].try_into().unwrap(); + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); let len = u32::from_le_bytes(len_bytes) as usize; skip += 1; self.offset = self.offset + 4 + len; @@ -462,7 +452,7 @@ impl ByteArrayDecoderDeltaLength { }) } - fn read( + fn read( &mut self, output: &mut OffsetBuffer, len: usize, @@ -529,7 +519,7 @@ impl ByteArrayDecoderDelta { }) } - fn read( + fn read( &mut self, output: &mut OffsetBuffer, len: usize, @@ -564,7 +554,7 @@ impl ByteArrayDecoderDictionary { } } - fn read( + fn read( &mut self, output: &mut OffsetBuffer, dict: &OffsetBuffer, @@ -576,15 +566,11 @@ impl ByteArrayDecoderDictionary { } self.decoder.read(len, |keys| { - output.extend_from_dictionary( - keys, - dict.offsets.as_slice(), - dict.values.as_slice(), - ) + output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice()) }) } - fn skip( + fn skip( &mut self, dict: 
&OffsetBuffer, to_skip: usize, diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 0d216fa08327..a38122354145 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -27,10 +27,8 @@ use bytes::Bytes; use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}; use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; -use crate::arrow::buffer::{ - dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer, -}; -use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue}; +use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer}; +use crate::arrow::record_reader::buffer::BufferQueue; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{ConvertedType, Encoding}; @@ -123,7 +121,7 @@ pub fn make_byte_array_dictionary_reader( /// An [`ArrayReader`] for dictionary encoded variable length byte arrays /// /// Will attempt to preserve any dictionary encoding present in the parquet data -struct ByteArrayDictionaryReader { +struct ByteArrayDictionaryReader { data_type: ArrowType, pages: Box, def_levels_buffer: Option, @@ -133,16 +131,13 @@ struct ByteArrayDictionaryReader { impl ByteArrayDictionaryReader where - K: FromBytes + ScalarValue + Ord + ArrowNativeType, - V: ScalarValue + OffsetSizeTrait, + K: FromBytes + Ord + ArrowNativeType, + V: OffsetSizeTrait, { fn new( pages: Box, data_type: ArrowType, - record_reader: GenericRecordReader< - DictionaryBuffer, - DictionaryDecoder, - >, + record_reader: GenericRecordReader, DictionaryDecoder>, ) -> Self { Self { data_type, @@ -156,8 +151,8 @@ where impl ArrayReader for ByteArrayDictionaryReader where - K: FromBytes + ScalarValue + Ord + ArrowNativeType, - V: ScalarValue + OffsetSizeTrait, + K: FromBytes + Ord + ArrowNativeType, + V: OffsetSizeTrait, { fn as_any(&self) -> &dyn Any { self @@ -226,16 +221,15 @@ struct DictionaryDecoder { impl ColumnValueDecoder for DictionaryDecoder where - K: FromBytes + ScalarValue + Ord + ArrowNativeType, - V: ScalarValue + OffsetSizeTrait, + K: FromBytes + Ord + ArrowNativeType, + V: OffsetSizeTrait, { type Slice = DictionaryBuffer; fn new(col: &ColumnDescPtr) -> Self { let validate_utf8 = col.converted_type() == ConvertedType::UTF8; - let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) - { + let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) { (true, true) => ArrowType::LargeUtf8, (true, false) => ArrowType::LargeBinary, (false, true) => ArrowType::Utf8, @@ -274,8 +268,7 @@ where let len = num_values as usize; let mut buffer = OffsetBuffer::::default(); - let mut decoder = - ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8); + let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8); decoder.read(&mut buffer, usize::MAX)?; let array = buffer.into_array(None, self.value_type.clone()); @@ -339,8 +332,8 @@ where Some(keys) => { // Happy path - can just copy keys // Keys will be validated on conversion to arrow - let keys_slice = keys.spare_capacity_mut(range.start + len); - let len = decoder.get_batch(&mut keys_slice[range.start..])?; + let keys_slice = keys.get_output_slice(len); + let len = decoder.get_batch(keys_slice)?; *max_remaining_values -= len; Ok(len) } @@ -360,11 +353,7 @@ where let 
dict_offsets = dict_buffers[0].typed_data::(); let dict_values = dict_buffers[1].as_slice(); - values.extend_from_dictionary( - &keys[..len], - dict_offsets, - dict_values, - )?; + values.extend_from_dictionary(&keys[..len], dict_offsets, dict_values)?; *max_remaining_values -= len; Ok(len) } @@ -375,9 +364,7 @@ where fn skip_values(&mut self, num_values: usize) -> Result { match self.decoder.as_mut().expect("decoder set") { - MaybeDictionaryDecoder::Fallback(decoder) => { - decoder.skip::(num_values, None) - } + MaybeDictionaryDecoder::Fallback(decoder) => decoder.skip::(num_values, None), MaybeDictionaryDecoder::Dict { decoder, max_remaining_values, diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index b846997d36b8..849aa37c561f 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -18,7 +18,7 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be}; use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; -use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ValuesBuffer}; +use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer}; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{Encoding, Type}; @@ -162,11 +162,10 @@ impl ArrayReader for FixedLenByteArrayReader { fn consume_batch(&mut self) -> Result { let record_data = self.record_reader.consume_record_data(); - let array_data = - ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32)) - .len(self.record_reader.num_values()) - .add_buffer(record_data) - .null_bit_buffer(self.record_reader.consume_bitmap_buffer()); + let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32)) + .len(self.record_reader.num_values()) + .add_buffer(record_data) + .null_bit_buffer(self.record_reader.consume_bitmap_buffer()); let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() }); @@ -197,19 +196,13 @@ impl ArrayReader for FixedLenByteArrayReader { IntervalUnit::YearMonth => Arc::new( binary .iter() - .map(|o| { - o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap())) - }) + .map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))) .collect::(), ) as ArrayRef, IntervalUnit::DayTime => Arc::new( binary .iter() - .map(|o| { - o.map(|b| { - i64::from_le_bytes(b[4..12].try_into().unwrap()) - }) - }) + .map(|o| o.map(|b| i64::from_le_bytes(b[4..12].try_into().unwrap()))) .collect::(), ) as ArrayRef, IntervalUnit::MonthDayNano => { @@ -247,7 +240,7 @@ impl ArrayReader for FixedLenByteArrayReader { } struct FixedLenByteArrayBuffer { - buffer: ScalarBuffer, + buffer: Vec, /// The length of each element in bytes byte_length: usize, } @@ -263,14 +256,14 @@ impl BufferQueue for FixedLenByteArrayBuffer { type Slice = Self; fn consume(&mut self) -> Self::Output { - self.buffer.consume() + Buffer::from_vec(self.buffer.consume()) } - fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice { + fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice { self } - fn set_len(&mut self, len: usize) { + fn truncate_buffer(&mut self, len: usize) { assert_eq!(self.buffer.len(), len * self.byte_length); } } @@ -288,14 +281,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { (read_offset + values_read) * 
self.byte_length ); self.buffer - .resize((read_offset + levels_read) * self.byte_length); - - let slice = self.buffer.as_slice_mut(); + .resize((read_offset + levels_read) * self.byte_length, 0); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in - values_range.rev().zip(iter_set_bits_rev(valid_mask)) - { + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; @@ -305,7 +294,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { let value_pos_bytes = value_pos * self.byte_length; for i in 0..self.byte_length { - slice[level_pos_bytes + i] = slice[value_pos_bytes + i] + self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] } } } @@ -391,8 +380,7 @@ impl ColumnValueDecoder for ValueDecoder { let len = range.end - range.start; match self.decoder.as_mut().unwrap() { Decoder::Plain { offset, buf } => { - let to_read = - (len * self.byte_length).min(buf.len() - *offset) / self.byte_length; + let to_read = (len * self.byte_length).min(buf.len() - *offset) / self.byte_length; let end_offset = *offset + to_read * self.byte_length; out.buffer .extend_from_slice(&buf.as_ref()[*offset..end_offset]); @@ -485,15 +473,12 @@ mod tests { .build() .unwrap(); - let written = RecordBatch::try_from_iter([( - "list", - Arc::new(ListArray::from(data)) as ArrayRef, - )]) - .unwrap(); + let written = + RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)]) + .unwrap(); let mut buffer = Vec::with_capacity(1024); - let mut writer = - ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap(); + let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap(); writer.write(&written).unwrap(); writer.close().unwrap(); diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index 4ad6c97e2f66..bb32fb307fda 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -16,14 +16,13 @@ // under the License. use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; -use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::RecordReader; use crate::column::page::PageIterator; use crate::data_type::DataType; use crate::errors::Result; use crate::schema::types::ColumnDescPtr; use arrow_array::ArrayRef; -use arrow_buffer::Buffer; +use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_schema::DataType as ArrowType; use std::any::Any; use std::sync::Arc; @@ -33,7 +32,7 @@ use std::sync::Arc; pub struct NullArrayReader where T: DataType, - T::T: ScalarValue, + T::T: ArrowNativeType, { data_type: ArrowType, pages: Box, @@ -45,7 +44,7 @@ where impl NullArrayReader where T: DataType, - T::T: ScalarValue, + T::T: ArrowNativeType, { /// Construct null array reader. pub fn new(pages: Box, column_desc: ColumnDescPtr) -> Result { @@ -65,7 +64,7 @@ where impl ArrayReader for NullArrayReader where T: DataType, - T::T: ScalarValue, + T::T: ArrowNativeType, { fn as_any(&self) -> &dyn Any { self diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index f833eccecb4c..507b6215cacb 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -16,7 +16,6 @@ // under the License. 
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; -use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::Type as PhysicalType; @@ -26,22 +25,55 @@ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use arrow_array::Decimal256Array; use arrow_array::{ - builder::{BooleanBufferBuilder, TimestampNanosecondBufferBuilder}, - ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, - Int64Array, TimestampNanosecondArray, UInt32Array, UInt64Array, + builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array, + Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array, + UInt64Array, }; -use arrow_buffer::{i256, Buffer}; +use arrow_buffer::{i256, BooleanBuffer, Buffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::{DataType as ArrowType, TimeUnit}; use std::any::Any; use std::sync::Arc; +/// Provides conversion from `Vec` to `Buffer` +pub trait IntoBuffer { + fn into_buffer(self) -> Buffer; +} + +macro_rules! native_buffer { + ($($t:ty),*) => { + $(impl IntoBuffer for Vec<$t> { + fn into_buffer(self) -> Buffer { + Buffer::from_vec(self) + } + })* + }; +} +native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64); + +impl IntoBuffer for Vec { + fn into_buffer(self) -> Buffer { + BooleanBuffer::from_iter(self).into_inner() + } +} + +impl IntoBuffer for Vec { + fn into_buffer(self) -> Buffer { + let mut builder = TimestampNanosecondBufferBuilder::new(self.len()); + for v in self { + builder.append(v.to_nanos()) + } + builder.finish() + } +} + /// Primitive array readers are leaves of array reader tree. They accept page iterator /// and read them into primitive arrays. pub struct PrimitiveArrayReader where T: DataType, - T::T: ScalarValue, + T::T: Copy + Default, + Vec: IntoBuffer, { data_type: ArrowType, pages: Box, @@ -53,7 +85,8 @@ where impl PrimitiveArrayReader where T: DataType, - T::T: ScalarValue, + T::T: Copy + Default, + Vec: IntoBuffer, { /// Construct primitive array reader. pub fn new( @@ -85,7 +118,8 @@ where impl ArrayReader for PrimitiveArrayReader where T: DataType, - T::T: ScalarValue, + T::T: Copy + Default, + Vec: IntoBuffer, { fn as_any(&self) -> &dyn Any { self @@ -131,40 +165,14 @@ where _ => unreachable!("INT96 must be timestamp nanosecond"), }, PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => { - unreachable!( - "PrimitiveArrayReaders don't support complex physical types" - ); + unreachable!("PrimitiveArrayReaders don't support complex physical types"); } }; // Convert to arrays by using the Parquet physical type. 
// The physical types are then cast to Arrow types if necessary - let record_data = self.record_reader.consume_record_data(); - let record_data = match T::get_physical_type() { - PhysicalType::BOOLEAN => { - let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len()); - - for e in record_data.as_slice() { - boolean_buffer.append(*e > 0); - } - boolean_buffer.into() - } - PhysicalType::INT96 => { - // SAFETY - record_data is an aligned buffer of Int96 - let (prefix, slice, suffix) = - unsafe { record_data.as_slice().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - - let mut builder = TimestampNanosecondBufferBuilder::new(slice.len()); - for v in slice { - builder.append(v.to_nanos()) - } - - builder.finish() - } - _ => record_data, - }; + let record_data = self.record_reader.consume_record_data().into_buffer(); let array_data = ArrayDataBuilder::new(arrow_data_type) .len(self.record_reader.num_values()) @@ -188,9 +196,7 @@ where PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)), PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)), PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => { - unreachable!( - "PrimitiveArrayReaders don't support complex physical types" - ); + unreachable!("PrimitiveArrayReaders don't support complex physical types"); } }; @@ -409,12 +415,9 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); // Read first 50 values, which are all from the first column chunk let array = array_reader.next_batch(50).unwrap(); @@ -618,12 +621,9 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); let mut accu_len: usize = 0; @@ -697,12 +697,9 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); // read data from the reader // the data type is decimal(8,2) @@ -759,12 +756,9 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); // read data from the reader // the data type is decimal(18,4) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 28c7c3b00540..61933b24178e 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -22,7 +22,7 @@ use crate::data_type::{AsBytes, ByteArray, Int32Type}; use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder}; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; -use crate::file::properties::{WriterProperties, WriterVersion}; +use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use 
crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; @@ -379,6 +379,7 @@ impl DictEncoder { pub struct ByteArrayEncoder { fallback: FallbackEncoder, dict_encoder: Option, + statistics_enabled: EnabledStatistics, min_value: Option, max_value: Option, bloom_filter: Option, @@ -387,24 +388,6 @@ pub struct ByteArrayEncoder { impl ColumnValueEncoder for ByteArrayEncoder { type T = ByteArray; type Values = dyn Array; - - fn min_max( - &self, - values: &dyn Array, - value_indices: Option<&[usize]>, - ) -> Option<(Self::T, Self::T)> { - match value_indices { - Some(indices) => { - let iter = indices.iter().cloned(); - downcast_op!(values.data_type(), values, compute_min_max, iter) - } - None => { - let len = Array::len(values); - downcast_op!(values.data_type(), values, compute_min_max, 0..len) - } - } - } - fn flush_bloom_filter(&mut self) -> Option { self.bloom_filter.take() } @@ -424,12 +407,15 @@ impl ColumnValueEncoder for ByteArrayEncoder { .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp)) .transpose()?; + let statistics_enabled = props.statistics_enabled(descr.path()); + Ok(Self { fallback, + statistics_enabled, + bloom_filter, dict_encoder: dictionary, min_value: None, max_value: None, - bloom_filter, }) } @@ -498,13 +484,15 @@ where T: ArrayAccessor + Copy, T::Item: Copy + Ord + AsRef<[u8]>, { - if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) { - if encoder.min_value.as_ref().map_or(true, |m| m > &min) { - encoder.min_value = Some(min); - } + if encoder.statistics_enabled != EnabledStatistics::None { + if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) { + if encoder.min_value.as_ref().map_or(true, |m| m > &min) { + encoder.min_value = Some(min); + } - if encoder.max_value.as_ref().map_or(true, |m| m < &max) { - encoder.max_value = Some(max); + if encoder.max_value.as_ref().map_or(true, |m| m < &max) { + encoder.max_value = Some(max); + } } } diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs index 4208318122af..d0f63024edf0 100644 --- a/parquet/src/arrow/buffer/dictionary_buffer.rs +++ b/parquet/src/arrow/buffer/dictionary_buffer.rs @@ -16,7 +16,7 @@ // under the License. 
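The `ByteArrayEncoder` change above skips min/max tracking whenever statistics are disabled for the column. For context, a hedged sketch of the standard `WriterProperties` API (not part of this diff) that drives the `EnabledStatistics` check; the column name is hypothetical:

```rust
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use parquet::schema::types::ColumnPath;

fn main() {
    // Disable statistics collection; with the change above, the byte
    // array encoder will then skip computing min/max values entirely.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();

    // statistics_enabled() is what the encoder consults per column
    let col = ColumnPath::from("a");
    assert_eq!(props.statistics_enabled(&col), EnabledStatistics::None);
}
```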
use crate::arrow::buffer::offset_buffer::OffsetBuffer; -use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer}; +use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer}; use crate::column::reader::decoder::ValuesBufferSlice; use crate::errors::{ParquetError, Result}; use arrow_array::{make_array, Array, ArrayRef, OffsetSizeTrait}; @@ -27,17 +27,12 @@ use std::sync::Arc; /// An array of variable length byte arrays that are potentially dictionary encoded /// and can be converted into a corresponding [`ArrayRef`] -pub enum DictionaryBuffer { - Dict { - keys: ScalarBuffer, - values: ArrayRef, - }, - Values { - values: OffsetBuffer, - }, +pub enum DictionaryBuffer { + Dict { keys: Vec, values: ArrayRef }, + Values { values: OffsetBuffer }, } -impl Default for DictionaryBuffer { +impl Default for DictionaryBuffer { fn default() -> Self { Self::Values { values: Default::default(), @@ -45,9 +40,7 @@ impl Default for DictionaryBuffer { } } -impl - DictionaryBuffer -{ +impl DictionaryBuffer { #[allow(unused)] pub fn len(&self) -> usize { match self { @@ -63,7 +56,7 @@ impl /// # Panic /// /// Panics if the dictionary is too large for `K` - pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut ScalarBuffer> { + pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec> { assert!(K::from_usize(dictionary.len()).is_some()); match self { @@ -112,7 +105,7 @@ impl if values.is_empty() { // If dictionary is empty, zero pad offsets - spilled.offsets.resize(keys.len() + 1); + spilled.offsets.resize(keys.len() + 1, V::default()); } else { // Note: at this point null positions will have arbitrary dictionary keys // and this will hydrate them to the corresponding byte array. This is @@ -164,7 +157,7 @@ impl let builder = ArrayDataBuilder::new(data_type.clone()) .len(keys.len()) - .add_buffer(keys.into()) + .add_buffer(Buffer::from_vec(keys)) .add_child_data(values.into_data()) .null_bit_buffer(null_buffer); @@ -192,13 +185,13 @@ impl } } -impl ValuesBufferSlice for DictionaryBuffer { +impl ValuesBufferSlice for DictionaryBuffer { fn capacity(&self) -> usize { usize::MAX } } -impl ValuesBuffer for DictionaryBuffer { +impl ValuesBuffer for DictionaryBuffer { fn pad_nulls( &mut self, read_offset: usize, @@ -208,7 +201,7 @@ impl ValuesBuffer for Dictiona ) { match self { Self::Dict { keys, .. } => { - keys.resize(read_offset + levels_read); + keys.resize(read_offset + levels_read, K::default()); keys.pad_nulls(read_offset, values_read, levels_read, valid_mask) } Self::Values { values, .. } => { @@ -218,7 +211,7 @@ impl ValuesBuffer for Dictiona } } -impl BufferQueue for DictionaryBuffer { +impl BufferQueue for DictionaryBuffer { type Output = Self; type Slice = Self; @@ -234,14 +227,14 @@ impl BufferQueue for Dictionar } } - fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice { + fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice { self } - fn set_len(&mut self, len: usize) { + fn truncate_buffer(&mut self, len: usize) { match self { - Self::Dict { keys, .. } => keys.set_len(len), - Self::Values { values } => values.set_len(len), + Self::Dict { keys, .. 
} => keys.truncate_buffer(len), + Self::Values { values } => values.truncate_buffer(len), } } } diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 3f8f85494f02..459c94ed2803 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -16,7 +16,7 @@ // under the License. use crate::arrow::buffer::bit_util::iter_set_bits_rev; -use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer}; +use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer}; use crate::column::reader::decoder::ValuesBufferSlice; use crate::errors::{ParquetError, Result}; use arrow_array::{make_array, ArrayRef, OffsetSizeTrait}; @@ -27,23 +27,23 @@ use arrow_schema::DataType as ArrowType; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] #[derive(Debug)] -pub struct OffsetBuffer { - pub offsets: ScalarBuffer, - pub values: ScalarBuffer, +pub struct OffsetBuffer { + pub offsets: Vec, + pub values: Vec, } -impl Default for OffsetBuffer { +impl Default for OffsetBuffer { fn default() -> Self { - let mut offsets = ScalarBuffer::new(); - offsets.resize(1); + let mut offsets = Vec::new(); + offsets.resize(1, I::default()); Self { offsets, - values: ScalarBuffer::new(), + values: Vec::new(), } } } -impl OffsetBuffer { +impl OffsetBuffer { /// Returns the number of byte arrays in this buffer pub fn len(&self) -> usize { self.offsets.len() - 1 @@ -128,8 +128,8 @@ impl OffsetBuffer { pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { let array_data_builder = ArrayDataBuilder::new(data_type) .len(self.len()) - .add_buffer(self.offsets.into()) - .add_buffer(self.values.into()) + .add_buffer(Buffer::from_vec(self.offsets)) + .add_buffer(Buffer::from_vec(self.values)) .null_bit_buffer(null_buffer); let data = match cfg!(debug_assertions) { @@ -141,7 +141,7 @@ impl OffsetBuffer { } } -impl BufferQueue for OffsetBuffer { +impl BufferQueue for OffsetBuffer { type Output = Self; type Slice = Self; @@ -149,16 +149,16 @@ impl BufferQueue for OffsetBuffer { std::mem::take(self) } - fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice { + fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice { self } - fn set_len(&mut self, len: usize) { + fn truncate_buffer(&mut self, len: usize) { assert_eq!(self.offsets.len(), len + 1); } } -impl ValuesBuffer for OffsetBuffer { +impl ValuesBuffer for OffsetBuffer { fn pad_nulls( &mut self, read_offset: usize, @@ -167,9 +167,10 @@ impl ValuesBuffer for OffsetBuffer { valid_mask: &[u8], ) { assert_eq!(self.offsets.len(), read_offset + values_read + 1); - self.offsets.resize(read_offset + levels_read + 1); + self.offsets + .resize(read_offset + levels_read + 1, I::default()); - let offsets = self.offsets.as_slice_mut(); + let offsets = &mut self.offsets; let mut last_pos = read_offset + levels_read + 1; let mut last_start_offset = I::from_usize(self.values.len()).unwrap(); @@ -207,7 +208,7 @@ impl ValuesBuffer for OffsetBuffer { } } -impl ValuesBufferSlice for OffsetBuffer { +impl ValuesBufferSlice for OffsetBuffer { fn capacity(&self) -> usize { usize::MAX } diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 35a322e6c723..3914710ff7b9 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -15,11 +15,7 @@ // specific language governing 
permissions and limitations // under the License. -use std::marker::PhantomData; - use crate::arrow::buffer::bit_util::iter_set_bits_rev; -use crate::data_type::Int96; -use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; /// A buffer that supports writing new data to the end, and removing data from the front /// @@ -37,12 +33,12 @@ pub trait BufferQueue: Sized { /// to append data to the end of this [`BufferQueue`] /// /// NB: writes to the returned slice will not update the length of [`BufferQueue`] - /// instead a subsequent call should be made to [`BufferQueue::set_len`] - fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice; + /// instead a subsequent call should be made to [`BufferQueue::truncate_buffer`] + fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice; /// Sets the length of the [`BufferQueue`]. /// - /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`] + /// Intended to be used in combination with [`BufferQueue::get_output_slice`] /// /// # Panics /// @@ -57,132 +53,27 @@ pub trait BufferQueue: Sized { /// track how much of this slice is actually written to by the caller. This is still /// safe as the slice is default-initialized. /// - fn set_len(&mut self, len: usize); -} - -/// A marker trait for [scalar] types -/// -/// This means that a `[Self::default()]` of length `len` can be safely created from a -/// zero-initialized `[u8]` with length `len * std::mem::size_of::()` and -/// alignment of `std::mem::size_of::()` -/// -/// [scalar]: https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types -/// -pub trait ScalarValue: Copy {} -impl ScalarValue for bool {} -impl ScalarValue for u8 {} -impl ScalarValue for i8 {} -impl ScalarValue for u16 {} -impl ScalarValue for i16 {} -impl ScalarValue for u32 {} -impl ScalarValue for i32 {} -impl ScalarValue for u64 {} -impl ScalarValue for i64 {} -impl ScalarValue for f32 {} -impl ScalarValue for f64 {} -impl ScalarValue for Int96 {} - -/// A typed buffer similar to [`Vec`] but using [`MutableBuffer`] for storage -#[derive(Debug)] -pub struct ScalarBuffer { - buffer: MutableBuffer, - - /// Length in elements of size T - len: usize, - - /// Placeholder to allow `T` as an invariant generic parameter - /// without making it !Send - _phantom: PhantomData T>, -} - -impl Default for ScalarBuffer { - fn default() -> Self { - Self::new() - } -} - -impl ScalarBuffer { - pub fn new() -> Self { - Self { - buffer: MutableBuffer::new(0), - len: 0, - _phantom: Default::default(), - } - } - - pub fn len(&self) -> usize { - self.len - } - - pub fn is_empty(&self) -> bool { - self.len == 0 - } - - pub fn reserve(&mut self, additional: usize) { - self.buffer.reserve(additional * std::mem::size_of::()); - } - - pub fn resize(&mut self, len: usize) { - self.buffer.resize(len * std::mem::size_of::(), 0); - self.len = len; - } - - #[inline] - pub fn as_slice(&self) -> &[T] { - let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - buf - } - - #[inline] - pub fn as_slice_mut(&mut self) -> &mut [T] { - let (prefix, buf, suffix) = unsafe { self.buffer.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - buf - } + fn truncate_buffer(&mut self, len: usize); } -impl ScalarBuffer { - pub fn push(&mut self, v: T) { - self.buffer.push(v); - self.len += 1; - } - - pub fn extend_from_slice(&mut self, v: &[T]) { - self.buffer.extend_from_slice(v); - self.len += v.len(); - } -} - 
-impl From> for Buffer { - fn from(t: ScalarBuffer) -> Self { - t.buffer.into() - } -} - -impl BufferQueue for ScalarBuffer { - type Output = Buffer; +impl BufferQueue for Vec { + type Output = Self; type Slice = [T]; fn consume(&mut self) -> Self::Output { - std::mem::take(self).into() + std::mem::take(self) } - fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice { - self.buffer - .resize((self.len + batch_size) * std::mem::size_of::(), 0); - - let range = self.len..self.len + batch_size; - &mut self.as_slice_mut()[range] + fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice { + let len = self.len(); + self.resize(len + batch_size, T::default()); + &mut self[len..] } - fn set_len(&mut self, len: usize) { - self.len = len; - - let new_bytes = self.len * std::mem::size_of::(); - assert!(new_bytes <= self.buffer.len()); - self.buffer.resize(new_bytes, 0); + fn truncate_buffer(&mut self, len: usize) { + assert!(len <= self.len()); + self.truncate(len) } } @@ -212,7 +103,7 @@ pub trait ValuesBuffer: BufferQueue { ); } -impl ValuesBuffer for ScalarBuffer { +impl ValuesBuffer for Vec { fn pad_nulls( &mut self, read_offset: usize, @@ -220,8 +111,7 @@ impl ValuesBuffer for ScalarBuffer { levels_read: usize, valid_mask: &[u8], ) { - let slice = self.as_slice_mut(); - assert!(slice.len() >= read_offset + levels_read); + assert!(self.len() >= read_offset + levels_read); let values_range = read_offset..read_offset + values_read; for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { @@ -229,7 +119,7 @@ impl ValuesBuffer for ScalarBuffer { if level_pos <= value_pos { break; } - slice[level_pos] = slice[value_pos]; + self[level_pos] = self[value_pos]; } } } diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 9009c596c4bf..fa041f5fdb0a 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -30,12 +30,10 @@ use crate::column::reader::decoder::{ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use super::buffer::ScalarBuffer; - enum BufferInner { /// Compute levels and null mask Full { - levels: ScalarBuffer, + levels: Vec, nulls: BooleanBufferBuilder, max_level: i16, }, @@ -77,7 +75,7 @@ impl DefinitionLevelBuffer { } } false => BufferInner::Full { - levels: ScalarBuffer::new(), + levels: Vec::new(), nulls: BooleanBufferBuilder::new(0), max_level: desc.max_def_level(), }, @@ -89,7 +87,7 @@ impl DefinitionLevelBuffer { /// Returns the built level data pub fn consume_levels(&mut self) -> Option { match &mut self.inner { - BufferInner::Full { levels, .. } => Some(std::mem::take(levels).into()), + BufferInner::Full { levels, .. } => Some(Buffer::from_vec(std::mem::take(levels))), BufferInner::Mask { .. 
} => None, } } @@ -174,9 +172,9 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { assert_eq!(self.max_level, *max_level); assert_eq!(range.start + writer.len, nulls.len()); - levels.resize(range.end + writer.len); + levels.resize(range.end + writer.len, 0); - let slice = &mut levels.as_slice_mut()[writer.len..]; + let slice = &mut levels[writer.len..]; let levels_read = decoder.read_def_levels(slice, range.clone())?; nulls.reserve(levels_read); diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index ea982341994e..49c69c87e302 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -18,7 +18,7 @@ use arrow_buffer::Buffer; use crate::arrow::record_reader::{ - buffer::{BufferQueue, ScalarBuffer, ValuesBuffer}, + buffer::{BufferQueue, ValuesBuffer}, definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder}, }; use crate::column::reader::decoder::RepetitionLevelDecoderImpl; @@ -37,8 +37,7 @@ pub(crate) mod buffer; mod definition_levels; /// A `RecordReader` is a stateful column reader that delimits semantic records. -pub type RecordReader = - GenericRecordReader::T>, ColumnValueDecoderImpl>; +pub type RecordReader = GenericRecordReader::T>, ColumnValueDecoderImpl>; pub(crate) type ColumnReader = GenericColumnReader; @@ -53,7 +52,7 @@ pub struct GenericRecordReader { values: V, def_levels: Option, - rep_levels: Option>, + rep_levels: Option>, column_reader: Option>, /// Number of buffered levels / null-padded values num_values: usize, @@ -81,7 +80,7 @@ where let def_levels = (desc.max_def_level() > 0) .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc))); - let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new); + let rep_levels = (desc.max_rep_level() > 0).then(Vec::new); Self { values: records, @@ -174,7 +173,9 @@ where /// Return repetition level data. /// The side effect is similar to `consume_def_levels`. pub fn consume_rep_levels(&mut self) -> Option { - self.rep_levels.as_mut().map(|x| x.consume()) + self.rep_levels + .as_mut() + .map(|x| Buffer::from_vec(x.consume())) } /// Returns currently stored buffer data. 
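Before the hunk below switches the read path over to the renamed methods, a minimal sketch of the `BufferQueue` contract using the `Vec<T>` implementation added earlier: `get_output_slice` exposes default-initialized spare capacity, and `truncate_buffer` records how much of it was actually written. The trait is crate-internal to `parquet`, so this is illustrative only:

```rust
// Assumes: use crate::arrow::record_reader::buffer::BufferQueue;
fn demo() {
    let mut values: Vec<i32> = vec![1, 2];

    // Expose default-initialized spare capacity for a decoder to fill
    let out = values.get_output_slice(4); // 4 zeroed slots
    out[..3].copy_from_slice(&[10, 20, 30]); // only 3 values were produced

    // Record how many values are actually valid: 2 existing + 3 new
    values.truncate_buffer(5);
    assert_eq!(values, vec![1, 2, 10, 20, 30]);
}
```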
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index ea982341994e..49c69c87e302 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -18,7 +18,7 @@
 use arrow_buffer::Buffer;
 
 use crate::arrow::record_reader::{
-    buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
+    buffer::{BufferQueue, ValuesBuffer},
     definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
 };
 use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
@@ -37,8 +37,7 @@ pub(crate) mod buffer;
 mod definition_levels;
 
 /// A `RecordReader` is a stateful column reader that delimits semantic records.
-pub type RecordReader<T> =
-    GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
+pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
 
 pub(crate) type ColumnReader<CV> =
     GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
@@ -53,7 +52,7 @@ pub struct GenericRecordReader<V, CV> {
 
     values: V,
     def_levels: Option<DefinitionLevelBuffer>,
-    rep_levels: Option<ScalarBuffer<i16>>,
+    rep_levels: Option<Vec<i16>>,
     column_reader: Option<ColumnReader<CV>>,
     /// Number of buffered levels / null-padded values
     num_values: usize,
@@ -81,7 +80,7 @@ where
         let def_levels = (desc.max_def_level() > 0)
             .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
 
-        let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
+        let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
 
         Self {
             values: records,
@@ -174,7 +173,9 @@ where
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
-        self.rep_levels.as_mut().map(|x| x.consume())
+        self.rep_levels
+            .as_mut()
+            .map(|x| Buffer::from_vec(x.consume()))
     }
 
     /// Returns currently stored buffer data.
@@ -209,9 +210,9 @@ where
         let rep_levels = self
             .rep_levels
             .as_mut()
-            .map(|levels| levels.spare_capacity_mut(batch_size));
+            .map(|levels| levels.get_output_slice(batch_size));
         let def_levels = self.def_levels.as_mut();
-        let values = self.values.spare_capacity_mut(batch_size);
+        let values = self.values.get_output_slice(batch_size);
 
         let (records_read, values_read, levels_read) = self
             .column_reader
@@ -234,9 +235,9 @@ where
         self.num_records += records_read;
         self.num_values += levels_read;
 
-        self.values.set_len(self.num_values);
+        self.values.truncate_buffer(self.num_values);
         if let Some(ref mut buf) = self.rep_levels {
-            buf.set_len(self.num_values)
+            buf.truncate_buffer(self.num_values)
         };
         if let Some(ref mut buf) = self.def_levels {
             buf.set_len(self.num_values)
@@ -257,7 +258,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::buffer::Buffer;
-    use arrow_array::builder::{Int16BufferBuilder, Int32BufferBuilder};
+    use arrow_array::builder::Int16BufferBuilder;
 
     use crate::basic::Encoding;
     use crate::data_type::Int32Type;
@@ -334,10 +335,7 @@ mod tests {
             assert_eq!(7, record_reader.num_values());
         }
 
-        let mut bb = Int32BufferBuilder::new(7);
-        bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
-        let expected_buffer = bb.finish();
-        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(record_reader.consume_record_data(), &[4, 7, 6, 3, 2, 8, 9]);
         assert_eq!(None, record_reader.consume_def_levels());
         assert_eq!(None, record_reader.consume_bitmap());
     }
@@ -434,13 +432,12 @@ mod tests {
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
 
         let expected = &[0, 7, 0, 6, 3, 0, 8];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
@@ -544,12 +541,11 @@ mod tests {
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
 
         let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
@@ -713,10 +709,7 @@ mod tests {
             assert_eq!(0, record_reader.read_records(10).unwrap());
         }
 
-        let mut bb = Int32BufferBuilder::new(3);
-        bb.append_slice(&[6, 3, 2]);
-        let expected_buffer = bb.finish();
-        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(record_reader.consume_record_data(), &[6, 3, 2]);
         assert_eq!(None, record_reader.consume_def_levels());
         assert_eq!(None, record_reader.consume_bitmap());
     }
@@ -814,13 +807,12 @@ mod tests {
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
 
         let expected = &[0, 6, 3];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
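The test rewrites above lean on `Vec<T>`'s standard comparison impls: since `consume_record_data` now returns a `Vec`, it compares directly against an array literal and zips by reference, so the `Int32BufferBuilder` round-trip disappears. A hypothetical stand-in for the reader shows both patterns:

```rust
// Hypothetical stand-in for the record reader used in the tests above;
// the real method now returns Vec<T::T> instead of an untyped Buffer.
fn consume_record_data() -> Vec<i32> {
    vec![4, 7, 6, 3, 2, 8, 9]
}

fn main() {
    // Vec<T> implements PartialEq against array references, so no
    // builder/Buffer round-trip is needed for the assertion.
    assert_eq!(consume_record_data(), &[4, 7, 6, 3, 2, 8, 9]);

    // Zipping by reference mirrors the `.zip(&actual)` change above.
    let actual = consume_record_data();
    for (value, expected) in actual.iter().zip(&[4, 7, 6, 3, 2, 8, 9]) {
        assert_eq!(value, expected);
    }
}
```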
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 0d5144f61c26..8624f859f4b0 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -76,15 +76,6 @@ pub trait ColumnValueEncoder {
     /// The values encoded by this encoder
     type Values: ColumnValues + ?Sized;
 
-    /// Returns the min and max values in this collection, skipping any NaN values
-    ///
-    /// Returns `None` if no values found
-    fn min_max(
-        &self,
-        values: &Self::Values,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)>;
-
     /// Create a new [`ColumnValueEncoder`]
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
     where
@@ -136,8 +127,15 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
 }
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
+    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
+        match value_indices {
+            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
+            None => get_min_max(&self.descr, values.iter()),
+        }
+    }
+
     fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
-        if self.statistics_enabled == EnabledStatistics::Page
+        if self.statistics_enabled != EnabledStatistics::None
             // INTERVAL has undefined sort order, so don't write min/max stats for it
             && self.descr.converted_type() != ConvertedType::INTERVAL
         {
@@ -166,17 +164,6 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
 
     type Values = [T::T];
 
-    fn min_max(
-        &self,
-        values: &Self::Values,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)> {
-        match value_indices {
-            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
-            None => get_min_max(&self.descr, values.iter()),
-        }
-    }
-
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
         self.bloom_filter.take()
     }
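Moving `min_max` from the `ColumnValueEncoder` trait to an inherent helper does not change its shape. A generic sketch of that helper, with a plain `Ord` fold standing in for the real `get_min_max` (which additionally honors the column's sort order and skips NaNs):

```rust
// Sketch only: get_min_max in the real code is descriptor-aware; a plain
// Ord fold stands in for it here.
fn min_max<T: Copy + Ord>(values: &[T], value_indices: Option<&[usize]>) -> Option<(T, T)> {
    // Either visit every value, or only the indexed subset, as in the diff.
    let mut iter: Box<dyn Iterator<Item = T> + '_> = match value_indices {
        Some(indices) => Box::new(indices.iter().map(|&i| values[i])),
        None => Box::new(values.iter().copied()),
    };
    let first = iter.next()?; // None when there are no values
    Some(iter.fold((first, first), |(min, max), v| (min.min(v), max.max(v))))
}

fn main() {
    assert_eq!(min_max(&[3, 1, 2], None), Some((1, 3)));
    assert_eq!(min_max(&[3, 1, 2], Some(&[0, 2])), Some((2, 3)));
    assert_eq!(min_max::<i32>(&[], None), None);
}
```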
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 531af4bd461e..9f476595fb7e 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -329,28 +329,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             None => values.len(),
         };
 
-        // If only computing chunk-level statistics compute them here, page-level statistics
-        // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
-        // [`Self::add_data_page`]
-        if self.statistics_enabled == EnabledStatistics::Chunk
-            // INTERVAL has undefined sort order, so don't write min/max stats for it
-            && self.descr.converted_type() != ConvertedType::INTERVAL
-        {
-            match (min, max) {
-                (Some(min), Some(max)) => {
-                    update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
-                    update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
-                }
-                (None, Some(_)) | (Some(_), None) => {
-                    panic!("min/max should be both set or both None")
-                }
-                (None, None) => {
-                    if let Some((min, max)) = self.encoder.min_max(values, value_indices) {
-                        update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-                        update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-                    }
-                }
-            };
+        if let Some(min) = min {
+            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
+        }
+        if let Some(max) = max {
+            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
         }
 
         // We can only set the distinct count if there are no other writes
@@ -764,22 +747,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
 
-        let page_statistics = if let (Some(min), Some(max)) =
-            (values_data.min_value, values_data.max_value)
-        {
-            // Update chunk level statistics
-            update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-            update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-
-            (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
-                Some(min),
-                Some(max),
-                None,
-                self.page_metrics.num_page_nulls,
-                false,
-            ))
-        } else {
-            None
+        let page_statistics = match (values_data.min_value, values_data.max_value) {
+            (Some(min), Some(max)) => {
+                // Update chunk level statistics
+                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
+
+                (self.statistics_enabled == EnabledStatistics::Page).then_some(
+                    ValueStatistics::new(
+                        Some(min),
+                        Some(max),
+                        None,
+                        self.page_metrics.num_page_nulls,
+                        false,
+                    ),
+                )
+            }
+            _ => None,
        };
 
         // update column and offset index
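Taken together, the two hunks above mean `write_slice` computes page-local min/max whenever statistics are enabled at any level, and flushing a page always folds those values into the chunk-level metrics, emitting per-page statistics only at `Page` granularity. A simplified model of that flow (all types are stand-ins, not the parquet crate's):

```rust
/// Stand-in for parquet's EnabledStatistics, for illustration only.
#[derive(PartialEq)]
#[allow(dead_code)]
enum EnabledStatistics {
    None,
    Chunk,
    Page,
}

/// Models add_data_page above: always update chunk-level min/max when the
/// page produced values; return page statistics only at Page granularity.
fn flush_page_stats(
    stats_enabled: EnabledStatistics,
    min: Option<i64>,
    max: Option<i64>,
    chunk_min: &mut Option<i64>,
    chunk_max: &mut Option<i64>,
) -> Option<(i64, i64)> {
    match (min, max) {
        (Some(min), Some(max)) => {
            *chunk_min = Some(chunk_min.map_or(min, |m| m.min(min)));
            *chunk_max = Some(chunk_max.map_or(max, |m| m.max(max)));
            (stats_enabled == EnabledStatistics::Page).then_some((min, max))
        }
        // A mixed Some/None pair cannot occur: both come from the same page.
        _ => None,
    }
}

fn main() {
    let (mut cmin, mut cmax) = (None, None);

    // Chunk-level metrics update even when page statistics are not emitted.
    let page = flush_page_stats(EnabledStatistics::Chunk, Some(1), Some(9), &mut cmin, &mut cmax);
    assert_eq!(page, None);
    assert_eq!((cmin, cmax), (Some(1), Some(9)));

    // At Page granularity the per-page statistics are returned as well.
    let page = flush_page_stats(EnabledStatistics::Page, Some(0), Some(5), &mut cmin, &mut cmax);
    assert_eq!(page, Some((0, 5)));
    assert_eq!((cmin, cmax), (Some(0), Some(9)));
}
```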