-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT]: sql cross join #3110
[FEAT]: sql cross join #3110
Conversation
CodSpeed Performance ReportMerging #3110 will not alter performanceComparing Summary
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this Cory! The logic largely looks sound but I'm not super happy about the quality of the DataFusion optimizer rule code. I had a good amount of comments but I'm happy to branch off of your changes and make the fixes myself.
fn rewrite_children( | ||
optimizer: &impl OptimizerRule, | ||
plan: Arc<LogicalPlan>, | ||
) -> DaftResult<Transformed<Arc<LogicalPlan>>> { | ||
plan.map_children(|input| optimizer.try_optimize(input)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of calling rewrite_children
to recurse down the plan tree, you can use TreeNode::transform_up
or TreeNode::transform_down
which will do it for you. Then, in try_optimize
we can match on the specific cases that we'd like this rule to apply to, which makes it more clear what plan nodes this rule affects.
match plan { | ||
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => { | ||
let keys = join.left_on.into_iter().zip(join.right_on); | ||
possible_join_keys.insert_all_owned(keys); | ||
flatten_join_inputs( | ||
Arc::unwrap_or_clone(join.left), | ||
possible_join_keys, | ||
all_inputs, | ||
)?; | ||
flatten_join_inputs( | ||
Arc::unwrap_or_clone(join.right), | ||
possible_join_keys, | ||
all_inputs, | ||
)?; | ||
} | ||
_ => { | ||
all_inputs.push(Arc::new(plan)); | ||
} | ||
}; | ||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can also use TreeNode
methods to simplify code here
rights: &mut Vec<LogicalPlanRef>, | ||
possible_join_keys: &JoinKeySet, | ||
all_join_keys: &mut JoinKeySet, | ||
) -> DaftResult<LogicalPlanRef> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note, I don't believe it affects TPC-H, but I believe this function is actually somewhat brittle, since if a join key uses columns from more than two tables, it will not always optimize it, depending on the order in which it has seen the tables.
Consider this case:
filter
|
cross join
/ \
a cross join
/ \
b c
If the filter predicate was a.id + c.id = b.id
, then this optimizer rule would first look for a table with columns a.id
and c.id
and another one with b.id
. Not finding one, it would cross join a
and b
in the first call to find_inner_join
, and then cross join that with c
in the second call. However, it could have cross joined a
and c
and then inner joined that with b
.
If we wanted to only handle two table predicates, I believe my suggestion in Slack to do filter pushdown + simple single join node into filter optimization would also solve that.
all_inputs, | ||
)?; | ||
} | ||
_ => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We insert projects under joins when there are naming conflicts to rename columns, however we could possibly still flatten joins that have those (and other compute-less) projects between them.
/// Returns true if the plan is a Join or Cross join could be flattened with | ||
/// `flatten_join_inputs` | ||
/// | ||
/// Must stay in sync with `flatten_join_inputs` | ||
fn can_flatten_join_inputs(plan: &LogicalPlan) -> bool { | ||
// can only flatten inner / cross joins | ||
match plan { | ||
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {} | ||
_ => return false, | ||
}; | ||
|
||
for child in plan.children() { | ||
if matches!( | ||
child, | ||
LogicalPlan::Join(Join { | ||
join_strategy: None, | ||
join_type: JoinType::Inner, | ||
.. | ||
}) | ||
) && !can_flatten_join_inputs(child) | ||
{ | ||
return false; | ||
} | ||
} | ||
true | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstrings for this function are not very clear, and based on the logic it basically just checks if the top level plan node is an inner join? Unless the match on the plan is not intended to check for join strategy, the matches!
for each child is redundant to the recursive call.
if let Expr::BinaryOp { left, op, right } = expr { | ||
match op { | ||
Operator::Eq => { | ||
// insert handles ensuring we don't add the same Join keys multiple times | ||
join_keys.insert(left, right); | ||
} | ||
Operator::And => { | ||
extract_possible_join_keys(left, join_keys); | ||
extract_possible_join_keys(right, join_keys); | ||
} | ||
// Fix for join predicates from inside of OR expr also pulled up properly. | ||
Operator::Or => { | ||
let mut left_join_keys = JoinKeySet::new(); | ||
let mut right_join_keys = JoinKeySet::new(); | ||
|
||
extract_possible_join_keys(left, &mut left_join_keys); | ||
extract_possible_join_keys(right, &mut right_join_keys); | ||
|
||
join_keys.insert_intersection(&left_join_keys, &right_join_keys); | ||
} | ||
_ => (), | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's similar logic to this in other parts of the code already, I believe the only difference is this does not pull out common expressions from two sides of an or, but if we want that, we should probably add it to the same place so other optimizer rules can take advantage as well.
Daft/src/daft-dsl/src/optimization.rs
Line 58 in 5228930
pub fn split_conjuction(expr: &ExprRef) -> Vec<&ExprRef> { |
pub fn can_hash(data_type: &DataType) -> bool { | ||
match data_type { | ||
DataType::Null => true, | ||
DataType::Boolean => true, | ||
DataType::Int8 => true, | ||
DataType::Int16 => true, | ||
DataType::Int32 => true, | ||
DataType::Int64 => true, | ||
DataType::UInt8 => true, | ||
DataType::UInt16 => true, | ||
DataType::UInt32 => true, | ||
DataType::UInt64 => true, | ||
DataType::Float32 => true, | ||
DataType::Float64 => true, | ||
DataType::Timestamp(time_unit, _) => match time_unit { | ||
TimeUnit::Seconds => true, | ||
TimeUnit::Milliseconds => true, | ||
TimeUnit::Microseconds => true, | ||
TimeUnit::Nanoseconds => true, | ||
}, | ||
DataType::Utf8 => true, | ||
|
||
DataType::Decimal128(_, _) => true, | ||
DataType::Date => true, | ||
|
||
DataType::FixedSizeBinary(_) => true, | ||
|
||
DataType::List(_) => true, | ||
|
||
DataType::FixedSizeList(_, _) => true, | ||
DataType::Struct(fields) => fields.iter().all(|f| can_hash(&f.dtype)), | ||
_ => false, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh, I wonder what the behavior right now is if you specify a join key that is not hashable. Additionally, should this be a method for DataType
instead?
still todo:
Notes for reviewers:
This does not actually implement a physical cross join, but just implements the logical cross join as well as cross join to inner join optimization
eliminate_cross_join.rs
This treats an inner join with no join conditions as cross join. (inspired by a recent change in datafusion).
If the cross join can not be optimized away, an error will be raised when attempting to execute the plan.