Skip to content

Commit

Permalink
Merge pull request #5423 from xudong963/more_join
Browse files Browse the repository at this point in the history
feat(planner): support `using` and `natural` for join
  • Loading branch information
BohuTANG authored May 17, 2022
2 parents 509c7ac + 75d93f8 commit 7bf61aa
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 39 deletions.
5 changes: 5 additions & 0 deletions query/src/sql/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ pub struct ColumnBinding {
/// Another example is aggregation. In a `GROUP BY` context, aggregate funtions
/// will be extracted and be added to `BindContext` as a `ColumnBinding`.
pub scalar: Option<Box<Scalar>>,

/// Consider the sql: `select * from t join t1 using(a)`.
/// The result should only contain one `a` column.
/// So we need make `t.a` or `t1.a` invisible in unqualified wildcard.
pub visible_in_unqualified_wildcard: bool,
}

/// `BindContext` stores all the free variables in a query and tracks the context of binding procedure.
Expand Down
169 changes: 130 additions & 39 deletions query/src/sql/planner/binder/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::sql::optimizer::ColumnSet;
use crate::sql::optimizer::SExpr;
use crate::sql::planner::binder::scalar::ScalarBinder;
use crate::sql::planner::binder::Binder;
use crate::sql::plans::BoundColumnRef;
use crate::sql::plans::FilterPlan;
use crate::sql::plans::LogicalInnerJoin;
use crate::sql::plans::Scalar;
Expand Down Expand Up @@ -63,11 +64,11 @@ impl<'a> Binder {
let mut left_join_conditions: Vec<Scalar> = vec![];
let mut right_join_conditions: Vec<Scalar> = vec![];
let mut other_conditions: Vec<Scalar> = vec![];
let join_condition_resolver = JoinConditionResolver::new(
let mut join_condition_resolver = JoinConditionResolver::new(
self.ctx.clone(),
&left_context,
&right_context,
&bind_context,
&mut bind_context,
&join.condition,
);
join_condition_resolver
Expand Down Expand Up @@ -163,7 +164,7 @@ struct JoinConditionResolver<'a> {

left_context: &'a BindContext,
right_context: &'a BindContext,
join_context: &'a BindContext,
join_context: &'a mut BindContext,
join_condition: &'a JoinCondition<'a>,
}

Expand All @@ -172,7 +173,7 @@ impl<'a> JoinConditionResolver<'a> {
ctx: Arc<QueryContext>,
left_context: &'a BindContext,
right_context: &'a BindContext,
join_context: &'a BindContext,
join_context: &'a mut BindContext,
join_condition: &'a JoinCondition<'a>,
) -> Self {
Self {
Expand All @@ -185,7 +186,7 @@ impl<'a> JoinConditionResolver<'a> {
}

pub async fn resolve(
&self,
&mut self,
left_join_conditions: &mut Vec<Scalar>,
right_join_conditions: &mut Vec<Scalar>,
other_join_conditions: &mut Vec<Scalar>,
Expand All @@ -200,11 +201,23 @@ impl<'a> JoinConditionResolver<'a> {
)
.await?;
}
JoinCondition::Using(_) => {
return Err(ErrorCode::UnImplement("USING clause is not supported yet. Please specify join condition with ON clause."));
JoinCondition::Using(identifiers) => {
let using_columns = identifiers
.iter()
.map(|ident| ident.name.clone())
.collect::<Vec<String>>();
self.resolve_using(using_columns, left_join_conditions, right_join_conditions)
.await?;
}
JoinCondition::Natural => {
return Err(ErrorCode::UnImplement("NATURAL JOIN is not supported yet. Please specify join condition with ON clause."));
// NATURAL is a shorthand form of USING: it forms a USING list consisting of all column names that appear in both input tables
// As with USING, these columns appear only once in the output table
// Todo(xudong963) If there are no common column names, NATURAL JOIN behaves like JOIN ... ON TRUE, producing a cross-product join.
let mut using_columns = vec![];
// Find common columns in both input tables
self.find_using_columns(&mut using_columns)?;
self.resolve_using(using_columns, left_join_conditions, right_join_conditions)
.await?
}
JoinCondition::None => {
return Err(ErrorCode::UnImplement("JOIN without condition is not supported yet. Please specify join condition with ON clause."));
Expand Down Expand Up @@ -252,44 +265,122 @@ impl<'a> JoinConditionResolver<'a> {
//
// Only equi-predicate can be exploited by common join algorithms(e.g. sort-merge join, hash join).
// For the predicates that aren't equi-predicate, we will lift them as a `Filter` operator.
if let Some((mut left, mut right)) = split_equivalent_predicate(predicate) {
let left_used_columns = left.used_columns();
let right_used_columns = right.used_columns();
let left_columns: ColumnSet = self.left_context.all_column_bindings().iter().fold(
ColumnSet::new(),
|mut acc, v| {
if let Some((left, right)) = split_equivalent_predicate(predicate) {
self.add_conditions(left, right, left_join_conditions, right_join_conditions)?;
} else {
other_join_conditions.push(predicate.clone());
}
Ok(())
}

async fn resolve_using(
&mut self,
using_columns: Vec<String>,
left_join_conditions: &mut Vec<Scalar>,
right_join_conditions: &mut Vec<Scalar>,
) -> Result<()> {
for join_key in using_columns.iter() {
let join_key_name = join_key.as_str();
let mut left_scalars = vec![];
for col_binding in self.left_context.columns.iter() {
if col_binding.column_name == join_key_name {
left_scalars.push(Scalar::BoundColumnRef(BoundColumnRef {
column: col_binding.clone(),
}));
}
}
if left_scalars.is_empty() {
return Err(ErrorCode::SemanticError(format!(
"column {} specified in USING clause does not exist in left table",
join_key_name
)));
}
assert_eq!(left_scalars.len(), 1);
let mut right_scalars = vec![];
for col_binding in self.right_context.columns.iter() {
if col_binding.column_name == join_key_name {
right_scalars.push(Scalar::BoundColumnRef(BoundColumnRef {
column: col_binding.clone(),
}));
}
}
if right_scalars.is_empty() {
return Err(ErrorCode::SemanticError(format!(
"column {} specified in USING clause does not exist in right table",
join_key_name
)));
}
assert_eq!(right_scalars.len(), 1);
for col_binding in self.join_context.columns.iter_mut() {
if col_binding.column_name == join_key_name {
col_binding.visible_in_unqualified_wildcard = false;
break;
}
}
self.add_conditions(
left_scalars[0].clone(),
right_scalars[0].clone(),
left_join_conditions,
right_join_conditions,
)?;
}
Ok(())
}

fn add_conditions(
&self,
mut left: Scalar,
mut right: Scalar,
left_join_conditions: &mut Vec<Scalar>,
right_join_conditions: &mut Vec<Scalar>,
) -> Result<()> {
let left_used_columns = left.used_columns();
let right_used_columns = right.used_columns();
let left_columns: ColumnSet =
self.left_context
.all_column_bindings()
.iter()
.fold(ColumnSet::new(), |mut acc, v| {
acc.insert(v.index);
acc
},
);
let right_columns: ColumnSet = self.right_context.all_column_bindings().iter().fold(
ColumnSet::new(),
|mut acc, v| {
});
let right_columns: ColumnSet =
self.right_context
.all_column_bindings()
.iter()
.fold(ColumnSet::new(), |mut acc, v| {
acc.insert(v.index);
acc
},
);
});

// Bump types of left conditions and right conditions
let left_type = left.data_type();
let right_type = right.data_type();
let least_super_type = merge_types(&left_type, &right_type)?;
left = wrap_cast_if_needed(left, &least_super_type);
right = wrap_cast_if_needed(right, &least_super_type);
// Bump types of left conditions and right conditions
let left_type = left.data_type();
let right_type = right.data_type();
let least_super_type = merge_types(&left_type, &right_type)?;
left = wrap_cast_if_needed(left, &least_super_type);
right = wrap_cast_if_needed(right, &least_super_type);

if left_used_columns.is_subset(&left_columns)
&& right_used_columns.is_subset(&right_columns)
{
left_join_conditions.push(left);
right_join_conditions.push(right);
} else if left_used_columns.is_subset(&right_columns)
&& right_used_columns.is_subset(&left_columns)
{
left_join_conditions.push(right);
right_join_conditions.push(left);
if left_used_columns.is_subset(&left_columns)
&& right_used_columns.is_subset(&right_columns)
{
left_join_conditions.push(left);
right_join_conditions.push(right);
} else if left_used_columns.is_subset(&right_columns)
&& right_used_columns.is_subset(&left_columns)
{
left_join_conditions.push(right);
right_join_conditions.push(left);
}
Ok(())
}

fn find_using_columns(&self, using_columns: &mut Vec<String>) -> Result<()> {
for left_column in self.left_context.all_column_bindings().iter() {
for right_column in self.right_context.all_column_bindings().iter() {
if left_column.column_name == right_column.column_name {
using_columns.push(left_column.column_name.clone());
}
}
} else {
other_join_conditions.push(predicate.clone());
}
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions query/src/sql/planner/binder/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ impl<'a> Binder {
// Expands wildcard star, for example we have a table `t(a INT, b INT)`:
// The query `SELECT * FROM t` will be expanded into `SELECT t.a, t.b FROM t`
for column_binding in input_context.all_column_bindings() {
if !column_binding.visible_in_unqualified_wildcard {
continue;
}
output_context.add_column_binding(column_binding.clone());
}
}
Expand Down Expand Up @@ -121,6 +124,7 @@ impl<'a> Binder {
index: column_ref.column.index,
data_type,
scalar: Some(Box::new(bound_expr.clone())),
visible_in_unqualified_wildcard: true,
},
_ => {
let index = self.metadata.add_column(
Expand All @@ -136,6 +140,7 @@ impl<'a> Binder {
index,
data_type,
scalar: Some(Box::new(bound_expr.clone())),
visible_in_unqualified_wildcard: true,
}
}
};
Expand Down
1 change: 1 addition & 0 deletions query/src/sql/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl<'a> Binder {
index: column.column_index,
data_type: column.data_type.clone(),
scalar: None,
visible_in_unqualified_wildcard: true,
};
bind_context.add_column_binding(column_binding);
}
Expand Down
9 changes: 9 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.result
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,12 @@ new_planner
0 8 5
====Context Function====
default
===Inner Join with Using===
4 3 4
6 5 6
3
5
4
6
4 3 4
6 5 6
15 changes: 15 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,19 @@ select '====Context Function====';
use default;
select database();

-- Inner join with using
select '===Inner Join with Using===';
drop table if exists t1;
create table t1(a int, b int);
insert into t1 values(7, 8), (3, 4), (5, 6);
drop table if exists t2;
create table t2(a int, d int);
insert into t2 values(1, 2), (3, 4), (5, 6);
select * from t1 join t2 using(a);
select t1.a from t1 join t2 using(a);
select t2.d from t1 join t2 using(a);
select * from t1 natural join t2;
drop table t1;
drop table t2;

set enable_planner_v2 = 0;

0 comments on commit 7bf61aa

Please sign in to comment.