diff --git a/Cargo.lock b/Cargo.lock
index 0746f0f2814e7..146f4e3b4e07a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2630,6 +2630,7 @@ name = "datafusion-spark"
 version = "49.0.1"
 dependencies = [
  "arrow",
+ "bigdecimal",
  "chrono",
  "crc32fast",
  "criterion",
@@ -2642,6 +2643,7 @@ dependencies = [
  "log",
  "rand 0.9.2",
  "sha1",
+ "twox-hash",
  "url",
  "xxhash-rust",
 ]
@@ -6840,6 +6842,9 @@ name = "twox-hash"
 version = "2.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b907da542cbced5261bd3256de1b3a1bf340a3d37f93425a07362a1d687de56"
+dependencies = [
+ "rand 0.9.2",
+]
 
 [[package]]
 name = "typed-arena"
diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml
index 4ed82a453d71f..65606e4632152 100644
--- a/datafusion/spark/Cargo.toml
+++ b/datafusion/spark/Cargo.toml
@@ -37,6 +37,7 @@ name = "datafusion_spark"
 
 [dependencies]
 arrow = { workspace = true }
+bigdecimal = { workspace = true }
 chrono = { workspace = true }
 crc32fast = "1.4"
 datafusion-catalog = { workspace = true }
@@ -47,6 +48,10 @@ datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
 datafusion-macros = { workspace = true }
 log = { workspace = true }
 sha1 = "0.10"
+# XXH3 and xxHash64 are different algorithms: the existing xxhash-rust
+# dependency only enables XXH3, whose output does not match Spark's xxhash64.
+# twox-hash is added to provide the classic 64-bit xxHash64.
+twox-hash = "2.1.1"
 url = { workspace = true }
 xxhash-rust = { version = "0.8", features = ["xxh3"] }
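
The distinction the comment above draws can be checked directly: Spark's xxhash64 is the classic 64-bit XXH64 seeded with 42, and XXH3 with the same seed produces a different value for the same input. A minimal sketch (the expected constant is the value `xxhash64('Spark')` returns in xxhash64.slt below):

    use twox_hash::XxHash64;
    use xxhash_rust::xxh3::xxh3_64_with_seed;

    fn main() {
        let data = b"Spark";
        // twox-hash's XXH64 reproduces Spark's xxhash64('Spark'):
        assert_eq!(XxHash64::oneshot(42, data) as i64, -4294468057691064905);
        // XXH3 with the same seed does not:
        assert_ne!(xxh3_64_with_seed(data, 42) as i64, -4294468057691064905);
    }
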
diff --git a/datafusion/spark/src/function/hash/mod.rs b/datafusion/spark/src/function/hash/mod.rs
index 5860596ac70a3..9b3af7e335d1f 100644
--- a/datafusion/spark/src/function/hash/mod.rs
+++ b/datafusion/spark/src/function/hash/mod.rs
@@ -16,8 +16,11 @@
 // under the License.
 
 pub mod crc32;
+pub mod murmur3;
 pub mod sha1;
 pub mod sha2;
+pub mod utils;
+pub mod xxhash64;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
@@ -26,16 +29,20 @@ use std::sync::Arc;
 make_udf_function!(crc32::SparkCrc32, crc32);
 make_udf_function!(sha1::SparkSha1, sha1);
 make_udf_function!(sha2::SparkSha2, sha2);
+make_udf_function!(xxhash64::SparkXxHash64, xxhash64);
+make_udf_function!(murmur3::SparkMurmur3Hash, hash);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
 
     export_functions!(
         (crc32, "crc32(expr) - Returns a cyclic redundancy check value of the expr as a bigint.", arg1),
         (sha1, "sha1(expr) - Returns a SHA-1 hash value of the expr as a hex string.", arg1),
-        (sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2)
+        (sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2),
+        (xxhash64, "xxhash64(*expr) - Calculates the hash code of given columns using the 64-bit variant of the xxHash algorithm, and returns the result as a long column. The hash computation uses an initial seed of 42.", args),
+        (hash, "hash(*expr) - Calculates the hash code of given columns using the 32-bit variant of the MurmurHash3 algorithm, and returns the result as an integer column. The hash computation uses an initial seed of 42.", args)
     );
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![crc32(), sha1(), sha2()]
+    vec![crc32(), sha1(), sha2(), xxhash64(), hash()]
 }
diff --git a/datafusion/spark/src/function/hash/murmur3.rs b/datafusion/spark/src/function/hash/murmur3.rs
new file mode 100644
index 0000000000000..7a9fd80d273f6
--- /dev/null
+++ b/datafusion/spark/src/function/hash/murmur3.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, ArrowNativeTypeOp, PrimitiveArray};
+use arrow::datatypes::{DataType, Int32Type};
+use datafusion_common::Result;
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::function::hash::utils::SparkHasher;
+
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#hash>
+#[derive(Debug, Hash, Eq, PartialEq)]
+pub struct SparkMurmur3Hash {
+    signature: Signature,
+    seed: i64,
+}
+
+impl Default for SparkMurmur3Hash {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkMurmur3Hash {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+            seed: 42,
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkMurmur3Hash {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "hash"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let func = |arr: &[ArrayRef]| {
+            let mut result = vec![self.seed as i32; arr[0].len()];
+            Murmur3Hasher::hash_arrays(arr, &mut result)?;
+            Ok(Arc::new(PrimitiveArray::<Int32Type>::from(result)) as ArrayRef)
+        };
+        make_scalar_function(func, vec![])(&args.args)
+    }
+}
+
+// The following code is copied from
+//
+#[inline]
+pub fn murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
+    #[inline]
+    fn mix_k1(mut k1: i32) -> i32 {
+        k1 = k1.mul_wrapping(0xcc9e2d51u32 as i32);
+        k1 = k1.rotate_left(15);
+        k1 = k1.mul_wrapping(0x1b873593u32 as i32);
+        k1
+    }
+
+    #[inline]
+    fn mix_h1(mut h1: i32, k1: i32) -> i32 {
+        h1 ^= k1;
+        h1 = h1.rotate_left(13);
+        h1 = h1.mul_wrapping(5).add_wrapping(0xe6546b64u32 as i32);
+        h1
+    }
+
+    #[inline]
+    fn fmix(mut h1: i32, len: i32) -> i32 {
+        h1 ^= len;
+        h1 ^= (h1 as u32 >> 16) as i32;
+        h1 = h1.mul_wrapping(0x85ebca6bu32 as i32);
+        h1 ^= (h1 as u32 >> 13) as i32;
+        h1 = h1.mul_wrapping(0xc2b2ae35u32 as i32);
+        h1 ^= (h1 as u32 >> 16) as i32;
+        h1
+    }
+
+    #[inline]
+    unsafe fn hash_bytes_by_int(data: &[u8], seed: u32) -> i32 {
+        // safety: data length must be aligned to 4 bytes
+        let mut h1 = seed as i32;
+        for i in (0..data.len()).step_by(4) {
+            let ints = data.as_ptr().add(i) as *const i32;
+            let half_word = ints.read_unaligned();
+            h1 = mix_h1(h1, mix_k1(half_word));
+        }
+        h1
+    }
+    let data = data.as_ref();
+    let len = data.len();
+    let len_aligned = len - len % 4;
+
+    // safety:
+    // avoid bounds checking in performance-critical code.
+    // all operations are guaranteed to be safe.
+    // data is &[u8], so we do not need to check for proper alignment
+    unsafe {
+        let mut h1 = if len_aligned > 0 {
+            hash_bytes_by_int(&data[0..len_aligned], seed)
+        } else {
+            seed as i32
+        };
+
+        for i in len_aligned..len {
+            let half_word = *data.get_unchecked(i) as i8 as i32;
+            h1 = mix_h1(h1, mix_k1(half_word));
+        }
+        fmix(h1, len as i32) as u32
+    }
+}
+
+struct Murmur3Hasher;
+
+impl SparkHasher<i32> for Murmur3Hasher {
+    fn oneshot(seed: i32, data: &[u8]) -> i32 {
+        murmur3_hash(data, seed as u32) as i32
+    }
+}
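
The constants asserted in hash.slt further down can be reproduced directly against `murmur3_hash`, since a single string column is hashed as exactly one oneshot call with seed 42. A minimal test sketch (not part of the patch; expected values taken from hash.slt):

    #[cfg(test)]
    mod tests {
        use super::murmur3_hash;

        #[test]
        fn matches_spark_hash_slt_values() {
            // SELECT hash('Spark') ---- 228093765
            assert_eq!(murmur3_hash(b"Spark", 42) as i32, 228093765);
            // SELECT hash('') ---- 142593372
            assert_eq!(murmur3_hash(b"", 42) as i32, 142593372);
        }
    }
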
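
The file that follows centers on a small `SparkHasher<H>` trait: an implementor supplies only `oneshot` (hash a byte slice with a seed), and the trait's provided methods handle per-type byte encoding, null skipping, and seed chaining for every Arrow type. To illustrate the contract, a hypothetical third implementor (not part of this patch) could plug into the same dispatch; a sketch reusing the crate's existing crc32fast dependency:

    // Hypothetical example only: a CRC32-based hasher behind the same trait.
    struct Crc32Hasher;

    impl SparkHasher<i32> for Crc32Hasher {
        fn oneshot(seed: i32, data: &[u8]) -> i32 {
            // Fold the running seed in as the initial CRC state.
            let mut hasher = crc32fast::Hasher::new_with_initial(seed as u32);
            hasher.update(data);
            hasher.finalize() as i32
        }
    }
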
+
+use arrow::array::{
+    Array, ArrayAccessor, ArrayIter, ArrayRef, ArrowPrimitiveType, BinaryViewArray,
+    BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array,
+    Decimal32Array, Decimal64Array, DictionaryArray, DurationMicrosecondArray,
+    DurationMillisecondArray, DurationNanosecondArray, DurationSecondArray,
+    FixedSizeBinaryArray, FixedSizeListArray, Float16Array, Float32Array, Float64Array,
+    GenericBinaryArray, Int16Array, Int32Array, Int64Array, Int8Array,
+    IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray,
+    LargeListArray, LargeListViewArray, LargeStringArray, ListArray, ListViewArray,
+    MapArray, PrimitiveArray, StringArray, StringViewArray, StructArray,
+    Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
+    Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
+    UInt64Array, UInt8Array, UnionArray,
+};
+use arrow::datatypes::{
+    i256, ArrowDictionaryKeyType, DataType, Int16Type, Int32Type, Int64Type, Int8Type,
+    IntervalDayTime, IntervalMonthDayNano, IntervalUnit, TimeUnit, UInt16Type,
+    UInt32Type, UInt64Type, UInt8Type,
+};
+use bigdecimal::num_traits::{Float, ToBytes};
+use datafusion_common::{exec_err, DataFusionError, Result};
+use std::sync::Arc;
+
+fn hash_impl<H: Copy, T>(
+    arr: ArrayIter<impl ArrayAccessor<Item = T>>,
+    seed: &mut [H],
+    func: impl Fn(H, T) -> H,
+) -> Result<()> {
+    for (hash, elem) in seed.iter_mut().zip(arr) {
+        if let Some(elem) = elem {
+            *hash = func(*hash, elem);
+        }
+    }
+    Ok(())
+}
+
+fn try_hash_impl<H: Copy, T>(
+    arr: ArrayIter<impl ArrayAccessor<Item = T>>,
+    seed: &mut [H],
+    func: impl Fn(H, T) -> Result<H>,
+) -> Result<()> {
+    let len = arr.len();
+    if len != seed.len() {
+        return exec_err!("Array length mismatch: {} != {}", len, seed.len());
+    }
+    for (hash, elem) in seed.iter_mut().zip(arr) {
+        if let Some(elem) = elem {
+            *hash = func(*hash, elem)?;
+        }
+    }
+    Ok(())
+}
+
+pub trait SparkHasher<H: Copy> {
+    fn oneshot(seed: H, data: &[u8]) -> H;
+
+    fn hash_boolean(arr: &BooleanArray, seed: &mut [H]) -> Result<()> {
+        // Spark hashes booleans as the 32-bit integers 1 and 0.
+        let true_bytes = 1_i32.to_le_bytes();
+        let false_bytes = 0_i32.to_le_bytes();
+        hash_impl(arr.iter(), seed, |seed, v| {
+            if v {
+                Self::oneshot(seed, &true_bytes)
+            } else {
+                Self::oneshot(seed, &false_bytes)
+            }
+        })?;
+        Ok(())
+    }
+
+    fn hash_primitive<T, U>(arr: &PrimitiveArray<T>, seed: &mut [H]) -> Result<()>
+    where
+        T: ArrowPrimitiveType,
+        U: ToBytes + From<T::Native>,
+    {
+        hash_impl(arr.iter(), seed, |seed, v| {
+            let bytes = U::from(v).to_le_bytes();
+            Self::oneshot(seed, bytes.as_ref())
+        })?;
+        Ok(())
+    }
+
+    fn try_hash_primitive<T, U>(arr: &PrimitiveArray<T>, seed: &mut [H]) -> Result<()>
+    where
+        T: ArrowPrimitiveType,
+        U: ToBytes + TryFrom<T::Native>,
+        <U as TryFrom<T::Native>>::Error: std::fmt::Display,
+    {
+        try_hash_impl(arr.iter(), seed, |seed, v| {
+            let uv =
+                U::try_from(v).map_err(|e| DataFusionError::Execution(e.to_string()))?;
+            let bytes = uv.to_le_bytes();
+            Ok(Self::oneshot(seed, bytes.as_ref()))
+        })?;
+        Ok(())
+    }
+
+    fn hash_primitive_float<T, U>(arr: &PrimitiveArray<T>, seed: &mut [H]) -> Result<()>
+    where
+        T: ArrowPrimitiveType,
+        T::Native: ToBytes + Float,
+        U: ToBytes + Default,
+    {
+        let neg_zero = T::Native::neg_zero();
+        let neg_zero_bytes = U::to_le_bytes(&U::default());
+
+        hash_impl(arr.iter(), seed, |seed, v| {
+            if v == neg_zero {
+                // -0.0 must hash like +0.0, whose little-endian bytes are all zero
+                Self::oneshot(seed, neg_zero_bytes.as_ref())
+            } else {
+                let bytes = v.to_le_bytes();
+                Self::oneshot(seed, bytes.as_ref())
+            }
+        })?;
+        Ok(())
+    }
+
+    fn hash_bytes<T>(iter: ArrayIter<T>, seed: &mut [H]) -> Result<()>
+    where
+        T: ArrayAccessor,
+        T::Item: AsRef<[u8]>,
+    {
+        hash_impl(iter, seed, |seed, v| Self::oneshot(seed, v.as_ref()))?;
+        Ok(())
+    }
+
+    fn hash_list<T>(iter: ArrayIter<T>, seed: &mut [H]) -> Result<()>
+    where
+        T: ArrayAccessor<Item = ArrayRef>,
+    {
+        try_hash_impl(iter, seed, |seed, v| {
+            let len = v.len();
+            let mut result = [seed; 1];
+            for i in 0..len {
+                let slice = v.slice(i, 1);
+                Self::hash(&slice, &mut result)?;
+            }
+            Ok(result[0])
+        })?;
+        Ok(())
+    }
+
+    fn hash_map<T>(iter: ArrayIter<T>, seed: &mut [H]) -> Result<()>
+    where
+        T: ArrayAccessor<Item = StructArray>,
+    {
+        try_hash_impl(iter, seed, |seed, v| {
+            let len = v.len();
+            let mut result = [seed; 1];
+            for i in 0..len {
+                let slice = v.slice(i, 1);
+                Self::hash(slice.column(0), &mut result)?;
+                Self::hash(slice.column(1), &mut result)?;
+            }
+            Ok(result[0])
+        })?;
+        Ok(())
+    }
+
+    fn hash_struct(arr: &StructArray, seed: &mut [H]) -> Result<()> {
+        Self::hash_arrays(arr.columns(), seed)
+    }
+
+    fn hash_arrays(arr: &[ArrayRef], seed: &mut [H]) -> Result<()> {
+        for arr in arr {
+            Self::hash(arr, seed)?;
+        }
+        Ok(())
+    }
+
+    fn hash_dictionary<T>(arr: &DictionaryArray<T>, seed: &mut [H]) -> Result<()>
+    where
+        T: ArrowDictionaryKeyType,
+    {
+        let len = arr.len();
+        if len != seed.len() {
+            return exec_err!("Array length mismatch: {} != {}", len, seed.len());
+        }
+
+        for (i, seed) in seed.iter_mut().enumerate().take(arr.len()) {
+            if arr.is_null(i) {
+                continue;
+            }
+            let mut result = [*seed; 1];
+
+            let value = arr.values().slice(i, 1);
+            let key: Arc<dyn Array> = Arc::new(arr.keys().slice(i, 1));
+
+            for j in 0..key.len() {
+                let value = value.slice(j, 1);
+                let key = key.slice(j, 1);
+                Self::hash(&key, &mut result)?;
+                Self::hash(&value, &mut result)?;
+            }
+            *seed = result[0];
+        }
+        Ok(())
+    }
+
+    fn hash_union(arr: &UnionArray, seed: &mut [H]) -> Result<()> {
+        let len = arr.len();
+        if len != seed.len() {
+            return exec_err!("Array length mismatch: {} != {}", len, seed.len());
+        }
+
+        for (i, seed) in seed.iter_mut().enumerate().take(len) {
+            let value = arr.value(i);
+            let mut result = [*seed; 1];
+            for i in 0..value.len() {
+                let elem = value.slice(i, 1);
+                Self::hash(&elem, &mut result)?;
+            }
+            *seed = result[0];
+        }
+        Ok(())
+    }
+
+    /// Hashes each element of `arr` into the matching slot of `seed`,
+    /// dispatching on the array's data type.
+    fn hash(arr: &ArrayRef, seed: &mut [H]) -> Result<()> {
+        match arr.data_type() {
+            DataType::Null => {
+                // do nothing
+            }
+            DataType::Boolean => {
+                let arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Self::hash_boolean(arr, seed)?;
+            }
+            DataType::Int8 => {
+                let arr = arr.as_any().downcast_ref::<Int8Array>().unwrap();
+                Self::hash_primitive::<_, i32>(arr, seed)?;
+            }
+            DataType::UInt8 => {
+                let arr = arr.as_any().downcast_ref::<UInt8Array>().unwrap();
+                Self::hash_primitive::<_, u32>(arr, seed)?;
+            }
+            DataType::Int16 => {
+                let arr = arr.as_any().downcast_ref::<Int16Array>().unwrap();
+                Self::hash_primitive::<_, i32>(arr, seed)?;
+            }
+            DataType::UInt16 => {
+                let arr = arr.as_any().downcast_ref::<UInt16Array>().unwrap();
+                Self::hash_primitive::<_, u32>(arr, seed)?;
+            }
+            DataType::Int32 => {
+                let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
+                Self::hash_primitive::<_, i32>(arr, seed)?;
+            }
+            DataType::UInt32 => {
+                let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
+                Self::hash_primitive::<_, u32>(arr, seed)?;
+            }
+            DataType::Int64 => {
+                let arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::UInt64 => {
+                let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
+                Self::hash_primitive::<_, u64>(arr, seed)?;
+            }
+            DataType::Float16 => {
+                let arr = arr.as_any().downcast_ref::<Float16Array>().unwrap();
+                Self::hash_primitive_float::<_, i32>(arr, seed)?;
+            }
+            DataType::Float32 => {
+                let arr = arr.as_any().downcast_ref::<Float32Array>().unwrap();
+                Self::hash_primitive_float::<_, i32>(arr, seed)?;
+            }
+            DataType::Float64 => {
+                let arr = arr.as_any().downcast_ref::<Float64Array>().unwrap();
+                Self::hash_primitive_float::<_, i64>(arr, seed)?;
+            }
+            DataType::Decimal128(precision, _) if *precision <= 18 => {
+                let arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+                Self::try_hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Decimal128(_, _) => {
+                let arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+                Self::hash_primitive::<_, i128>(arr, seed)?;
+            }
+            DataType::Decimal256(_, _) => {
+                let arr = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
+                Self::hash_primitive::<_, I256>(arr, seed)?;
+            }
+            DataType::Timestamp(TimeUnit::Second, _) => {
+                let arr = arr.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<TimestampMillisecondArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<TimestampMicrosecondArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<TimestampNanosecondArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Date32 => {
+                let arr = arr.as_any().downcast_ref::<Date32Array>().unwrap();
+                Self::hash_primitive::<_, i32>(arr, seed)?;
+            }
+            DataType::Date64 => {
+                let arr = arr.as_any().downcast_ref::<Date64Array>().unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Utf8 => {
+                let arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
+                Self::hash_bytes(arr.iter(), seed)?;
+            }
+            DataType::LargeUtf8 => {
+                let arr = arr.as_any().downcast_ref::<LargeStringArray>().unwrap();
+                Self::hash_bytes(arr.iter(), seed)?;
+            }
+            DataType::Binary => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<GenericBinaryArray<i32>>()
+                    .unwrap();
+                Self::hash_bytes(arr.iter(), seed)?;
+            }
+            DataType::LargeBinary => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<GenericBinaryArray<i64>>()
+                    .unwrap();
+                Self::hash_bytes(arr.iter(), seed)?;
+            }
+            DataType::FixedSizeBinary(_) => {
+                let arr = arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();
+                Self::hash_bytes(arr.iter(), seed)?;
+            }
+            DataType::Utf8View => {
+                let arr = arr.as_any().downcast_ref::<StringViewArray>().unwrap();
+                Self::hash_bytes(arr.iter(), seed)?;
+            }
+            DataType::BinaryView => {
+                let arr = arr.as_any().downcast_ref::<BinaryViewArray>().unwrap();
+                Self::hash_bytes(arr.iter(), seed)?;
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                let arr = arr.as_any().downcast_ref::<IntervalDayTimeArray>().unwrap();
+                Self::hash_primitive::<_, _IntervalDayTime>(arr, seed)?;
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<IntervalMonthDayNanoArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, _IntervalMonthDayNano>(arr, seed)?;
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<IntervalYearMonthArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, i32>(arr, seed)?;
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                let arr = arr.as_any().downcast_ref::<DurationSecondArray>().unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<DurationMillisecondArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<DurationMicrosecondArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                let arr = arr
+                    .as_any()
+                    .downcast_ref::<DurationNanosecondArray>()
+                    .unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Time32(unit) => match unit {
+                TimeUnit::Second => {
+                    let arr = arr.as_any().downcast_ref::<Time32SecondArray>().unwrap();
+                    Self::hash_primitive::<_, i32>(arr, seed)?;
+                }
+                TimeUnit::Millisecond => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<Time32MillisecondArray>()
+                        .unwrap();
+                    Self::hash_primitive::<_, i32>(arr, seed)?;
+                }
+                _ => {
+                    return exec_err!("Unsupported time unit: {:?}", unit);
+                }
+            },
+            DataType::Time64(unit) => match unit {
+                TimeUnit::Nanosecond => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<Time64NanosecondArray>()
+                        .unwrap();
+                    Self::hash_primitive::<_, i64>(arr, seed)?;
+                }
+                TimeUnit::Microsecond => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<Time64MicrosecondArray>()
+                        .unwrap();
+                    Self::hash_primitive::<_, i64>(arr, seed)?;
+                }
+                _ => {
+                    return exec_err!("Unsupported time unit: {:?}", unit);
+                }
+            },
+            DataType::List(_) => {
+                let arr = arr.as_any().downcast_ref::<ListArray>().unwrap();
+                Self::hash_list(arr.iter(), seed)?;
+            }
+            DataType::ListView(_) => {
+                let arr = arr.as_any().downcast_ref::<ListViewArray>().unwrap();
+                Self::hash_list(arr.iter(), seed)?;
+            }
+            DataType::LargeList(_) => {
+                let arr = arr.as_any().downcast_ref::<LargeListArray>().unwrap();
+                Self::hash_list(arr.iter(), seed)?;
+            }
+            DataType::LargeListView(_) => {
+                let arr = arr.as_any().downcast_ref::<LargeListViewArray>().unwrap();
+                Self::hash_list(arr.iter(), seed)?;
+            }
+            DataType::Map(_, _) => {
+                let arr = arr.as_any().downcast_ref::<MapArray>().unwrap();
+                Self::hash_map(arr.iter(), seed)?;
+            }
+            DataType::Struct(_) => {
+                let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
+                Self::hash_struct(arr, seed)?;
+            }
+            DataType::Union(_, _) => {
+                let arr = arr.as_any().downcast_ref::<UnionArray>().unwrap();
+                Self::hash_union(arr, seed)?;
+            }
+            DataType::FixedSizeList(_, _) => {
+                let arr = arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
+                Self::hash_list(arr.iter(), seed)?;
+            }
+            DataType::Decimal32(_, _) => {
+                let arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
+                Self::hash_primitive::<_, i32>(arr, seed)?;
+            }
+            DataType::Decimal64(_, _) => {
+                let arr = arr.as_any().downcast_ref::<Decimal64Array>().unwrap();
+                Self::hash_primitive::<_, i64>(arr, seed)?;
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<Int8Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                DataType::UInt8 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<UInt8Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                DataType::Int16 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<Int16Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                DataType::UInt16 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<UInt16Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                DataType::Int32 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<Int32Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                DataType::UInt32 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<UInt32Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                DataType::Int64 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<Int64Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                DataType::UInt64 => {
+                    let arr = arr
+                        .as_any()
+                        .downcast_ref::<DictionaryArray<UInt64Type>>()
+                        .unwrap();
+                    Self::hash_dictionary(arr, seed)?;
+                }
+                _ => {
+                    return exec_err!("Unsupported key type: {}", key_type);
+                }
+            },
+            DataType::RunEndEncoded(_, _) => {
+                return exec_err!("Unsupported data type: {}", arr.data_type());
+            }
+        };
+        Ok(())
+    }
+}
+
+struct I256(i256);
+
+impl From<i256> for I256 {
+    fn from(value: i256) -> Self {
+        I256(value)
+    }
+}
+
+impl ToBytes for I256 {
+    type Bytes = [u8; 32];
+
+    fn to_le_bytes(&self) -> Self::Bytes {
+        self.0.to_le_bytes()
+    }
+
+    fn to_be_bytes(&self) -> Self::Bytes {
+        self.0.to_be_bytes()
+    }
+}
+
+struct _IntervalDayTime(IntervalDayTime);
+
+impl From<IntervalDayTime> for _IntervalDayTime {
+    fn from(value: IntervalDayTime) -> Self {
+        _IntervalDayTime(value)
+    }
+}
+
+impl ToBytes for _IntervalDayTime {
+    type Bytes = [u8; 8];
+
+    fn to_le_bytes(&self) -> Self::Bytes {
+        let days = self.0.days.to_le_bytes();
+        let milliseconds = self.0.milliseconds.to_le_bytes();
+        let mut bytes = [0; 8];
+        bytes[..4].copy_from_slice(&days);
+        bytes[4..].copy_from_slice(&milliseconds);
+        bytes
+    }
+
+    fn to_be_bytes(&self) -> Self::Bytes {
+        let days = self.0.days.to_be_bytes();
+        let milliseconds = self.0.milliseconds.to_be_bytes();
+        let mut bytes = [0; 8];
+        bytes[..4].copy_from_slice(&days);
+        bytes[4..].copy_from_slice(&milliseconds);
+        bytes
+    }
+}
+
+struct _IntervalMonthDayNano(IntervalMonthDayNano);
+
+impl From<IntervalMonthDayNano> for _IntervalMonthDayNano {
+    fn from(value: IntervalMonthDayNano) -> Self {
+        _IntervalMonthDayNano(value)
+    }
+}
+
+impl ToBytes for _IntervalMonthDayNano {
+    type Bytes = [u8; 16];
+
+    fn to_le_bytes(&self) -> Self::Bytes {
+        let months = self.0.months.to_le_bytes();
+        let days = self.0.days.to_le_bytes();
+        let nanoseconds = self.0.nanoseconds.to_le_bytes();
+        let mut bytes = [0; 16];
+        bytes[..4].copy_from_slice(&months);
+        bytes[4..8].copy_from_slice(&days);
+        bytes[8..].copy_from_slice(&nanoseconds);
+        bytes
+    }
+
+    fn to_be_bytes(&self) -> Self::Bytes {
+        let months = self.0.months.to_be_bytes();
+        let days = self.0.days.to_be_bytes();
+        let nanoseconds = self.0.nanoseconds.to_be_bytes();
+        let mut bytes = [0; 16];
+        bytes[..4].copy_from_slice(&months);
+        bytes[4..8].copy_from_slice(&days);
+        bytes[8..].copy_from_slice(&nanoseconds);
+        bytes
+    }
+}
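
Multi-column hashing needs no extra machinery: `hash_arrays` simply feeds the running hash of each row back in as the seed for the next column. The slt value for `hash('foo', 42)` can be reproduced by chaining two oneshot calls by hand; a sketch (note that DataFusion types the bare literal 42 as Int64, so eight bytes are hashed for the second column):

    use crate::function::hash::murmur3::murmur3_hash;

    // First column: hash('foo') with the initial seed 42.
    let h1 = murmur3_hash(b"foo", 42) as i32;
    assert_eq!(h1, 1015597510); // SELECT hash('foo') in hash.slt
    // Second column: the previous result becomes the seed.
    let h2 = murmur3_hash(42_i64.to_le_bytes(), h1 as u32) as i32;
    assert_eq!(h2, 1697442603); // SELECT hash('foo', 42) in hash.slt
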
diff --git a/datafusion/spark/src/function/hash/xxhash64.rs b/datafusion/spark/src/function/hash/xxhash64.rs
new file mode 100644
index 0000000000000..854bedbd6fa20
--- /dev/null
+++ b/datafusion/spark/src/function/hash/xxhash64.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, PrimitiveArray};
+use arrow::datatypes::{DataType, Int64Type};
+use datafusion_common::Result;
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+
+use std::any::Any;
+use std::sync::Arc;
+use twox_hash::XxHash64;
+
+use crate::function::hash::utils::SparkHasher;
+
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#xxhash64>
+#[derive(Debug, Hash, Eq, PartialEq)]
+pub struct SparkXxHash64 {
+    signature: Signature,
+    seed: i64,
+}
+
+impl Default for SparkXxHash64 {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkXxHash64 {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+            seed: 42,
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkXxHash64 {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "xxhash64"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int64)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let func = |arr: &[ArrayRef]| {
+            let mut result = vec![self.seed; arr[0].len()];
+            XxHash64Hasher::hash_arrays(arr, &mut result)?;
+            Ok(Arc::new(PrimitiveArray::<Int64Type>::from(result)) as ArrayRef)
+        };
+        make_scalar_function(func, vec![])(&args.args)
+    }
+}
+
+struct XxHash64Hasher;
+
+impl SparkHasher<i64> for XxHash64Hasher {
+    fn oneshot(seed: i64, data: &[u8]) -> i64 {
+        XxHash64::oneshot(seed as u64, data) as i64
+    }
+}
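
One detail worth calling out: `hash_primitive` widens sub-32-bit integers to four little-endian bytes before hashing, which is why every integer type from Int8 through UInt32 produces the same value for 42 in xxhash64.slt, while Int64 and UInt64 hash eight bytes and differ. A quick sketch against twox-hash (expected constants from xxhash64.slt):

    use twox_hash::XxHash64;

    // 8-, 16-, and 32-bit integers are all hashed as 4 little-endian bytes:
    assert_eq!(
        XxHash64::oneshot(42, &42_i32.to_le_bytes()) as i64,
        -387659249110444264
    );
    // 64-bit integers hash all 8 bytes and give a different value:
    assert_eq!(
        XxHash64::oneshot(42, &42_i64.to_le_bytes()) as i64,
        -6876166290308861218
    );
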
diff --git a/datafusion/sqllogictest/test_files/spark/hash/hash.slt b/datafusion/sqllogictest/test_files/spark/hash/hash.slt
new file mode 100644
index 0000000000000..fbff7c9fce06c
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/hash/hash.slt
@@ -0,0 +1,282 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file contains tests for the hash function in Spark compatibility mode.
+# For more information, please see:
+# https://github.com/apache/datafusion/issues/15914
+
+# Test hash function with simple strings
+query I
+SELECT hash('Spark');
+----
+228093765
+
+# Test hash with multiple values
+query I
+SELECT hash(expr) FROM VALUES ('foo'), ('bar'), ('baz') AS t(expr);
+----
+1015597510
+-1808790533
+103115168
+
+# Test hash with empty string
+query I
+SELECT hash('');
+----
+142593372
+
+# Test hash with NULL values
+query I
+SELECT hash(NULL);
+----
+42
+
+# Test hash with binary data
+query I
+SELECT hash(X'48656c6c6f'); -- 'Hello' in hex
+----
+825079905
+
+query I
+SELECT hash(X'537061726b'); -- 'Spark' in hex
+----
+228093765
+
+# Test hash with integer types
+query I
+SELECT hash(arrow_cast(42, 'Int8'));
+----
+29417773
+
+query I
+SELECT hash(arrow_cast(42, 'UInt8'));
+----
+29417773
+
+query I
+SELECT hash(arrow_cast(42, 'Int16'));
+----
+29417773
+
+query I
+SELECT hash(arrow_cast(42, 'UInt16'));
+----
+29417773
+
+query I
+SELECT hash(arrow_cast(42, 'Int32'));
+----
+29417773
+
+query I
+SELECT hash(arrow_cast(42, 'UInt32'));
+----
+29417773
+
+query I
+SELECT hash(arrow_cast(42, 'Int64'));
+----
+1316951768
+
+query I
+SELECT hash(arrow_cast(42, 'UInt64'));
+----
+1316951768
+
+# Test hash with float types
+query I
+SELECT hash(arrow_cast(3.14, 'Float32'));
+----
+808195277
+
+query I
+SELECT hash(arrow_cast(3.14, 'Float64'));
+----
+834561050
+
+query I
+SELECT hash(arrow_cast(0.0, 'Float32'));
+----
+933211791
+
+query I
+SELECT hash(arrow_cast(-0.0, 'Float32'));
+----
+933211791
+
+# Test hash with boolean types
+query I
+SELECT hash(true);
+----
+-559580957
+
+query I
+SELECT hash(false);
+----
+933211791
+
+# Test hash with decimal types
+query I
+SELECT hash(arrow_cast(123.45, 'Decimal128(10,2)'));
+----
+1416086240
+
+query I
+SELECT hash(arrow_cast(123456789012345678901234567890, 'Decimal256(30,0)'));
+----
+737165439
+
+# Test hash with timestamp types
+query I
+SELECT hash(TIMESTAMP '2023-01-01 12:00:00');
+----
+-263031675
+
+query I
+SELECT hash(TIMESTAMP '2023-01-01 12:00:00.123');
+----
+-800911551
+
+query I
+SELECT hash(TIMESTAMP '2023-01-01 12:00:00.123456');
+----
+-1123047081
+
+query I
+SELECT hash(TIMESTAMP '2023-01-01 12:00:00.123456789');
+----
+-1901595533
+
+# Test hash with date types
+query I
+SELECT hash(arrow_cast('2023-01-01', 'Date32'));
+----
+-69933992
+
+query I
+SELECT hash(arrow_cast('2023-01-01', 'Date64'));
+----
+473315330
+
+# Test hash with time types
+query I
+SELECT hash(arrow_cast('12:00:00', 'Time32(Second)'));
+----
+-326478418
+
+query I
+SELECT hash(arrow_cast('12:00:00.123', 'Time32(Millisecond)'));
+----
+-1564477167
+
+query I
+SELECT hash(arrow_cast('12:00:00.123456', 'Time64(Microsecond)'));
+----
+362963970
+
+query I
+SELECT hash(arrow_cast('12:00:00.123456789', 'Time64(Nanosecond)'));
+----
+-982328121
+
+# Test hash with interval types
+query I
+SELECT hash(INTERVAL '1 day');
+----
+-269872653
+
+query I
+SELECT hash(INTERVAL '1 month 1 day 1 nanosecond');
+----
+-262533637
+
+query I
+SELECT hash(INTERVAL '1 year 1 month');
+----
+1281874930
+
+# Test hash with duration types
+query I
+SELECT hash(arrow_cast(INTERVAL '1 second', 'Duration(Second)'));
+----
+-1712319331
+
+query I
+SELECT hash(arrow_cast(INTERVAL '1 millisecond', 'Duration(Millisecond)'));
+----
+-1712319331
+
+query I
+SELECT hash(arrow_cast(INTERVAL '1 microsecond', 'Duration(Microsecond)'));
+----
+-1712319331
+
+query I
+SELECT hash(arrow_cast(INTERVAL '1 nanosecond', 'Duration(Nanosecond)'));
+----
+-1712319331
+
+# Test hash with fixed size binary
+query I
+SELECT hash(arrow_cast(X'0102030405', 'FixedSizeBinary(5)'));
+----
+-2064006189
+
+# Test hash with list types
+query I
+SELECT hash(arrow_cast([1, 2, 3], 'List(Int32)'));
+----
+-912918097
+
+query I
+SELECT hash(arrow_cast(['a', 'b', 'c'], 'List(Utf8)'));
+----
+-1512497118
+
+# Test hash with struct types
+query I
+SELECT hash(named_struct('a', 1, 'b', 'test'));
+----
+-1005837579
+
+# Test hash with map types
+query I
+SELECT hash({'key1': 'value1', 'key2': 'value2'});
+----
+941599363
+
+# Test hash with dictionary types
+query I
+SELECT hash(arrow_cast('test', 'Dictionary(Int32, Utf8)'));
+----
+-2131272640
+
+query I
+SELECT hash('foo', 42);
+----
+1697442603
+
+# Test hash with multiple columns
+query I
+SELECT hash(col1, col2) FROM VALUES
+    ('foo', 42),
+    ('bar', 123),
+    ('baz', 456) AS t(col1, col2);
+----
+1697442603
+-150815381
+-1728587230
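
Because a NULL in any column leaves the running hash untouched, an all-NULL argument list of any width should come back as the initial seed 42, just like the single-column `hash(NULL)` case above. A hypothetical extra check, not part of the original patch, that follows from that seed-passthrough behavior:

    query I
    SELECT hash(NULL, NULL);
    ----
    42
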
diff --git a/datafusion/sqllogictest/test_files/spark/hash/xxhash64.slt b/datafusion/sqllogictest/test_files/spark/hash/xxhash64.slt
new file mode 100644
index 0000000000000..eb37f1fdc13f8
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/hash/xxhash64.slt
@@ -0,0 +1,282 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file contains tests for the xxhash64 function in Spark compatibility mode.
+# For more information, please see:
+# https://github.com/apache/datafusion/issues/15914
+
+# Test xxhash64 function with simple strings
+query I
+SELECT xxhash64('Spark');
+----
+-4294468057691064905
+
+# Test xxhash64 with multiple values
+query I
+SELECT xxhash64(expr) FROM VALUES ('foo'), ('bar'), ('baz') AS t(expr);
+----
+-3075308222547705278
+-1798770879548125814
+-7971906754264201004
+
+# Test xxhash64 with empty string
+query I
+SELECT xxhash64('');
+----
+-7444071767201028348
+
+# Test xxhash64 with NULL values
+query I
+SELECT xxhash64(NULL);
+----
+42
+
+# Test xxhash64 with binary data
+query I
+SELECT xxhash64(X'48656c6c6f'); -- 'Hello' in hex
+----
+6777584228807376986
+
+query I
+SELECT xxhash64(X'537061726b'); -- 'Spark' in hex
+----
+-4294468057691064905
+
+# Test xxhash64 with integer types
+query I
+SELECT xxhash64(arrow_cast(42, 'Int8'));
+----
+-387659249110444264
+
+query I
+SELECT xxhash64(arrow_cast(42, 'UInt8'));
+----
+-387659249110444264
+
+query I
+SELECT xxhash64(arrow_cast(42, 'Int16'));
+----
+-387659249110444264
+
+query I
+SELECT xxhash64(arrow_cast(42, 'UInt16'));
+----
+-387659249110444264
+
+query I
+SELECT xxhash64(arrow_cast(42, 'Int32'));
+----
+-387659249110444264
+
+query I
+SELECT xxhash64(arrow_cast(42, 'UInt32'));
+----
+-387659249110444264
+
+query I
+SELECT xxhash64(arrow_cast(42, 'Int64'));
+----
+-6876166290308861218
+
+query I
+SELECT xxhash64(arrow_cast(42, 'UInt64'));
+----
+-6876166290308861218
+
+# Test xxhash64 with float types
+query I
+SELECT xxhash64(arrow_cast(3.14, 'Float32'));
+----
+-7283597122065597797
+
+query I
+SELECT xxhash64(arrow_cast(3.14, 'Float64'));
+----
+-4595706691642582856
+
+query I
+SELECT xxhash64(arrow_cast(0.0, 'Float32'));
+----
+3614696996920510707
+
+query I
+SELECT xxhash64(arrow_cast(-0.0, 'Float32'));
+----
+3614696996920510707
+
+# Test xxhash64 with boolean types
+query I
+SELECT xxhash64(true);
+----
+-6698625589789238999
+
+query I
+SELECT xxhash64(false);
+----
+3614696996920510707
+
+# Test xxhash64 with decimal types
+query I
+SELECT xxhash64(arrow_cast(123.45, 'Decimal128(10,2)'));
+----
+8791244235932249694
+
+query I
+SELECT xxhash64(arrow_cast(123456789012345678901234567890, 'Decimal256(30,0)'));
+----
+4944037401260857918
+
+# Test xxhash64 with timestamp types
+query I
+SELECT xxhash64(TIMESTAMP '2023-01-01 12:00:00');
+----
+-1407378931393080602
+
+query I
+SELECT xxhash64(TIMESTAMP '2023-01-01 12:00:00.123');
+----
+5398060225209591112
+
+query I
+SELECT xxhash64(TIMESTAMP '2023-01-01 12:00:00.123456');
+----
+-1165170176447128531
+
+query I
+SELECT xxhash64(TIMESTAMP '2023-01-01 12:00:00.123456789');
+----
+-6763638591191357453
+
+# Test xxhash64 with date types
+query I
+SELECT xxhash64(arrow_cast('2023-01-01', 'Date32'));
+----
+-6043813029439407350
+
+query I
+SELECT xxhash64(arrow_cast('2023-01-01', 'Date64'));
+----
+-2921948568039682661
+
+# Test xxhash64 with time types
+query I
+SELECT xxhash64(arrow_cast('12:00:00', 'Time32(Second)'));
+----
+-8977157097864536298
+
+query I
+SELECT xxhash64(arrow_cast('12:00:00.123', 'Time32(Millisecond)'));
+----
+-9067238489088955330
+
+query I
+SELECT xxhash64(arrow_cast('12:00:00.123456', 'Time64(Microsecond)'));
+----
+-8151313266931874180
+
+query I
+SELECT xxhash64(arrow_cast('12:00:00.123456789', 'Time64(Nanosecond)'));
+----
+1133172798740045464
+
+# Test xxhash64 with interval types
+query I
+SELECT xxhash64(INTERVAL '1 day');
+----
+7906059383252910675
+
+query I +SELECT xxhash64(INTERVAL '1 month 1 day 1 nanosecond'); +---- +8958388478287045343 + +query I +SELECT xxhash64(INTERVAL '1 year 1 month'); +---- +5298753258255735493 + +# Test xxhash64 with duration types +query I +SELECT xxhash64(arrow_cast(INTERVAL '1 second', 'Duration(Second)')); +---- +-7001672635703045582 + +query I +SELECT xxhash64(arrow_cast(INTERVAL '1 millisecond', 'Duration(Millisecond)')); +---- +-7001672635703045582 + +query I +SELECT xxhash64(arrow_cast(INTERVAL '1 microsecond', 'Duration(Microsecond)')); +---- +-7001672635703045582 + +query I +SELECT xxhash64(arrow_cast(INTERVAL '1 nanosecond', 'Duration(Nanosecond)')); +---- +-7001672635703045582 + +# Test xxhash64 with fixed size binary +query I +SELECT xxhash64(arrow_cast(X'0102030405', 'FixedSizeBinary(5)')); +---- +8668608334415110682 + +# Test xxhash64 with list types +query I +SELECT xxhash64(arrow_cast([1, 2, 3], 'List(Int32)')); +---- +8592097078962733837 + +query I +SELECT xxhash64(arrow_cast(['a', 'b', 'c'], 'List(Utf8)')); +---- +-8775012835737190202 + +# Test xxhash64 with struct types +query I +SELECT xxhash64(named_struct('a', 1, 'b', 'test')); +---- +-8935072733071425019 + +# Test xxhash64 with map types +query I +SELECT xxhash64({'key1': 'value1', 'key2': 'value2'}); +---- +7222145071341353886 + +# Test xxhash64 with dictionary types +query I +SELECT xxhash64(arrow_cast('test', 'Dictionary(Int32, Utf8)')); +---- +-3577179998404463756 + +query I +SELECT xxhash64('foo', 42); +---- +7795082500386061179 + +# Test xxhash64 with multiple columns +query I +SELECT xxhash64(col1, col2) FROM VALUES + ('foo', 42), + ('bar', 123), + ('baz', 456) AS t(col1, col2); +---- +7795082500386061179 +2320365627537839193 +2643631156015080397
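
Taken together, the patch exposes both functions through the datafusion-spark function list shown in mod.rs above. A minimal end-to-end sketch of how a consumer might register and call them (assuming the public `datafusion_spark::function::hash::functions()` path, plus the datafusion and tokio crates; expected output per the slt files):

    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // Register the Spark-compatible hash UDFs exported by datafusion-spark.
        for udf in datafusion_spark::function::hash::functions() {
            ctx.register_udf(udf.as_ref().clone());
        }
        let df = ctx.sql("SELECT hash('Spark'), xxhash64('Spark')").await?;
        df.show().await?; // 228093765, -4294468057691064905 per the slt files
        Ok(())
    }
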