Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]:fast check has column #5328

Merged
merged 10 commits into from
Mar 3, 2023
41 changes: 41 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,21 @@ impl DFSchema {
.collect()
}

/// Find all fields match the given qualified name
pub fn fields_with_qualified_name(
&self,
qualifier: &str,
name: &str,
) -> Vec<&DFField> {
self.fields
.iter()
.filter(|field| {
field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)
&& field.name() == name
})
.collect()
}

/// Find all fields match the given name
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
self.fields
Expand Down Expand Up @@ -314,6 +329,32 @@ impl DFSchema {
}
}

/// Find if the field exists with the given name
pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
let matches = self.fields_with_unqualified_name(name);
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
match matches.len() {
1 => true,
_ => false,
}
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Find if the field exists with the given qualified name
pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str) -> bool {
let matches = self.fields_with_qualified_name(qualifier, name);
match matches.len() {
1 => true,
_ => false,
}
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Find if the field exists with the given qualified column
pub fn has_column(&self, column: &Column) -> bool {
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
match &column.relation {
Some(r) => self.has_column_with_qualified_name(r, &column.name),
None => self.has_column_with_unqualified_name(&column.name),
}
}

/// Check to see if unqualified field names matches field names in Arrow schema
pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
self.fields
Expand Down
13 changes: 5 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,7 @@ impl LogicalPlanBuilder {
input,
mut expr,
schema: _,
}) if missing_cols
.iter()
.all(|c| input.schema().field_from_column(c).is_ok()) =>
{
}) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
let mut missing_exprs = missing_cols
.iter()
.map(|c| normalize_col(Expr::Column(c.clone()), &input))
Expand Down Expand Up @@ -656,13 +653,13 @@ impl LogicalPlanBuilder {
let mut join_on: Vec<(Expr, Expr)> = vec![];
let mut filters: Option<Expr> = None;
for (l, r) in &on {
if self.plan.schema().field_from_column(l).is_ok()
&& right.schema().field_from_column(r).is_ok()
if self.plan.schema().has_column(l)
&& right.schema().has_column(r)
&& can_hash(self.plan.schema().field_from_column(l)?.data_type())
{
join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
} else if self.plan.schema().field_from_column(r).is_ok()
&& right.schema().field_from_column(l).is_ok()
} else if self.plan.schema().has_column(l)
&& right.schema().has_column(r)
&& can_hash(self.plan.schema().field_from_column(r)?.data_type())
{
join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ fn optimize_where_in(
let using_cols: Vec<Column> = expr
.to_columns()?
.into_iter()
.filter(|col| input_schema.field_from_column(col).is_ok())
.filter(|col| input_schema.has_column(col))
.collect::<_>();

cols.extend(using_cols);
Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ impl OptimizerRule for EliminateOuterJoin {
let mut left_non_nullable = false;
let mut right_non_nullable = false;
for col in non_nullable_cols.iter() {
if join.left.schema().field_from_column(col).is_ok() {
if join.left.schema().has_column(col) {
left_non_nullable = true;
}
if join.right.schema().field_from_column(col).is_ok() {
if join.right.schema().has_column(col) {
right_non_nullable = true;
}
}
Expand Down Expand Up @@ -251,10 +251,10 @@ fn extract_non_nullable_columns(
{
for left_col in &left_non_nullable_cols {
for right_col in &right_non_nullable_cols {
if (left_schema.field_from_column(left_col).is_ok()
&& left_schema.field_from_column(right_col).is_ok())
|| (right_schema.field_from_column(left_col).is_ok()
&& right_schema.field_from_column(right_col).is_ok())
if (left_schema.has_column(left_col)
&& left_schema.has_column(right_col))
|| (right_schema.has_column(left_col)
&& right_schema.has_column(right_col))
{
non_nullable_cols.push(left_col.clone());
break;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.try_for_each::<_, Result<()>>(|expr| {
let columns = expr.to_columns()?;

columns.into_iter().for_each(|c| {
if schema.field_from_column(&c).is_err() {
columns.into_iter().try_for_each::<_, Result<()>>(|c| {
if !schema.has_column(&c) {
missing_cols.push(c);
}
});
Ok(())
})?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                columns.into_iter().for_each(|c| {
                    if !schema.has_column(&c) {
                        missing_cols.push(c);
                    }
                });


Ok(())
})?;
Expand Down