diff --git a/Cargo.lock b/Cargo.lock
index 5e8159cc829c..3fcac108c468 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2629,6 +2629,7 @@ version = "49.0.0"
 dependencies = [
  "arrow",
  "chrono",
+ "crc32fast",
  "criterion",
  "datafusion-catalog",
  "datafusion-common",
@@ -2638,6 +2639,8 @@ dependencies = [
  "datafusion-macros",
  "log",
  "rand 0.9.2",
+ "sha1",
+ "xxhash-rust",
 ]
 
 [[package]]
@@ -5953,6 +5956,17 @@ dependencies = [
  "unsafe-libyaml",
 ]
 
+[[package]]
+name = "sha1"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
 [[package]]
 name = "sha2"
 version = "0.10.9"
@@ -7596,6 +7610,12 @@ version = "0.13.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
 
+[[package]]
+name = "xxhash-rust"
+version = "0.8.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
+
 [[package]]
 name = "xz2"
 version = "0.1.7"
diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml
index bc7ae380f793..73b406257b84 100644
--- a/datafusion/spark/Cargo.toml
+++ b/datafusion/spark/Cargo.toml
@@ -38,6 +38,7 @@ name = "datafusion_spark"
 [dependencies]
 arrow = { workspace = true }
 chrono = { workspace = true }
+crc32fast = "1.4"
 datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-execution = { workspace = true }
@@ -45,6 +46,8 @@ datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
 datafusion-macros = { workspace = true }
 log = { workspace = true }
+sha1 = "0.10"
+xxhash-rust = { version = "0.8", features = ["xxh3"] }
 
 [dev-dependencies]
 criterion = { workspace = true }
diff --git a/datafusion/spark/src/function/hash/crc32.rs b/datafusion/spark/src/function/hash/crc32.rs
new file mode 100644
index 000000000000..ad23380459f5
--- /dev/null
+++ b/datafusion/spark/src/function/hash/crc32.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array};
+use arrow::datatypes::DataType;
+use crc32fast::Hasher;
+use datafusion_common::cast::{
+    as_binary_array, as_binary_view_array, as_large_binary_array,
+};
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#crc32>
+#[derive(Debug)]
+pub struct SparkCrc32 {
+    signature: Signature,
+}
+
+impl Default for SparkCrc32 {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkCrc32 {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkCrc32 {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "crc32"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int64)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_crc32, vec![])(&args.args)
+    }
+
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        if arg_types.len() != 1 {
+            return exec_err!(
+                "`crc32` function requires 1 argument, got {}",
+                arg_types.len()
+            );
+        }
+        match arg_types[0] {
+            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
+                Ok(vec![arg_types[0].clone()])
+            }
+            DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
+            DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
+            DataType::Null => Ok(vec![DataType::Binary]),
+            _ => exec_err!("`crc32` function does not support type {}", arg_types[0]),
+        }
+    }
+}
+
+fn spark_crc32_digest(value: &[u8]) -> i64 {
+    let mut hasher = Hasher::new();
+    hasher.update(value);
+    hasher.finalize() as i64
+}
+
+fn spark_crc32_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
+    let result = input
+        .map(|value| value.map(spark_crc32_digest))
+        .collect::<Int64Array>();
+    Arc::new(result)
+}
+
+fn spark_crc32(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [input] = args else {
+        return internal_err!(
+            "Spark `crc32` function requires 1 argument, got {}",
+            args.len()
+        );
+    };
+
+    match input.data_type() {
+        DataType::Binary => {
+            let input = as_binary_array(input)?;
+            Ok(spark_crc32_impl(input.iter()))
+        }
+        DataType::LargeBinary => {
+            let input = as_large_binary_array(input)?;
+            Ok(spark_crc32_impl(input.iter()))
+        }
+        DataType::BinaryView => {
+            let input = as_binary_view_array(input)?;
+            Ok(spark_crc32_impl(input.iter()))
+        }
+        _ => {
+            exec_err!(
+                "Spark `crc32` function: argument must be binary or large binary, got {:?}",
+                input.data_type()
+            )
+        }
+    }
+}
diff --git a/datafusion/spark/src/function/hash/mod.rs b/datafusion/spark/src/function/hash/mod.rs
index f31918e6a46b..5860596ac70a 100644
--- a/datafusion/spark/src/function/hash/mod.rs
+++ b/datafusion/spark/src/function/hash/mod.rs
@@ -15,19 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod crc32;
+pub mod sha1;
 pub mod sha2;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
+make_udf_function!(crc32::SparkCrc32, crc32);
+make_udf_function!(sha1::SparkSha1, sha1);
 make_udf_function!(sha2::SparkSha2, sha2);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
-    export_functions!((sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2));
+    export_functions!(
+        (crc32, "crc32(expr) - Returns a cyclic redundancy check value of the expr as a bigint.", arg1),
+        (sha1, "sha1(expr) - Returns a SHA-1 hash value of the expr as a hex string.", arg1),
+        (sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2)
+    );
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![sha2()]
+    vec![crc32(), sha1(), sha2()]
 }
diff --git a/datafusion/spark/src/function/hash/sha1.rs b/datafusion/spark/src/function/hash/sha1.rs
new file mode 100644
index 000000000000..8c635d8dc9be
--- /dev/null
+++ b/datafusion/spark/src/function/hash/sha1.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, StringArray};
+use arrow::datatypes::DataType;
+use datafusion_common::cast::{
+    as_binary_array, as_binary_view_array, as_large_binary_array,
+};
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+use sha1::{Digest, Sha1};
+
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#sha1>
+#[derive(Debug)]
+pub struct SparkSha1 {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for SparkSha1 {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkSha1 {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+            aliases: vec!["sha".to_string()],
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkSha1 {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "sha1"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Utf8)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_sha1, vec![])(&args.args)
+    }
+
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        if arg_types.len() != 1 {
+            return exec_err!(
+                "`sha1` function requires 1 argument, got {}",
+                arg_types.len()
+            );
+        }
+        match arg_types[0] {
+            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
+                Ok(vec![arg_types[0].clone()])
+            }
+            DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
+            DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
+            DataType::Null => Ok(vec![DataType::Binary]),
+            _ => exec_err!("`sha1` function does not support type {}", arg_types[0]),
+        }
+    }
+}
+
+fn spark_sha1_digest(value: &[u8]) -> String {
+    let result = Sha1::digest(value);
+    let mut s = String::with_capacity(result.len() * 2);
+    for b in result.as_slice() {
+        #[allow(clippy::unwrap_used)]
+        write!(&mut s, "{b:02x}").unwrap();
+    }
+    s
+}
+
+fn spark_sha1_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
+    let result = input
+        .map(|value| value.map(spark_sha1_digest))
+        .collect::<StringArray>();
+    Arc::new(result)
+}
+
+fn spark_sha1(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [input] = args else {
+        return internal_err!(
+            "Spark `sha1` function requires 1 argument, got {}",
+            args.len()
+        );
+    };
+
+    match input.data_type() {
+        DataType::Binary => {
+            let input = as_binary_array(input)?;
+            Ok(spark_sha1_impl(input.iter()))
+        }
+        DataType::LargeBinary => {
+            let input = as_large_binary_array(input)?;
+            Ok(spark_sha1_impl(input.iter()))
+        }
+        DataType::BinaryView => {
+            let input = as_binary_view_array(input)?;
+            Ok(spark_sha1_impl(input.iter()))
+        }
+        _ => {
+            exec_err!(
+                "Spark `sha1` function: argument must be binary or large binary, got {:?}",
+                input.data_type()
+            )
+        }
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/spark/hash/crc32.slt b/datafusion/sqllogictest/test_files/spark/hash/crc32.slt
index b28a3fc13b51..87b69d8d404e 100644
--- a/datafusion/sqllogictest/test_files/spark/hash/crc32.slt
+++ b/datafusion/sqllogictest/test_files/spark/hash/crc32.slt
@@ -23,5 +23,54 @@
 ## Original Query: SELECT crc32('Spark');
 ## PySpark 3.5.5 Result: {'crc32(Spark)': 1557323817, 'typeof(crc32(Spark))': 'bigint', 'typeof(Spark)': 'string'}
 
-#query
-#SELECT crc32('Spark'::string);
+
+# Basic crc32 tests
+query I
+SELECT crc32('Spark');
+----
+1557323817
+
+query I
+SELECT crc32(NULL);
+----
+NULL
+
+query I
+SELECT crc32('');
+----
+0
+
+query I
+SELECT crc32(arrow_cast('', 'Binary'));
+----
+0
+
+# Test with LargeUtf8 (using CAST to ensure type)
+query I
+SELECT crc32(arrow_cast('Spark', 'LargeUtf8'));
+----
+1557323817
+
+# Test with Utf8View (using CAST to ensure type)
+query I
+SELECT crc32(arrow_cast('Spark', 'Utf8View'));
+----
+1557323817
+
+# Test with different binary types
+query I
+SELECT crc32(arrow_cast('Spark', 'Binary'));
+----
+1557323817
+
+# Test with LargeBinary
+query I
+SELECT crc32(arrow_cast('Spark', 'LargeBinary'));
+----
+1557323817
+
+# Test with BinaryView
+query I
+SELECT crc32(arrow_cast('Spark', 'BinaryView'));
+----
+1557323817
diff --git a/datafusion/sqllogictest/test_files/spark/hash/sha.slt b/datafusion/sqllogictest/test_files/spark/hash/sha.slt
index bf205f7e0aaf..c7710aa6a763 100644
--- a/datafusion/sqllogictest/test_files/spark/hash/sha.slt
+++ b/datafusion/sqllogictest/test_files/spark/hash/sha.slt
@@ -23,5 +23,49 @@
 ## Original Query: SELECT sha('Spark');
 ## PySpark 3.5.5 Result: {'sha(Spark)': '85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c', 'typeof(sha(Spark))': 'string', 'typeof(Spark)': 'string'}
 
-#query
-#SELECT sha('Spark'::string);
+
+# Basic sha tests
+query T
+SELECT sha('Spark');
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+query T
+SELECT sha(NULL);
+----
+NULL
+
+query T
+SELECT sha('');
+----
+da39a3ee5e6b4b0d3255bfef95601890afd80709
+
+# Test with LargeUtf8 (using CAST to ensure type)
+query T
+SELECT sha(arrow_cast('Spark', 'LargeUtf8'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with Utf8View (using CAST to ensure type)
+query T
+SELECT sha(arrow_cast('Spark', 'Utf8View'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with Binary
+query T
+SELECT sha(arrow_cast('Spark', 'Binary'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with LargeBinary
+query T
+SELECT sha(arrow_cast('Spark', 'LargeBinary'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with BinaryView
+query T
+SELECT sha(arrow_cast('Spark', 'BinaryView'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
diff --git a/datafusion/sqllogictest/test_files/spark/hash/sha1.slt b/datafusion/sqllogictest/test_files/spark/hash/sha1.slt
index afaa862391bf..1ce734616072 100644
--- a/datafusion/sqllogictest/test_files/spark/hash/sha1.slt
+++ b/datafusion/sqllogictest/test_files/spark/hash/sha1.slt
@@ -23,5 +23,49 @@
 ## Original Query: SELECT sha1('Spark');
 ## PySpark 3.5.5 Result: {'sha1(Spark)': '85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c', 'typeof(sha1(Spark))': 'string', 'typeof(Spark)': 'string'}
 
-#query
-#SELECT sha1('Spark'::string);
+
+# Basic sha1 tests
+query T
+SELECT sha1('Spark');
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+query T
+SELECT sha1(NULL);
+----
+NULL
+
+query T
+SELECT sha1('');
+----
+da39a3ee5e6b4b0d3255bfef95601890afd80709
+
+# Test with LargeUtf8 (using CAST to ensure type)
+query T
+SELECT sha1(arrow_cast('Spark', 'LargeUtf8'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with Utf8View (using CAST to ensure type)
+query T
+SELECT sha1(arrow_cast('Spark', 'Utf8View'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with Binary
+query T
+SELECT sha1(arrow_cast('Spark', 'Binary'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with LargeBinary
+query T
+SELECT sha1(arrow_cast('Spark', 'LargeBinary'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
+
+# Test with BinaryView
+query T
+SELECT sha1(arrow_cast('Spark', 'BinaryView'));
+----
+85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c
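
Usage note (not part of the patch): a minimal sketch of how the UDFs added here could be exercised end to end. It assumes the `datafusion` and `tokio` crates alongside `datafusion-spark`, and that the `function::hash` module path is publicly reachable, as the `hash/mod.rs` change above suggests; `functions()` is the helper this patch extends to return `crc32`, `sha1`, and `sha2`.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register the Spark-compatible hash UDFs added by this patch; each
    // entry is an Arc<ScalarUDF>, so clone out of the Arc to register.
    for udf in datafusion_spark::function::hash::functions() {
        ctx.register_udf(udf.as_ref().clone());
    }

    // Mirrors the sqllogictest expectations above:
    //   crc32('Spark') -> 1557323817 (BIGINT)
    //   sha1('Spark')  -> '85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c' (hex string)
    //   sha('Spark')   -> same as sha1, via the "sha" alias
    let df = ctx
        .sql("SELECT crc32('Spark'), sha1('Spark'), sha('Spark')")
        .await?;
    df.show().await?;
    Ok(())
}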