feat(spark): implement spark hash function murmur/xxhash64 #17093
Conversation
```rust
let ints = data.as_ptr().add(i) as *const i32;
let mut half_word = ints.read_unaligned();
if cfg!(target_endian = "big") {
    half_word = half_word.reverse_bits();
```
Please check whether .reverse_bits() needs to be replaced with .swap_bytes() here.
In the Scala version there is some code that ensures little-endian byte order for this variable:
https://github.com/scala/scala/blob/3f6bdaeafde17d790023cc3f299b81eaaf876ca3/src/library/scala/util/hashing/MurmurHash3.scala#L210
But byte order isn't bit order; the Scala code only swaps bytes, it doesn't reverse bits, so reverse_bits may produce a different result.
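To illustrate the distinction (a minimal standalone sketch, not code from the PR):

```rust
fn main() {
    let x: i32 = 0x01020304;
    // swap_bytes reorders whole bytes: 0x01020304 -> 0x04030201
    assert_eq!(x.swap_bytes(), 0x04030201);
    // reverse_bits reverses the entire 32-bit pattern, producing a
    // completely different value: 0x01020304 -> 0x20C04080
    assert_eq!(x.reverse_bits(), 0x20C04080);
}
```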
Will this code ever run on a big-endian target? I'd just as soon remove the code and leave a comment, for anyone who wants to lift this code, noting that the implementation requires little-endian targets. Even the original murmur3 implementation in C++ doesn't worry about endianness, instead mentioning in a comment where to swap bytes if necessary. https://github.com/aappleby/smhasher/blob/0ff96f7835817a27d0487325b6c16033e2992eb5/src/MurmurHash3.cpp#L52C46-L52C52
@mbutrovich could you please share your suggestion?
I don't think big-endian should be considered; here's a comment from arrow-rs noting that it doesn't target big-endian: apache/arrow-rs#6917 (comment)
So for simplicity we could just remove this cfg?
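A sketch of what that simplification might look like, with the comment suggested above (assuming the surrounding unsafe loop from the PR; not the final code):

```rust
// NOTE: this implementation assumes a little-endian target, matching
// Spark's behavior. Anyone lifting this code to a big-endian platform
// should swap bytes here (see the upstream MurmurHash3.cpp comment).
let ints = data.as_ptr().add(i) as *const i32;
let half_word = ints.read_unaligned();
```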
Jefffrey left a comment:
There is a lot of code here, with little to no documentation, explanation, or justification. I would love it if some documentation were provided so it's easier to review and for anyone looking at the code in the future; for example, if a specific algorithm was pulled from some other code, provide a link so we have a reference.
```rust
fn try_hash_impl<T, H: Copy>(
    arr: ArrayIter<impl ArrayAccessor<Item = T>>,
    seed: &mut [H],
    func: impl Fn(H, T) -> Result<H>,
) -> Result<()> {
    let len = arr.len();
    if len != seed.len() {
        return exec_err!("Array length mismatch: {} != {}", len, seed.len());
    }
    for (hash, elem) in seed.iter_mut().zip(arr) {
        if let Some(elem) = elem {
            *hash = func(*hash, elem)?;
        }
    }
    Ok(())
}
```
From what I see, this is only used for:
- Nested types like list & map
- try_hash_primitive, which is only used for Decimal128 where precision < 19 (I assume as a special case for some performance benefit?)

Is it worth considering removing this altogether, simplifying further by removing all Result types from these hash methods?
What do you mean by removing all Result types?
From what I can tell, Result is only needed for:
- Checking array length mismatch with seed length, which I feel can be omitted (at worst we can debug_assert it, since we control the invocations of the function; see the sketch after this list)
- The special case for Decimal128 values that can fit in i64

Yet because of the Result here, all the functions must return Result; I'm wondering if the complexity is worth it, or if we can remove these checks/special cases for a simpler flow.
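Roughly what I had in mind for the length check (a hypothetical sketch mirroring try_hash_impl above; the name and imports are illustrative only):

```rust
use arrow::array::{ArrayAccessor, ArrayIter};

// Hypothetical infallible variant: the length check becomes a debug
// assertion, so `func` and the function itself no longer return Result.
fn hash_impl_infallible<T, H: Copy>(
    arr: ArrayIter<impl ArrayAccessor<Item = T>>,
    seed: &mut [H],
    func: impl Fn(H, T) -> H,
) {
    // We control every call site, so a debug assertion is enough here.
    debug_assert_eq!(arr.len(), seed.len(), "array/seed length mismatch");
    for (hash, elem) in seed.iter_mut().zip(arr) {
        if let Some(elem) = elem {
            *hash = func(*hash, elem);
        }
    }
}
```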
I got you. For Decimal128 we can use a debug assert, but for nested types it's hard; it's not only used for length checking. For nested types it calls the hash function recursively, and the hash function may return other errors.
Yep, I missed that an unsupported data type is a valid error case; I guess having Result is unavoidable in that case.
```rust
pub trait SparkHasher<H: Copy + std::fmt::Debug> {
    fn oneshot(seed: H, data: &[u8]) -> H;

    fn hash_boolean(arr: &BooleanArray, seed: &mut [H]) -> Result<()> {
        let true_bytes = 1_i32.to_le_bytes();
        let false_bytes = 0_i32.to_le_bytes();
        hash_impl(arr.iter(), seed, |seed, v| {
            if v {
                Self::oneshot(seed, &true_bytes)
            } else {
                Self::oneshot(seed, &false_bytes)
            }
        })?;
        Ok(())
    }

    fn hash_primitive<T, U>(arr: &PrimitiveArray<T>, seed: &mut [H]) -> Result<()>
```
I would love it if the generic names were more descriptive; seeing H, T, and U all over the place and having to remember which does what gets confusing really fast, especially given the lack of any explaining documentation (SparkHasher has an H, which hash_impl has as well, but the functions inside SparkHasher have T and U, and only T is shared with hash_impl).
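For example (an illustrative renaming of the helper only; the names are mine, not from the PR):

```rust
use arrow::array::{ArrayAccessor, ArrayIter};
use datafusion_common::Result;

// `HashState` is the running per-row hash value (the trait's `H`);
// `Item` is the native value yielded by the array (the helper's `T`).
fn try_hash_impl<Item, HashState: Copy>(
    arr: ArrayIter<impl ArrayAccessor<Item = Item>>,
    seed: &mut [HashState],
    func: impl Fn(HashState, Item) -> Result<HashState>,
) -> Result<()> {
    for (hash, elem) in seed.iter_mut().zip(arr) {
        if let Some(elem) = elem {
            *hash = func(*hash, elem)?;
        }
    }
    Ok(())
}
```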
```rust
    /// Computes the xxHash64 hash of the given data
    fn hash(arr: &ArrayRef, seed: &mut [H]) -> Result<()> {
        match arr.data_type() {
```
I can't help feeling there's a more ergonomic way to do this, perhaps using the downcast_primitive_array!() macro? I'm still having trouble understanding all the parts of this code, so I can't think of anything concrete yet.
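For reference, the macro's shape, following the arrow-rs documentation (a sketch; one open question is that each primitive type in this PR widens to a different target type, which a single generic arm may not express cleanly):

```rust
use arrow_array::{cast::as_string_array, downcast_primitive_array, Array};
use arrow_schema::DataType;

fn describe(array: &dyn Array) {
    downcast_primitive_array!(
        array => {
            // `array` is a concrete &PrimitiveArray<T> in this arm, so a
            // generic helper could take over from here.
            println!("primitive array with {} rows", array.len());
        }
        DataType::Utf8 => {
            println!("utf8 array with {} rows", as_string_array(array).len());
        }
        t => println!("unsupported type: {t}")
    )
}
```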
```rust
            DataType::UInt16 => {
                let arr = arr.as_any().downcast_ref::<UInt16Array>().unwrap();
                Self::hash_primitive::<_, i32>(arr, seed)?;
            }
            DataType::Int32 => {
                let arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
                Self::hash_primitive::<_, i32>(arr, seed)?;
            }
            DataType::UInt32 => {
                let arr = arr.as_any().downcast_ref::<UInt32Array>().unwrap();
                Self::hash_primitive::<_, u32>(arr, seed)?;
            }
            DataType::Int64 => {
                let arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
                Self::hash_primitive::<_, i64>(arr, seed)?;
            }
            DataType::UInt64 => {
                let arr = arr.as_any().downcast_ref::<UInt64Array>().unwrap();
                Self::hash_primitive::<_, u64>(arr, seed)?;
            }
```
Could you help me understand why types like UInt8 and UInt16 use i32 in the generic, similar to Int8, Int16, and Int32, but UInt32 uses u32?
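For context on why the widening type matters at all (a minimal standalone sketch of sign- vs. zero-extension, not code from the PR):

```rust
fn main() {
    // Widening a signed byte sign-extends while an unsigned byte
    // zero-extends, so the four bytes fed to the hash differ:
    assert_eq!((-1i8 as i32).to_le_bytes(), [0xFF, 0xFF, 0xFF, 0xFF]);
    assert_eq!((255u8 as i32).to_le_bytes(), [0xFF, 0x00, 0x00, 0x00]);
}
```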
Sorry, but this doesn't explain anything to me 🙁
Sorry, maybe I misunderstood your meaning. So you mean UInt8 and UInt16 should use u32?
updated
```rust
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct SparkXxHash64 {
    signature: Signature,
    seed: i64,
```
Would it be better to pull this out as a const, considering it's a hardcoded value?
In Spark it's also a member variable; it allows users to create their own UDF with a different seed. https://github.com/apache/spark/blob/e08c15b62712303942284cb90d6a1b004f69652c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L757
But in this code it's a private member variable that cannot be changed.
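If the field stays, exposing a constructor would at least make the seed configurable (a hypothetical sketch; `new_with_seed` and the signature choice are mine, not from the PR):

```rust
use datafusion_expr::{Signature, Volatility};

impl SparkXxHash64 {
    /// Default constructor using Spark's default seed of 42.
    pub fn new() -> Self {
        Self::new_with_seed(42)
    }

    /// Hypothetical constructor letting callers register a variant of
    /// the UDF with a custom seed, mirroring Spark's member variable.
    pub fn new_with_seed(seed: i64) -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
            seed,
        }
    }
}
```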
```rust
            _ => {
                return exec_err!("Unsupported data type: {}", arr.data_type());
            }
```
We should remove this catch-all so we know which types are missing.
If we remove this, then whenever a new data type is added to arrow we'd have to support it in the hash function immediately. That may make arrow version bumps much harder.
In my opinion that is better than not realizing we need to extend this until some user's runtime error reveals it to us.
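A toy illustration of the tradeoff (a local enum standing in for DataType; not code from the PR):

```rust
enum Kind {
    Int,
    Float,
    Interval, // knowingly unsupported
}

fn hash_kind(k: &Kind) -> Result<u64, String> {
    match k {
        Kind::Int => Ok(1),
        Kind::Float => Ok(2),
        // Explicit arm instead of `_`: if a new Kind variant is added
        // later, this match stops compiling, which is exactly the
        // reminder to extend support, rather than a runtime error.
        Kind::Interval => Err("Unsupported data type: Interval".to_string()),
    }
}
```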
updated
```rust
unsafe fn hash_bytes_by_int(data: &[u8], seed: u32) -> i32 {
    // safety: data length must be aligned to 4 bytes
    let mut h1 = seed as i32;
    for i in (0..data.len()).step_by(4) {
        let ints = data.as_ptr().add(i) as *const i32;
        let mut half_word = ints.read_unaligned();
```
Is this unsafety strictly necessary? Do we get a noticeable performance benefit? Or is it mainly because the unsafe API makes it easier to get an i32 as we iterate?
I just copied this part from sail or comet.
Could we tag the original author here if possible? Or check if they had benchmarks for this code path? While the unsafe does look OK to me, it would be nice to avoid unsafe code unless we have justification like tangible benchmarks.
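For comparison, a safe sketch of the same loop using chunks_exact (whether it optimizes to the same machine code would need a benchmark; the `mix_k1`/`mix_h1` helpers below use the standard murmur3 x86_32 constants and may be named differently in the PR):

```rust
#[inline]
fn mix_k1(mut k1: i32) -> i32 {
    k1 = k1.wrapping_mul(0xcc9e2d51u32 as i32);
    k1 = k1.rotate_left(15);
    k1.wrapping_mul(0x1b873593u32 as i32)
}

#[inline]
fn mix_h1(mut h1: i32, k1: i32) -> i32 {
    h1 ^= k1;
    h1 = h1.rotate_left(13);
    h1.wrapping_mul(5).wrapping_add(0xe6546b64u32 as i32)
}

/// Safe variant of the loop: decode each 4-byte chunk as a
/// little-endian i32. As in the unsafe version, `data.len()` is
/// expected to be a multiple of 4; here any trailing bytes would be
/// silently skipped instead of read out of bounds.
fn hash_bytes_by_int_safe(data: &[u8], seed: u32) -> i32 {
    let mut h1 = seed as i32;
    for chunk in data.chunks_exact(4) {
        let half_word = i32::from_le_bytes(chunk.try_into().unwrap());
        h1 = mix_h1(h1, mix_k1(half_word));
    }
    h1
}
```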
added original author
Force-pushed from b4e9331 to a2b11ed.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Add murmur/xxhash64 support, similar to the implementations in datafusion-comet and sail, but with more types supported.
Also use generics rather than macros, making the code a bit easier to maintain.
Are these changes tested?
Unit tests.
Are there any user-facing changes?
No