Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]:fast check has column #5328

Merged
merged 10 commits into from
Mar 3, 2023
21 changes: 21 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,27 @@ impl DFSchema {
}
}

/// Returns `true` if at least one field in this schema has a name equal to
/// `name`. Field qualifiers are not consulted.
pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
    self.fields()
        .iter()
        .map(|field| field.name())
        .any(|candidate| candidate == name)
}

/// Returns `true` if at least one field in this schema carries a qualifier
/// equal to `qualifier` and a name equal to `name`.
///
/// Fields without a qualifier never match.
pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str) -> bool {
    self.fields().iter().any(|field| match field.qualifier() {
        // The qualifier is compared first; the name is only checked when it matches.
        Some(q) => q.eq(qualifier) && field.name() == name,
        None => false,
    })
}

/// Returns `true` if this schema contains a field matching `column`.
///
/// When `column.relation` is set, both the qualifier and the name must
/// match (see `has_column_with_qualified_name`); otherwise only the name is
/// compared (see `has_column_with_unqualified_name`).
pub fn has_column(&self, column: &Column) -> bool {
    match &column.relation {
        Some(r) => self.has_column_with_qualified_name(r, &column.name),
        None => self.has_column_with_unqualified_name(&column.name),
    }
}

/// Check to see if unqualified field names match field names in Arrow schema
pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
self.fields
Expand Down
13 changes: 5 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,7 @@ impl LogicalPlanBuilder {
input,
mut expr,
schema: _,
}) if missing_cols
.iter()
.all(|c| input.schema().field_from_column(c).is_ok()) =>
{
}) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
let mut missing_exprs = missing_cols
.iter()
.map(|c| normalize_col(Expr::Column(c.clone()), &input))
Expand Down Expand Up @@ -723,13 +720,13 @@ impl LogicalPlanBuilder {
let mut join_on: Vec<(Expr, Expr)> = vec![];
let mut filters: Option<Expr> = None;
for (l, r) in &on {
if self.plan.schema().field_from_column(l).is_ok()
&& right.schema().field_from_column(r).is_ok()
if self.plan.schema().has_column(l)
&& right.schema().has_column(r)
&& can_hash(self.plan.schema().field_from_column(l)?.data_type())
{
join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
} else if self.plan.schema().field_from_column(r).is_ok()
&& right.schema().field_from_column(l).is_ok()
} else if self.plan.schema().has_column(r)
// Swapped orientation: here `r` must resolve against the left input and
// `l` against the right one, so the pair is pushed as (r, l) below.
&& right.schema().has_column(l)
&& can_hash(self.plan.schema().field_from_column(r)?.data_type())
{
join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ fn optimize_exists(
let using_cols: Vec<Column> = expr
.to_columns()?
.into_iter()
.filter(|col| input_schema.field_from_column(col).is_ok())
.filter(|col| input_schema.has_column(col))
.collect::<_>();

cols.extend(using_cols);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn optimize_where_in(
let using_cols: Vec<Column> = expr
.to_columns()?
.into_iter()
.filter(|col| input_schema.field_from_column(col).is_ok())
.filter(|col| input_schema.has_column(col))
.collect::<_>();

cols.extend(using_cols);
Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ impl OptimizerRule for EliminateOuterJoin {
let mut left_non_nullable = false;
let mut right_non_nullable = false;
for col in non_nullable_cols.iter() {
if join.left.schema().field_from_column(col).is_ok() {
if join.left.schema().has_column(col) {
left_non_nullable = true;
}
if join.right.schema().field_from_column(col).is_ok() {
if join.right.schema().has_column(col) {
right_non_nullable = true;
}
}
Expand Down Expand Up @@ -251,10 +251,10 @@ fn extract_non_nullable_columns(
{
for left_col in &left_non_nullable_cols {
for right_col in &right_non_nullable_cols {
if (left_schema.field_from_column(left_col).is_ok()
&& left_schema.field_from_column(right_col).is_ok())
|| (right_schema.field_from_column(left_col).is_ok()
&& right_schema.field_from_column(right_col).is_ok())
if (left_schema.has_column(left_col)
&& left_schema.has_column(right_col))
|| (right_schema.has_column(left_col)
&& right_schema.has_column(right_col))
{
non_nullable_cols.push(left_col.clone());
break;
Expand Down