Join cardinality computation for cost-based nested join optimizations #3787

Merged · 3 commits · Oct 13, 2022

Changes from 1 commit
135 changes: 134 additions & 1 deletion datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -196,14 +196,17 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
#[cfg(test)]
mod tests {
use crate::{
- physical_plan::{hash_join::PartitionMode, Statistics},
+ physical_plan::{
+     displayable, hash_join::PartitionMode, ColumnStatistics, Statistics,
+ },
test::exec::StatisticsExec,
};

use super::*;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;

fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
let big = Arc::new(StatisticsExec::new(
@@ -226,6 +229,66 @@ mod tests {
(big, small)
}

fn create_column_stats(
min: Option<u64>,
max: Option<u64>,
distinct_count: Option<usize>,
) -> Option<Vec<ColumnStatistics>> {
Some(vec![ColumnStatistics {
distinct_count,
min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
..Default::default()
}])
}

fn create_nested_with_min_max() -> (
Arc<dyn ExecutionPlan>,
Arc<dyn ExecutionPlan>,
Arc<dyn ExecutionPlan>,
) {
let big = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Some(100_000),
column_statistics: create_column_stats(
Some(0),
Some(50_000),
Some(50_000),
),
..Default::default()
},
Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
));

let medium = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Some(10_000),
column_statistics: create_column_stats(
Some(1000),
Some(5000),
Some(1000),
),
..Default::default()
},
Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]),
));

let small = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Some(1000),
column_statistics: create_column_stats(
Some(0),
Some(100_000),
Some(1000),
),
..Default::default()
},
Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
));

(big, medium, small)
}

#[tokio::test]
async fn test_join_with_swap() {
let (big, small) = create_big_and_small();
@@ -274,6 +337,76 @@ mod tests {
);
}

/// Compare the input plan with the plan after running the probe order optimizer.
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines =
$EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();

let optimized = HashBuildProbeOrder::new()
.optimize(Arc::new($PLAN), &SessionConfig::new())
.unwrap();

let plan = displayable(optimized.as_ref()).indent().to_string();
let actual_lines = plan.split("\n").collect::<Vec<&str>>();

assert_eq!(
&expected_lines, &actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}

#[tokio::test]
async fn test_nested_join_swap() {
let (big, medium, small) = create_nested_with_min_max();

let child_join = HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
vec![(
Column::new_with_schema("big_col", &big.schema()).unwrap(),
Column::new_with_schema("small_col", &small.schema()).unwrap(),
)],
None,
&JoinType::Inner,
PartitionMode::CollectLeft,
&false,
)
.unwrap();
let child_schema = child_join.schema();

let join = HashJoinExec::try_new(
Arc::clone(&medium),
Arc::new(child_join),
vec![(
Column::new_with_schema("medium_col", &medium.schema()).unwrap(),
Column::new_with_schema("small_col", &child_schema).unwrap(),
)],
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
)
.unwrap();

// The first hash join's build (left) side is the 'small' table (1000 rows), and the
// second hash join's build side is (small IJ big), which has an estimated cardinality
// of 2000 rows (vs. medium, which has an exact cardinality of 10_000 rows). The
// ProjectionExec nodes restore the original column order after each swap.
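// Using the standard equi-join estimate (an assumption about the cost model that
// matches the numbers here): |A IJ B| ~= |A| * |B| / max(NDV(A.col), NDV(B.col))
//   = 1000 * 100_000 / 50_000 = 2000 rows.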
let expected = [
"ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as big_col, small_col@1 as small_col]",
" HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column { name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
" ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as small_col]",
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\", index: 0 })]",
" StatisticsExec: col_count=1, row_count=Some(1000)",
" StatisticsExec: col_count=1, row_count=Some(100000)",
" StatisticsExec: col_count=1, row_count=Some(10000)",
""
];
assert_optimized!(expected, join);
}

#[tokio::test]
async fn test_join_no_swap() {
let (big, small) = create_big_and_small();
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/physical_plan/hash_join.rs
@@ -59,7 +59,8 @@ use super::{
coalesce_partitions::CoalescePartitionsExec,
expressions::PhysicalSortExpr,
join_utils::{
- build_join_schema, check_join_is_valid, ColumnIndex, JoinFilter, JoinOn, JoinSide,
+ build_join_schema, check_join_is_valid, join_statistics, ColumnIndex, JoinFilter,
+ JoinOn, JoinSide,
},
};
use super::{
@@ -385,7 +386,12 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
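//   (B.col is unique in that case, so each row of A matches at most one row of B
//   and the output has exactly COUNT(A) rows)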
- Statistics::default()
+ join_statistics(
+     self.left.clone(),
+     self.right.clone(),
+     self.on.clone(),
+     &self.join_type,
+ )
}
}

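A minimal sketch of the estimate behind this change, assuming the classic equi-join formula |A ⋈ B| ≈ |A| * |B| / max(NDV(A.col), NDV(B.col)) that the test comment above relies on. The function name and signature are illustrative, not the PR's actual `join_utils::join_statistics`:

```rust
/// Hypothetical helper (not the PR's `join_utils::join_statistics`):
/// estimate the cardinality of an inner equi-join from input statistics.
fn estimate_inner_join_rows(
    left_rows: Option<usize>,
    right_rows: Option<usize>,
    left_distinct: Option<usize>,
    right_distinct: Option<usize>,
) -> Option<usize> {
    // Without both row counts and both distinct counts on the join keys,
    // there is no basis for an estimate.
    let (rows_l, rows_r) = (left_rows?, right_rows?);
    let max_ndv = left_distinct?.max(right_distinct?);
    if max_ndv == 0 {
        return Some(0); // an empty key domain joins to nothing
    }
    Some(rows_l * rows_r / max_ndv)
}

fn main() {
    // The nested-join test above: small (1000 rows, NDV 1000) joined to big
    // (100_000 rows, NDV 50_000) => 1000 * 100_000 / 50_000 = 2000 estimated
    // rows — smaller than medium's exact 10_000 rows, which is why the
    // optimizer picks (small IJ big) as the build side.
    assert_eq!(
        estimate_inner_join_rows(Some(1_000), Some(100_000), Some(1_000), Some(50_000)),
        Some(2_000)
    );
}
```

Once `stats()` returns such an estimate instead of `Statistics::default()`, the existing `HashBuildProbeOrder` swap logic can compare joined subtrees against base tables, which is what enables the nested-join swap tested above.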