Skip to content

Commit

Permalink
Add hash_join_single_partition_threshold_rows config
Browse files Browse the repository at this point in the history
  • Loading branch information
maruschin committed Jan 2, 2024
1 parent bf0a39a commit d9c9d34
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 46 deletions.
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,10 @@ config_namespace! {
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

/// The maximum estimated size in rows for one input side of a HashJoin
/// will be collected into a single partition
pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
Expand Down
115 changes: 69 additions & 46 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ fn should_swap_join_order(
fn supports_collect_by_size(
plan: &dyn ExecutionPlan,
collection_size_threshold: usize,
collection_size_threshold_rows: usize,
) -> bool {
// Currently we do not trust the 0 value from stats, due to stats collection might have bug
// TODO check the logic in datasource::get_statistics_with_limit()
Expand All @@ -99,7 +100,7 @@ fn supports_collect_by_size(
if let Some(size) = stats.total_byte_size.get_value() {
*size != 0 && *size < collection_size_threshold
} else if let Some(row_count) = stats.num_rows.get_value() {
*row_count != 0 && *row_count < collection_size_threshold
*row_count != 0 && *row_count < collection_size_threshold_rows
} else {
false
}
Expand Down Expand Up @@ -251,8 +252,14 @@ impl PhysicalOptimizerRule for JoinSelection {
// side is the small side.
let config = &config.optimizer;
let collect_left_threshold = config.hash_join_single_partition_threshold;
let collect_left_threshold_rows =
config.hash_join_single_partition_threshold_rows;
state.plan.transform_up(&|plan| {
statistical_join_selection_subrule(plan, collect_left_threshold)
statistical_join_selection_subrule(
plan,
collect_left_threshold,
collect_left_threshold_rows,
)
})
}

Expand All @@ -269,16 +276,18 @@ impl PhysicalOptimizerRule for JoinSelection {
///
/// This function will first consider the given join type and check whether the
/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
/// When the `collect_threshold` is provided, this function will also check left
/// and right sizes.
/// When the `ignore_threshold` is false, this function will also check left
/// and right sizes in bytes or rows.
///
/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
/// mode as is, but it can do so by changing the join type to [`JoinType::Right`]
/// and [`JoinType::RightAnti`], respectively.
fn try_collect_left(
hash_join: &HashJoinExec,
collect_threshold: Option<usize>,
ignore_threshold: bool,
collect_threshold: usize,
collect_threshold_rows: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let left = hash_join.left();
let right = hash_join.right();
Expand All @@ -290,19 +299,29 @@ fn try_collect_left(
| JoinType::LeftSemi
| JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti => collect_threshold.map_or(true, |threshold| {
supports_collect_by_size(&**left, threshold)
}),
| JoinType::RightAnti => {
ignore_threshold
|| supports_collect_by_size(
&**right,
collect_threshold,
collect_threshold_rows,
)
}
};
let right_can_collect = match join_type {
JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
JoinType::Inner
| JoinType::RightSemi
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti => collect_threshold.map_or(true, |threshold| {
supports_collect_by_size(&**right, threshold)
}),
| JoinType::LeftAnti => {
ignore_threshold
|| supports_collect_by_size(
&**right,
collect_threshold,
collect_threshold_rows,
)
}
};
match (left_can_collect, right_can_collect) {
(true, true) => {
Expand Down Expand Up @@ -366,51 +385,55 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl
fn statistical_join_selection_subrule(
plan: Arc<dyn ExecutionPlan>,
collect_left_threshold: usize,
collect_left_threshold_rows: usize,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let transformed = if let Some(hash_join) =
plan.as_any().downcast_ref::<HashJoinExec>()
{
match hash_join.partition_mode() {
PartitionMode::Auto => {
try_collect_left(hash_join, Some(collect_left_threshold))?.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
let transformed =
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
match hash_join.partition_mode() {
PartitionMode::Auto => try_collect_left(
hash_join,
false,
collect_left_threshold,
collect_left_threshold_rows,
)?
}
PartitionMode::CollectLeft => try_collect_left(hash_join, None)?
.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::Partitioned => {
let left = hash_join.left();
let right = hash_join.right();
if should_swap_join_order(&**left, &**right)?
&& supports_swap(*hash_join.join_type())
{
swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)?
} else {
None
PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::Partitioned => {
let left = hash_join.left();
let right = hash_join.right();
if should_swap_join_order(&**left, &**right)?
&& supports_swap(*hash_join.join_type())
{
swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)?
} else {
None
}
}
}
}
} else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
let left = cross_join.left();
let right = cross_join.right();
if should_swap_join_order(&**left, &**right)? {
let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
let proj: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
swap_reverting_projection(&left.schema(), &right.schema()),
Arc::new(new_join),
)?);
Some(proj)
} else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
let left = cross_join.left();
let right = cross_join.right();
if should_swap_join_order(&**left, &**right)? {
let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
let proj: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
swap_reverting_projection(&left.schema(), &right.schema()),
Arc::new(new_join),
)?);
Some(proj)
} else {
None
}
} else {
None
}
} else {
None
};
};

Ok(if let Some(transformed) = transformed {
Transformed::Yes(transformed)
Expand Down

0 comments on commit d9c9d34

Please sign in to comment.