-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix join column handling logic for On
and Using
constraints
#605
Conversation
use crate::physical_plan::expressions::Column; | ||
|
||
/// All valid types of joins. | ||
#[derive(Clone, Copy, Debug, Eq, PartialEq)] | ||
pub enum JoinType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reuse the same enum from logical plane.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @houqp -- I think this PR is an improvement.
Also, I did some more testing and there are still cases that don't appear to be working (which I think would be fine to do as a follow on PR) -- they no longer panic :)
echo "1" > /tmp/foo.csv
cargo run -p datafusion-cli
>
CREATE EXTERNAL TABLE foo(bar int)
STORED AS CSV
LOCATION '/tmp/foo.csv';
0 rows in set. Query took 0.000 seconds.
> select * from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
Plan("Schema contains duplicate unqualified field name 'bar'")
> select f1.bar, f2.bar from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
Plan("Schema contains duplicate unqualified field name 'bar'")
> select f1.bar, f2.bar from foo as f1 JOIN foo as f2 USING(bar);
NotImplemented("Unsupported compound identifier '[\"f1\", \"bar\"]'")
> select * from foo as f1 JOIN foo as f2 USING(bar);
+-----+
| bar |
+-----+
| 1 |
+-----+
@@ -1259,6 +1259,96 @@ mod tests { | |||
Ok(()) | |||
} | |||
|
|||
#[tokio::test] | |||
async fn left_join_using() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 definitely an improvement
The approach we seem to be taking is to try and keep the information about where a column came from in a join -- e.g. that a output DF field could be referred to as either It seems like a core challenge is in the semantics of the So for example, in a |
Good catch on the regressions, I will get them fixed tonight with unit tests.
Yeah, that's exactly the problem. Initially, I tried the approach of keeping the join columns from both relations (i.e. |
is this something we can change (aka update the using join to produce columns and then rely on projection pushdown to prune the uneeded ones out?) |
That's a good point, the single column output semantic doesn't need to be enforced at the join node level. let me give this a try as well 👍 This could simplify our join handling logic. I will also double check to see if this would result in nontrivial runtime overhead for us. |
@alamb, I wasn't able to reproduce the errors you showed in #605 (review) Here is what I got:
Perhaps you rain those tests with a different build? |
I am not sure what I tested with, but I re-ran the tests at b8da356 and everything is working -- sorry for the noise. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given this PR is better than master (aka it doesn't panic) I would be fine merging it in as is and iterating on the design subsequently. Alternately, if you plan to make more changes as part of this PR @houqp I can wait to merge it in
let's wait for my alternative solutions and review them together before the merge unless there is urgent issue in master that this PR addresses. I would like to avoid merging in a premature abstraction to reduce noise :) |
Marking as draft so it is clearer from the list of PRs that this one is not quite ready to go (and thus I don't merge it accidentally) |
get rid of shared field and move column expansion logic into plan builder and optimizer.
protobuf::JoinType::Full => JoinType::Full, | ||
protobuf::JoinType::Semi => JoinType::Semi, | ||
protobuf::JoinType::Anti => JoinType::Anti, | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
conversion handled by shared method in serde/mod.rs
} | ||
}; | ||
|
||
JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Join schemas are now consistent across all join types and constraints. We implement the join column "merge" semantic externally in plan builder and optimizer.
if field.qualifier() == col.relation.as_ref() && field.name() == &col.name { | ||
return Ok(i); | ||
} | ||
fn index_of_column_by_name( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored this code into its own helper method to reduce duplicated code between field_with_qualified_name
and index_of_column
. This should also make index_of_column
more robust.
@@ -1118,36 +1133,56 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr { | |||
} | |||
} | |||
|
|||
/// Recursively replace all Column expressions in a given expression tree with Column expressions | |||
/// provided by the hash map argument. | |||
pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is used in predicate push down optimizer to push predicates to both sides of the join clause.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use pub(crate)
?
pub fn normalize_col(e: Expr, schemas: &[&DFSchemaRef]) -> Result<Expr> { | ||
struct ColumnNormalizer<'a, 'b> { | ||
schemas: &'a [&'b DFSchemaRef], | ||
pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change from schema to logical plan so we can extract join columns for join clauses with using constraints.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note I wrote some tests that will need to be adjusted in #689 but that is no big deal
on: JoinOnRef, | ||
join_type: &JoinType, | ||
) -> Schema { | ||
pub fn build_join_schema(left: &Schema, right: &Schema, join_type: &JoinType) -> Schema { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same schema build simplification as the one we introduced in logical plan builder.
@@ -560,7 +560,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { | |||
// SELECT c1 AS m FROM t HAVING c1 > 10; | |||
// SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING MAX(c2) > 10; | |||
// | |||
resolve_aliases_to_exprs(&having_expr, &alias_map) | |||
let having_expr = resolve_aliases_to_exprs(&having_expr, &alias_map)?; | |||
normalize_col(having_expr, &projected_plan) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needed if having expression referenced using join columns with unqualified column name.
@alamb reimplemented the logic based on your suggestion in #605 (comment). Turns out there are many more edge-cases that need to be handled for using join other than wildcard expansion:
I have implemented support for all these edge-cases, but decided to leave out the wildcard expansion change as a follow up PR to keep the diff easier to review. UPDATE: filed #678 as follow up. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks @houqp -- I'll try and review this later today but I may run out of time in which case I'll get it done tomorrow |
// field to lookup is qualified but current field is unqualified. | ||
(Some(_), None) => false, | ||
// field to lookup is unqualified, no need to compare qualifier | ||
_ => field.name() == name, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to write out cases, (None, None)
and (None, Some(_)
and union them. this makes it clearer?
for schema in &schemas { | ||
let fields = schema.fields_with_unqualified_name(&self.name); | ||
match fields.len() { | ||
0 => continue, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how would this be possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are iterating through schemas from all plan nodes in the provided plan tree, each plan node could have different schemas, so when we do the fields_with_unqualified_name
look up, some of these plan nodes will no contain a field that matches self.name
. We just pick the first plan node that contains schema field matches the unqualified name.
on.iter() | ||
.map(|entry| { | ||
std::iter::once(entry.0.clone()) | ||
.chain(std::iter::once(entry.1.clone())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just flatmap with a vec![entry.0.clone(), entry.1.clone()]
- it's cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wanted to avoid an extra memory allocation incurred by Vec::new
, but i will double check to see if once chain is actually generating the optimal code without memory allocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jimexist sent #704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.
.map(|f| { | ||
std::iter::once(f.qualified_column()) | ||
// we need to push down filter using unqualified column as well | ||
.chain(std::iter::once(f.unqualified_column())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.collect::<HashSet<_>>(); | ||
let right_columns = &right | ||
.fields() | ||
.iter() | ||
.map(|f| f.qualified_column()) | ||
.map(|f| { | ||
std::iter::once(f.qualified_column()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks good @houqp -- I went through the test changes and code carefully and I think this is good to go.
@Dandandan I am not sure if you want to take a look at this given it changes how the Join relations are encoded.
}) | ||
.map(|(idx, _)| idx) | ||
.collect(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it probably doesn't matter but you could avoid the Vec allocation by something like:
let matches = self....;
match matches.next() {
let name = matches.next() {
None => // error about no field
Some(name) {
if matches.next().is_some() => // error about ambiguous reference
else name
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion, fixed in #703.
pub fn normalize_col(e: Expr, schemas: &[&DFSchemaRef]) -> Result<Expr> { | ||
struct ColumnNormalizer<'a, 'b> { | ||
schemas: &'a [&'b DFSchemaRef], | ||
pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note I wrote some tests that will need to be adjusted in #689 but that is no big deal
@@ -354,6 +356,43 @@ impl LogicalPlan { | |||
| LogicalPlan::CreateExternalTable { .. } => vec![], | |||
} | |||
} | |||
|
|||
/// returns all `Using` join columns in a logical plan | |||
pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW this function feels like it might better belong in some sort of utils rather than a method on LogicalPlan
-- perhaps https://github.com/houqp/arrow-datafusion/blob/qp_join/datafusion/src/optimizer/utils.rs#L50
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb this is used outside of the optimizers as well, for example, the logical plan builder. With that context, do you still think it should live in optimizer utils module?
@@ -232,6 +241,38 @@ fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) { | |||
} | |||
} | |||
|
|||
fn optimize_join( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice
@@ -901,20 +979,61 @@ mod tests { | |||
format!("{:?}", plan), | |||
"\ | |||
Filter: #test.a LtEq Int64(1)\ | |||
\n Join: #test.a = #test.a\ | |||
\n Join: #test.a = #test2.a\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
"| 1 | 7 | 10 | 4 | 70 |", | ||
"| 2 | 8 | 20 | 5 | 80 |", | ||
"+----+----+----+----+----+", | ||
"+----+----+----+----+----+----+", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these changes make sense to me
Not sure if this will conflict with #660 |
I am going to merge this PR in (as it conflicts with #689) -- I think we can make additional improvements as follow on PRs. |
Thanks again @houqp |
I also merge |
Aaand it looks like I forgot to fetch |
Apologize for being too busy last week and haven't had the time to update my PR, I am going to send a new PR to address all the feedbacks shortly. |
Which issue does this PR close?
Follow up for #55 (review).
Closes #601.
Closes #671.
Also fixed a bug where
index_of_column
won't error out on duplicated fields.Rationale for this change
In MySQL and Postgres, Join with
On
constraints produces output with join column from both relations preserved. For example:produces:
While join with
Using
constraints deduplicates the join column:produces:
However, in our current implementation, join column dedup is applied in all cases. This PR changes the behavior so it's consistent with MySQL and Postgres.
Here comes the annoying part.
Note that for join with
Using
constraint, users can still project join columns using relations from both sides. For exampleSELECT t1.id, t2.id FROM test t1 JOIN test t2 USING (id)
produces the same output asSELECT * FROM test t1 JOIN test t2 ON t1.id = t2.id
.This means for
Using
joins, we need to model a join column as a single shared column between both relations. CurrentDFField
struct only allows a field/column to have a single qualifier, so I ended adding a newshared_qualifiers
field toDFField
struct to handle this edge-case. Our logical plan builder will be responsible for setting this field when building join queries with using constraints. During query optimization and planning, theshared_qualifiers
field is used to look up column by name and qualifier.Other alternatives include changing the
qualifer
field ofDFField
to an option of enum to account for single qualifier and shared qualifiers. None of these approaches felt elegant to me. I am curious if anyone has ideas or suggestions on how to better implement this behavior.What changes are included in this PR?
On
andUsing
joinsindex_of_column_by_name
inboth index_of_column
andfield_with_qualified_name
methods.Are there any user-facing changes?
Join with
On
constraints will now output joined columns from both relations without deduplication. Dedup is only applied to join withUsing
constraints.