
Commit

Fix join on arrays of unhashable types and allow hash join on all types supported at run-time (apache#13388)

* Remove unused code paths from create_hashes

The `downcast_primitive_array!` macro handles all primitive types
and only then delegates to fallbacks. It handles Decimal128 and
Decimal256 internally.
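
A rough sketch of why those arms were unreachable (not the actual `create_hashes` body — the `dispatch` helper and `main` below are made up for illustration, and it assumes the `arrow_array` crate from arrow-rs):

```rust
use std::sync::Arc;

use arrow_array::{downcast_primitive_array, Array, ArrayRef, Decimal128Array};

// Hypothetical helper mirroring how `create_hashes` dispatches: the first arm
// of `downcast_primitive_array!` matches every primitive DataType, including
// Decimal128 and Decimal256, so only non-primitive types reach the fallback.
fn dispatch(array: &dyn Array) -> String {
    downcast_primitive_array!(
        array => format!("primitive path: {}", array.data_type()),
        other => format!("fallback path: {other}"),
    )
}

fn main() {
    let decimals: ArrayRef = Arc::new(Decimal128Array::from(vec![1_i128, 2, 3]));
    // The decimal array takes the primitive path, so a dedicated
    // `DataType::Decimal128(_, _)` arm in the fallback list is never hit.
    println!("{}", dispatch(decimals.as_ref()));
}
```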

* Fix join on arrays of unhashable types and allow hash join on all types supported at run-time apache#13388

Update can_hash to match currently supported hashes.
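
A quick illustration of the intended behavior of the updated `can_hash` (a sketch only; it assumes the function stays public as `datafusion_expr::utils::can_hash`, as in this diff):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field};
use datafusion_expr::utils::can_hash;

fn main() {
    // Binary join keys are now accepted, so the planner can choose a hash join.
    assert!(can_hash(&DataType::Binary));

    // Nested types are checked recursively: a list is hashable only if its
    // element type is hashable.
    let list_of_binary =
        DataType::List(Arc::new(Field::new("item", DataType::Binary, true)));
    assert!(can_hash(&list_of_binary));

    // Types that create_hashes cannot handle at run-time are rejected up
    // front, instead of failing later during execution.
    let ree = DataType::RunEndEncoded(
        Arc::new(Field::new("run_ends", DataType::Int32, false)),
        Arc::new(Field::new("values", DataType::Utf8, true)),
    );
    assert!(!can_hash(&ree));
}
```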

* Rename table_with_many_types in tests

* Test join on binary is hash join
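
The sqllogictest added to `joins.slt` below exercises this end to end. A minimal API-level reproduction might look like the following sketch (table and column names are illustrative, and it assumes a `tokio` runtime plus the `datafusion` crate):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, BinaryArray};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // Two tiny in-memory tables keyed by a Binary column.
    let batch = RecordBatch::try_from_iter([(
        "binary_col",
        Arc::new(BinaryArray::from(vec![b"one".as_slice(), b"two".as_slice()])) as ArrayRef,
    )])?;

    let ctx = SessionContext::new();
    ctx.register_batch("l", batch.clone())?;
    ctx.register_batch("r", batch)?;

    // Previously the planner rejected Binary equi-join keys; with this change
    // the physical plan printed here should contain a HashJoinExec on binary_col.
    ctx.sql("EXPLAIN SELECT count(*) FROM l JOIN r ON l.binary_col = r.binary_col")
        .await?
        .show()
        .await?;

    Ok(())
}
```
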
findepi authored Nov 19, 2024
1 parent c44b613 commit 9fb5ff9
Showing 5 changed files with 79 additions and 36 deletions.
10 changes: 1 addition & 9 deletions datafusion/common/src/hash_utils.rs
@@ -32,7 +32,7 @@ use arrow_buffer::IntervalMonthDayNano;
use crate::cast::{
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
as_primitive_array, as_string_array, as_string_view_array, as_struct_array,
as_string_array, as_string_view_array, as_struct_array,
};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
@@ -392,14 +392,6 @@ pub fn create_hashes<'a>(
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
hash_array(array, random_state, hashes_buffer, rehash)
}
DataType::Decimal128(_, _) => {
let array = as_primitive_array::<Decimal128Type>(array)?;
hash_array_primitive(array, random_state, hashes_buffer, rehash)
}
DataType::Decimal256(_, _) => {
let array = as_primitive_array::<Decimal256Type>(array)?;
hash_array_primitive(array, random_state, hashes_buffer, rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
59 changes: 42 additions & 17 deletions datafusion/expr/src/utils.rs
@@ -29,7 +29,7 @@ use crate::{
};
use datafusion_expr_common::signature::{Signature, TypeSignature};

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
@@ -958,7 +958,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(

/// Can this data type be used in hash join equal conditions??
/// Data types here come from function 'equal_rows', if more data types are supported
/// in equal_rows(hash join), add those data types here to generate join logical plan.
/// in create_hashes, add those data types here to generate join logical plan.
pub fn can_hash(data_type: &DataType) -> bool {
match data_type {
DataType::Null => true,
@@ -971,31 +971,38 @@ pub fn can_hash(data_type: &DataType) -> bool {
DataType::UInt16 => true,
DataType::UInt32 => true,
DataType::UInt64 => true,
DataType::Float16 => true,
DataType::Float32 => true,
DataType::Float64 => true,
DataType::Timestamp(time_unit, _) => match time_unit {
TimeUnit::Second => true,
TimeUnit::Millisecond => true,
TimeUnit::Microsecond => true,
TimeUnit::Nanosecond => true,
},
DataType::Decimal128(_, _) => true,
DataType::Decimal256(_, _) => true,
DataType::Timestamp(_, _) => true,
DataType::Utf8 => true,
DataType::LargeUtf8 => true,
DataType::Utf8View => true,
DataType::Decimal128(_, _) => true,
DataType::Binary => true,
DataType::LargeBinary => true,
DataType::BinaryView => true,
DataType::Date32 => true,
DataType::Date64 => true,
DataType::Time32(_) => true,
DataType::Time64(_) => true,
DataType::Duration(_) => true,
DataType::Interval(_) => true,
DataType::FixedSizeBinary(_) => true,
DataType::Dictionary(key_type, value_type)
if *value_type.as_ref() == DataType::Utf8 =>
{
DataType::is_dictionary_key_type(key_type)
DataType::Dictionary(key_type, value_type) => {
DataType::is_dictionary_key_type(key_type) && can_hash(value_type)
}
DataType::List(_) => true,
DataType::LargeList(_) => true,
DataType::FixedSizeList(_, _) => true,
DataType::List(value_type) => can_hash(value_type.data_type()),
DataType::LargeList(value_type) => can_hash(value_type.data_type()),
DataType::FixedSizeList(value_type, _) => can_hash(value_type.data_type()),
DataType::Map(map_struct, true | false) => can_hash(map_struct.data_type()),
DataType::Struct(fields) => fields.iter().all(|f| can_hash(f.data_type())),
_ => false,

DataType::ListView(_)
| DataType::LargeListView(_)
| DataType::Union(_, _)
| DataType::RunEndEncoded(_, _) => false,
}
}

@@ -1403,6 +1410,7 @@ mod tests {
test::function_stub::max_udaf, test::function_stub::min_udaf,
test::function_stub::sum_udaf, Cast, ExprFunctionExt, WindowFunctionDefinition,
};
use arrow::datatypes::{UnionFields, UnionMode};

#[test]
fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
@@ -1805,4 +1813,21 @@
assert!(accum.contains(&Column::from_name("a")));
Ok(())
}

#[test]
fn test_can_hash() {
let union_fields: UnionFields = [
(0, Arc::new(Field::new("A", DataType::Int32, true))),
(1, Arc::new(Field::new("B", DataType::Float64, true))),
]
.into_iter()
.collect();

let union_type = DataType::Union(union_fields, UnionMode::Sparse);
assert!(!can_hash(&union_type));

let list_union_type =
DataType::List(Arc::new(Field::new("my_union", union_type, true)));
assert!(!can_hash(&list_union_type));
}
}
9 changes: 7 additions & 2 deletions datafusion/sqllogictest/src/test_context.rs
@@ -106,6 +106,8 @@ impl TestContext {
let example_udf = create_example_udf();
test_ctx.ctx.register_udf(example_udf);
register_partition_table(&mut test_ctx).await;
info!("Registering table with many types");
register_table_with_many_types(test_ctx.session_ctx()).await;
}
"metadata.slt" => {
info!("Registering metadata table tables");
@@ -251,8 +253,11 @@ pub async fn register_table_with_many_types(ctx: &SessionContext) {
.unwrap();
ctx.register_catalog("my_catalog", Arc::new(catalog));

ctx.register_table("my_catalog.my_schema.t2", table_with_many_types())
.unwrap();
ctx.register_table(
"my_catalog.my_schema.table_with_many_types",
table_with_many_types(),
)
.unwrap();
}

pub async fn register_table_with_map(ctx: &SessionContext) {
@@ -37,17 +37,17 @@ query TTTTITTTIIIIIIT rowsort
SELECT * from information_schema.columns;
----
my_catalog my_schema t1 i 0 NULL YES Int32 NULL NULL 32 2 NULL NULL NULL
my_catalog my_schema t2 binary_col 4 NULL NO Binary NULL 2147483647 NULL NULL NULL NULL NULL
my_catalog my_schema t2 float64_col 1 NULL YES Float64 NULL NULL 24 2 NULL NULL NULL
my_catalog my_schema t2 int32_col 0 NULL NO Int32 NULL NULL 32 2 NULL NULL NULL
my_catalog my_schema t2 large_binary_col 5 NULL NO LargeBinary NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema t2 large_utf8_col 3 NULL NO LargeUtf8 NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema t2 timestamp_nanos 6 NULL NO Timestamp(Nanosecond, None) NULL NULL NULL NULL NULL NULL NULL
my_catalog my_schema t2 utf8_col 2 NULL YES Utf8 NULL 2147483647 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types binary_col 4 NULL NO Binary NULL 2147483647 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types float64_col 1 NULL YES Float64 NULL NULL 24 2 NULL NULL NULL
my_catalog my_schema table_with_many_types int32_col 0 NULL NO Int32 NULL NULL 32 2 NULL NULL NULL
my_catalog my_schema table_with_many_types large_binary_col 5 NULL NO LargeBinary NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types large_utf8_col 3 NULL NO LargeUtf8 NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types timestamp_nanos 6 NULL NO Timestamp(Nanosecond, None) NULL NULL NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types utf8_col 2 NULL YES Utf8 NULL 2147483647 NULL NULL NULL NULL NULL

# Cleanup
statement ok
drop table t1

statement ok
drop table t2
drop table table_with_many_types
21 changes: 21 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
@@ -4292,3 +4292,24 @@ query T
select * from table1 as t1 natural join table1_stringview as t2;
----
foo

query TT
EXPLAIN SELECT count(*)
FROM my_catalog.my_schema.table_with_many_types AS l
JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col = r.binary_col
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
02)--Projection:
03)----Inner Join: l.binary_col = r.binary_col
04)------SubqueryAlias: l
05)--------TableScan: my_catalog.my_schema.table_with_many_types projection=[binary_col]
06)------SubqueryAlias: r
07)--------TableScan: my_catalog.my_schema.table_with_many_types projection=[binary_col]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[count(*)]
02)--ProjectionExec: expr=[]
03)----CoalesceBatchesExec: target_batch_size=3
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, binary_col@0)]
05)--------MemoryExec: partitions=1, partition_sizes=[1]
06)--------MemoryExec: partitions=1, partition_sizes=[1]
