Join cardinality computation for cost-based nested join optimizations #3787

Merged · 3 commits · Oct 13, 2022
datafusion/core/src/physical_optimizer/hash_build_probe_order.rs (149 additions, 1 deletion)

@@ -196,14 +196,17 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
#[cfg(test)]
mod tests {
use crate::{
- physical_plan::{hash_join::PartitionMode, Statistics},
+ physical_plan::{
+     displayable, hash_join::PartitionMode, ColumnStatistics, Statistics,
+ },
test::exec::StatisticsExec,
};

use super::*;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;

fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
let big = Arc::new(StatisticsExec::new(
@@ -226,6 +229,75 @@ mod tests {
(big, small)
}

/// Create a column statistics vector for a single column
/// that has the given min/max/distinct_count properties.
///
/// Given min/max will be mapped to a [`ScalarValue`] if
/// they are not `None`.
fn create_column_stats(
min: Option<u64>,
max: Option<u64>,
distinct_count: Option<usize>,
) -> Option<Vec<ColumnStatistics>> {
Some(vec![ColumnStatistics {
distinct_count,
min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
..Default::default()
}])
}

/// Returns three plans with statistics of (min, max, distinct_count)
/// * big 100K rows @ (0, 50k, 50k)
/// * medium 10K rows @ (1k, 5k, 1k)
/// * small 1K rows @ (0, 100k, 1k)
fn create_nested_with_min_max() -> (
Arc<dyn ExecutionPlan>,
Arc<dyn ExecutionPlan>,
Arc<dyn ExecutionPlan>,
) {
let big = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Some(100_000),
column_statistics: create_column_stats(
Some(0),
Some(50_000),
Some(50_000),
),
..Default::default()
},
Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
));

let medium = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Some(10_000),
column_statistics: create_column_stats(
Some(1000),
Some(5000),
Some(1000),
),
..Default::default()
},
Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]),
));

let small = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Some(1000),
column_statistics: create_column_stats(
Some(0),
Some(100_000),
Some(1000),
),
..Default::default()
},
Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
));

(big, medium, small)
}

#[tokio::test]
async fn test_join_with_swap() {
let (big, small) = create_big_and_small();
@@ -274,6 +346,82 @@
);
}

/// Compare the input plan with the plan after running the probe order optimizer.
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines =
$EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();

let optimized = HashBuildProbeOrder::new()
.optimize(Arc::new($PLAN), &SessionConfig::new())
.unwrap();

let plan = displayable(optimized.as_ref()).indent().to_string();
let actual_lines = plan.split("\n").collect::<Vec<&str>>();

assert_eq!(
&expected_lines, &actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}

#[tokio::test]
async fn test_nested_join_swap() {
let (big, medium, small) = create_nested_with_min_max();

// Form the inner join: big JOIN small
let child_join = HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
vec![(
Column::new_with_schema("big_col", &big.schema()).unwrap(),
Column::new_with_schema("small_col", &small.schema()).unwrap(),
)],
None,
&JoinType::Inner,
PartitionMode::CollectLeft,
&false,
)
.unwrap();
let child_schema = child_join.schema();

// Form join tree `medium LEFT JOIN (big JOIN small)`
let join = HashJoinExec::try_new(
Arc::clone(&medium),
Arc::new(child_join),
vec![(
Column::new_with_schema("medium_col", &medium.schema()).unwrap(),
Column::new_with_schema("small_col", &child_schema).unwrap(),
)],
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
)
.unwrap();

// Hash join uses the left side to build the hash table and the right side to probe it.
// We want to keep the left side as small as possible, so if we can estimate (with a
// reasonable margin of error) that the right side is smaller than the left side, we
// should swap the sides.
//
// After optimization, the first hash join's left is the 'small' table (1000 rows), and the
// second hash join's left is F(small IJ big), which has an estimated cardinality of 2000
// rows (vs. medium, which has an exact cardinality of 10_000 rows).
let expected = [
"ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as big_col, small_col@1 as small_col]",
" HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column { name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
" ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as small_col]",
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\", index: 0 })]",
" StatisticsExec: col_count=1, row_count=Some(1000)",
" StatisticsExec: col_count=1, row_count=Some(100000)",
" StatisticsExec: col_count=1, row_count=Some(10000)",
""
];
assert_optimized!(expected, join);
}

#[tokio::test]
async fn test_join_no_swap() {
let (big, small) = create_big_and_small();
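For context, the 2000-row estimate cited in the comment inside `test_nested_join_swap` follows the textbook equi-join cardinality formula: |A JOIN B| ≈ (|A| * |B|) / max(ndv(A.col), ndv(B.col)). Below is a minimal sketch of that arithmetic using the fixture numbers from `create_nested_with_min_max`; the function name is illustrative only, not DataFusion's actual `estimate_join_statistics` API:

```rust
/// Textbook inner equi-join cardinality estimate:
/// |A JOIN B| ~= |A| * |B| / max(ndv(A.col), ndv(B.col)).
fn estimate_inner_join_rows(
    left_rows: usize,
    right_rows: usize,
    left_distinct: usize,
    right_distinct: usize,
) -> usize {
    // Guard against a zero distinct count to avoid dividing by zero.
    let denom = left_distinct.max(right_distinct).max(1);
    left_rows * right_rows / denom
}

fn main() {
    // Fixtures from `create_nested_with_min_max`: small has 1_000 rows with
    // 1_000 distinct keys; big has 100_000 rows with 50_000 distinct keys.
    let est = estimate_inner_join_rows(1_000, 100_000, 1_000, 50_000);
    assert_eq!(est, 2_000); // the estimate the test comment refers to
}
```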
datafusion/core/src/physical_plan/hash_join.rs (8 additions, 2 deletions)

@@ -59,7 +59,8 @@ use super::{
coalesce_partitions::CoalescePartitionsExec,
expressions::PhysicalSortExpr,
join_utils::{
- build_join_schema, check_join_is_valid, ColumnIndex, JoinFilter, JoinOn, JoinSide,
+ build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
+     JoinFilter, JoinOn, JoinSide,
},
};
use super::{
@@ -385,7 +386,12 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
- Statistics::default()
+ estimate_join_statistics(
+     self.left.clone(),
+     self.right.clone(),
+     self.on.clone(),
+     &self.join_type,
+ )
}
}

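With `statistics()` now delegating to `estimate_join_statistics` instead of returning `Statistics::default()`, nested joins expose cardinality estimates that the build-probe-order rule can compare. Here is a hedged sketch of that comparison, reduced to a plain row-count check; `Stats` and `should_swap` are illustrative names, not the DataFusion API, and the real rule also weighs byte sizes and join types:

```rust
/// Illustrative stand-in for DataFusion's `Statistics`.
struct Stats {
    num_rows: Option<usize>,
}

/// Swap build/probe sides when the left (build) input is estimated
/// to hold more rows than the right (probe) input.
fn should_swap(left: &Stats, right: &Stats) -> bool {
    match (left.num_rows, right.num_rows) {
        (Some(l), Some(r)) => l > r,
        // Without estimates on both sides, keep the plan as written.
        _ => false,
    }
}

fn main() {
    // From `test_nested_join_swap`: medium has exactly 10_000 rows, while the
    // inner join F(small IJ big) is estimated at 2_000 rows, so the optimizer
    // swaps the sides and flips the join type from Left to Right.
    let medium = Stats { num_rows: Some(10_000) };
    let nested = Stats { num_rows: Some(2_000) };
    assert!(should_swap(&medium, &nested));
}
```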