diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index e937b4ea549c..abfa09a98ccd 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -20,13 +20,17 @@ use crate::error::{DataFusionError, Result}; use ahash::{CallHasher, RandomState}; use arrow::array::{ - Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, - Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - UInt16Array, UInt32Array, UInt64Array, UInt8Array, + Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DictionaryArray, + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, +}; +use arrow::datatypes::{ + ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Int16Type, Int32Type, + Int64Type, Int8Type, Schema, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; -use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use std::collections::HashSet; +use std::sync::Arc; use crate::logical_plan::JoinType; use crate::physical_plan::expressions::Column; @@ -245,9 +249,60 @@ macro_rules! hash_array_float { }; } -/// Creates hash values for every row, based on the values in the columns +/// Hash the values in a dictionary array +fn create_hashes_dictionary( + array: &ArrayRef, + random_state: &RandomState, + hashes_buffer: &mut Vec, + multi_col: bool, +) -> Result<()> { + let dict_array = array.as_any().downcast_ref::>().unwrap(); + + // Hash each dictionary value once, and then use that computed + // hash for each key value to avoid a potentially expensive + // redundant hashing for large dictionary elements (e.g. strings) + let dict_values = Arc::clone(dict_array.values()); + let mut dict_hashes = vec![0; dict_values.len()]; + create_hashes(&[dict_values], random_state, &mut dict_hashes)?; + + // combine hash for each index in values + if multi_col { + for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) { + if let Some(key) = key { + let idx = key + .to_usize() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Can not convert key value {:?} to usize in dictionary of type {:?}", + key, dict_array.data_type() + )) + })?; + *hash = combine_hashes(dict_hashes[idx], *hash) + } // no update for Null, consistent with other hashes + } + } else { + for (hash, key) in hashes_buffer.iter_mut().zip(dict_array.keys().iter()) { + if let Some(key) = key { + let idx = key + .to_usize() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Can not convert key value {:?} to usize in dictionary of type {:?}", + key, dict_array.data_type() + )) + })?; + *hash = dict_hashes[idx] + } // no update for Null, consistent with other hashes + } + } + Ok(()) +} + +/// Creates hash values for every row, based on the values in the +/// columns. /// -/// This implements so-called "vectorized hashing" +/// The number of rows to hash is determined by `hashes_buffer.len()`. +/// `hashes_buffer` should be pre-sized appropriately pub fn create_hashes<'a>( arrays: &[ArrayRef], random_state: &RandomState, @@ -438,11 +493,84 @@ pub fn create_hashes<'a>( multi_col ); } + DataType::Dictionary(index_type, _) => match **index_type { + DataType::Int8 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + DataType::Int16 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + DataType::Int32 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + DataType::Int64 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + DataType::UInt8 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + DataType::UInt16 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + DataType::UInt32 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + DataType::UInt64 => { + create_hashes_dictionary::( + col, + random_state, + hashes_buffer, + multi_col, + )?; + } + _ => { + return Err(DataFusionError::Internal(format!( + "Unsupported dictionary type in hasher hashing: {}", + col.data_type(), + ))) + } + }, _ => { // This is internal because we should have caught this before. - return Err(DataFusionError::Internal( - "Unsupported data type in hasher".to_string(), - )); + return Err(DataFusionError::Internal(format!( + "Unsupported data type in hasher: {}", + col.data_type() + ))); } } } @@ -453,6 +581,8 @@ pub fn create_hashes<'a>( mod tests { use std::sync::Arc; + use arrow::{array::DictionaryArray, datatypes::Int8Type}; + use super::*; fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) -> Result<()> { @@ -529,4 +659,78 @@ mod tests { Ok(()) } + + #[test] + fn create_hashes_for_dict_arrays() { + let strings = vec![Some("foo"), None, Some("bar"), Some("foo"), None]; + + let string_array = Arc::new(strings.iter().cloned().collect::()); + let dict_array = Arc::new( + strings + .iter() + .cloned() + .collect::>(), + ); + + let random_state = RandomState::with_seeds(0, 0, 0, 0); + + let mut string_hashes = vec![0; strings.len()]; + create_hashes(&[string_array], &random_state, &mut string_hashes).unwrap(); + + let mut dict_hashes = vec![0; strings.len()]; + create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap(); + + // Null values result in a zero hash, + for (val, hash) in strings.iter().zip(string_hashes.iter()) { + match val { + Some(_) => assert_ne!(*hash, 0), + None => assert_eq!(*hash, 0), + } + } + + // same logical values should hash to the same hash value + assert_eq!(string_hashes, dict_hashes); + + // Same values should map to same hash values + assert_eq!(strings[1], strings[4]); + assert_eq!(dict_hashes[1], dict_hashes[4]); + assert_eq!(strings[0], strings[3]); + assert_eq!(dict_hashes[0], dict_hashes[3]); + + // different strings should matp to different hash values + assert_ne!(strings[0], strings[2]); + assert_ne!(dict_hashes[0], dict_hashes[2]); + } + + #[test] + fn create_multi_column_hash_for_dict_arrays() { + let strings1 = vec![Some("foo"), None, Some("bar")]; + let strings2 = vec![Some("blarg"), Some("blah"), None]; + + let string_array = Arc::new(strings1.iter().cloned().collect::()); + let dict_array = Arc::new( + strings2 + .iter() + .cloned() + .collect::>(), + ); + + let random_state = RandomState::with_seeds(0, 0, 0, 0); + + let mut one_col_hashes = vec![0; strings1.len()]; + create_hashes(&[dict_array.clone()], &random_state, &mut one_col_hashes).unwrap(); + + let mut two_col_hashes = vec![0; strings1.len()]; + create_hashes( + &[dict_array, string_array], + &random_state, + &mut two_col_hashes, + ) + .unwrap(); + + assert_eq!(one_col_hashes.len(), 3); + assert_eq!(two_col_hashes.len(), 3); + + assert_ne!(one_col_hashes, two_col_hashes); + } }