From 44d1803cfdb2015df0cb93529ed1a442a57b2876 Mon Sep 17 00:00:00 2001 From: suxiaogang Date: Sat, 18 Feb 2023 01:51:51 +0800 Subject: [PATCH 1/9] impl has_column --- datafusion/common/src/dfschema.rs | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 982459ac658b..baf5230d18d1 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -314,6 +314,39 @@ impl DFSchema { } } + pub fn has_column_with_unqualified_name(&self, name: &str) -> Result { + let matches = self.fields_with_unqualified_name(name); + match matches.len() { + 0 => Ok(false), + 1 => Ok(true), + _ => Err(DataFusionError::SchemaError( + SchemaError::AmbiguousReference { + qualifier: None, + name: name.to_string(), + }, + )), + } + } + + pub fn has_column_with_qualified_name( + &self, + qualifier: &str, + name: &str, + ) -> Result { + let res = self.index_of_column_by_name(Some(qualifier), name)?; + match res { + Some(_) => Ok(true), + None => Ok(false), + } + } + + pub fn has_column(&self, column: &Column) -> Result { + match &column.relation { + Some(r) => self.has_column_with_qualified_name(r, &column.name), + None => self.has_column_with_unqualified_name(&column.name), + } + } + /// Check to see if unqualified field names matches field names in Arrow schema pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool { self.fields From 1932531796fb1bd5c1ee144a1ee933965956f19c Mon Sep 17 00:00:00 2001 From: suxiaogang Date: Sat, 18 Feb 2023 02:26:34 +0800 Subject: [PATCH 2/9] replace is_err by has_column --- datafusion/expr/src/logical_plan/builder.rs | 12 +++++++----- datafusion/optimizer/src/decorrelate_where_in.rs | 4 +++- datafusion/optimizer/src/eliminate_outer_join.rs | 12 ++++++------ datafusion/sql/src/query.rs | 7 ++++--- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs index f979e1f76f98..ffd07fe65c93 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -351,7 +351,9 @@ impl LogicalPlanBuilder { schema: _, }) if missing_cols .iter() - .all(|c| input.schema().field_from_column(c).is_ok()) => + // TODO: how to return err in closure, maybe there is hethod like try_all in iter? + // or use 'for loop' instead of all + .all(|c| input.schema().has_column(c).unwrap()) => { let mut missing_exprs = missing_cols .iter() @@ -656,13 +658,13 @@ impl LogicalPlanBuilder { let mut join_on: Vec<(Expr, Expr)> = vec![]; let mut filters: Option = None; for (l, r) in &on { - if self.plan.schema().field_from_column(l).is_ok() - && right.schema().field_from_column(r).is_ok() + if self.plan.schema().has_column(l)? + && right.schema().has_column(r)? && can_hash(self.plan.schema().field_from_column(l)?.data_type()) { join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone()))); - } else if self.plan.schema().field_from_column(r).is_ok() - && right.schema().field_from_column(l).is_ok() + } else if self.plan.schema().has_column(l)? + && right.schema().has_column(r)? && can_hash(self.plan.schema().field_from_column(r)?.data_type()) { join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone()))); diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index 7a9a75ff45bb..217a7305d1e9 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -167,7 +167,9 @@ fn optimize_where_in( let using_cols: Vec = expr .to_columns()? .into_iter() - .filter(|col| input_schema.field_from_column(col).is_ok()) + // TODO: how to return err in closure, maybe there is method like try_filter in iter? 
+ // or use 'for loop' instead of filter + .filter(|col| input_schema.has_column(col).unwrap()) .collect::<_>(); cols.extend(using_cols); diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 6a7914034bd1..0a1ffbc31c6d 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -84,10 +84,10 @@ impl OptimizerRule for EliminateOuterJoin { let mut left_non_nullable = false; let mut right_non_nullable = false; for col in non_nullable_cols.iter() { - if join.left.schema().field_from_column(col).is_ok() { + if join.left.schema().has_column(col)? { left_non_nullable = true; } - if join.right.schema().field_from_column(col).is_ok() { + if join.right.schema().has_column(col)? { right_non_nullable = true; } } @@ -251,10 +251,10 @@ fn extract_non_nullable_columns( { for left_col in &left_non_nullable_cols { for right_col in &right_non_nullable_cols { - if (left_schema.field_from_column(left_col).is_ok() - && left_schema.field_from_column(right_col).is_ok()) - || (right_schema.field_from_column(left_col).is_ok() - && right_schema.field_from_column(right_col).is_ok()) + if (left_schema.has_column(left_col)? + && left_schema.has_column(right_col)?) + || (right_schema.has_column(left_col)? + && right_schema.has_column(right_col)?) { non_nullable_cols.push(left_col.clone()); break; diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index c59c42e93c0d..83cc8b2edde3 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -177,11 +177,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .try_for_each::<_, Result<()>>(|expr| { let columns = expr.to_columns()?; - columns.into_iter().for_each(|c| { - if schema.field_from_column(&c).is_err() { + columns.into_iter().try_for_each::<_, Result<()>>(|c| { + if !schema.has_column(&c)? 
{ missing_cols.push(c); } - }); + Ok(()) + })?; Ok(()) })?; From 301e9aa3d5ccd7705396365d3a812d99439ce026 Mon Sep 17 00:00:00 2001 From: suxiaogang Date: Sat, 18 Feb 2023 12:33:03 +0800 Subject: [PATCH 3/9] add comment --- datafusion/common/src/dfschema.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index baf5230d18d1..36f870c64948 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -314,6 +314,7 @@ impl DFSchema { } } + /// Find if the field exists with the given name pub fn has_column_with_unqualified_name(&self, name: &str) -> Result { let matches = self.fields_with_unqualified_name(name); match matches.len() { @@ -328,6 +329,7 @@ impl DFSchema { } } + /// Find if the field exists with the given qualified name pub fn has_column_with_qualified_name( &self, qualifier: &str, @@ -340,6 +342,7 @@ impl DFSchema { } } + /// Find if the field exists with the given qualified column pub fn has_column(&self, column: &Column) -> Result { match &column.relation { Some(r) => self.has_column_with_qualified_name(r, &column.name), From 9c77b50c4bb2b409e1aedb49064821e9eb8c53e1 Mon Sep 17 00:00:00 2001 From: suxiaogang Date: Sun, 19 Feb 2023 01:31:23 +0800 Subject: [PATCH 4/9] avoid str allocate in Err, error should not be returned --- datafusion/common/src/dfschema.rs | 43 +++++++++++-------- datafusion/expr/src/logical_plan/builder.rs | 15 +++---- .../optimizer/src/decorrelate_where_in.rs | 4 +- .../optimizer/src/eliminate_outer_join.rs | 12 +++--- datafusion/sql/src/query.rs | 2 +- 5 files changed, 37 insertions(+), 39 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 36f870c64948..e828a27494ba 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -270,6 +270,21 @@ impl DFSchema { .collect() } + /// Find all fields match the given qualified name + pub fn 
fields_with_qualified_name( + &self, + qualifier: &str, + name: &str, + ) -> Vec<&DFField> { + self.fields + .iter() + .filter(|field| { + field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false) + && field.name() == name + }) + .collect() + } + /// Find all fields match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { self.fields @@ -315,35 +330,25 @@ impl DFSchema { } /// Find if the field exists with the given name - pub fn has_column_with_unqualified_name(&self, name: &str) -> Result { + pub fn has_column_with_unqualified_name(&self, name: &str) -> bool { let matches = self.fields_with_unqualified_name(name); match matches.len() { - 0 => Ok(false), - 1 => Ok(true), - _ => Err(DataFusionError::SchemaError( - SchemaError::AmbiguousReference { - qualifier: None, - name: name.to_string(), - }, - )), + 1 => true, + _ => false, } } /// Find if the field exists with the given qualified name - pub fn has_column_with_qualified_name( - &self, - qualifier: &str, - name: &str, - ) -> Result { - let res = self.index_of_column_by_name(Some(qualifier), name)?; - match res { - Some(_) => Ok(true), - None => Ok(false), + pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str) -> bool { + let matches = self.fields_with_qualified_name(qualifier, name); + match matches.len() { + 1 => true, + _ => false, } } /// Find if the field exists with the given qualified column - pub fn has_column(&self, column: &Column) -> Result { + pub fn has_column(&self, column: &Column) -> bool { match &column.relation { Some(r) => self.has_column_with_qualified_name(r, &column.name), None => self.has_column_with_unqualified_name(&column.name), diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ffd07fe65c93..90d8d5108c02 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -349,12 +349,7 @@ impl LogicalPlanBuilder { input, mut 
expr, schema: _, - }) if missing_cols - .iter() - // TODO: how to return err in closure, maybe there is hethod like try_all in iter? - // or use 'for loop' instead of all - .all(|c| input.schema().has_column(c).unwrap()) => - { + }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => { let mut missing_exprs = missing_cols .iter() .map(|c| normalize_col(Expr::Column(c.clone()), &input)) @@ -658,13 +653,13 @@ impl LogicalPlanBuilder { let mut join_on: Vec<(Expr, Expr)> = vec![]; let mut filters: Option = None; for (l, r) in &on { - if self.plan.schema().has_column(l)? - && right.schema().has_column(r)? + if self.plan.schema().has_column(l) + && right.schema().has_column(r) && can_hash(self.plan.schema().field_from_column(l)?.data_type()) { join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone()))); - } else if self.plan.schema().has_column(l)? - && right.schema().has_column(r)? + } else if self.plan.schema().has_column(r) + && right.schema().has_column(l) && can_hash(self.plan.schema().field_from_column(r)?.data_type()) { join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone()))); diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index 217a7305d1e9..7d61401e9cad 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -167,9 +167,7 @@ fn optimize_where_in( let using_cols: Vec = expr .to_columns()? .into_iter() - // TODO: how to return err in closure, maybe there is method like try_filter in iter? 
- // or use 'for loop' instead of filter - .filter(|col| input_schema.has_column(col).unwrap()) + .filter(|col| input_schema.has_column(col)) .collect::<_>(); cols.extend(using_cols); diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 0a1ffbc31c6d..c8e125bc64de 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -84,10 +84,10 @@ impl OptimizerRule for EliminateOuterJoin { let mut left_non_nullable = false; let mut right_non_nullable = false; for col in non_nullable_cols.iter() { - if join.left.schema().has_column(col)? { + if join.left.schema().has_column(col) { left_non_nullable = true; } - if join.right.schema().has_column(col)? { + if join.right.schema().has_column(col) { right_non_nullable = true; } } @@ -251,10 +251,10 @@ fn extract_non_nullable_columns( { for left_col in &left_non_nullable_cols { for right_col in &right_non_nullable_cols { - if (left_schema.has_column(left_col)? - && left_schema.has_column(right_col)?) - || (right_schema.has_column(left_col)? - && right_schema.has_column(right_col)?) + if (left_schema.has_column(left_col) + && left_schema.has_column(right_col)) + || (right_schema.has_column(left_col) + && right_schema.has_column(right_col)) { non_nullable_cols.push(left_col.clone()); break; diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 83cc8b2edde3..d2bede7eeed1 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -178,7 +178,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let columns = expr.to_columns()?; columns.into_iter().try_for_each::<_, Result<()>>(|c| { - if !schema.has_column(&c)? 
{ + if !schema.has_column(&c) { missing_cols.push(c); } Ok(()) From 668d72e97131f244bff3f50b59eb0c7f09765ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=B0=8F=E5=88=9A?= <35674070+suxiaogang223@users.noreply.github.com> Date: Mon, 20 Feb 2023 19:18:12 +0800 Subject: [PATCH 5/9] Update datafusion/common/src/dfschema.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniël Heres --- datafusion/common/src/dfschema.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index e828a27494ba..3ae61bb83752 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -332,10 +332,7 @@ impl DFSchema { /// Find if the field exists with the given name pub fn has_column_with_unqualified_name(&self, name: &str) -> bool { let matches = self.fields_with_unqualified_name(name); - match matches.len() { - 1 => true, - _ => false, - } + matches.len() == 1 } /// Find if the field exists with the given qualified name From 19f3313e0b80712dfe31436e3ffd51c6fb953af5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=B0=8F=E5=88=9A?= <35674070+suxiaogang223@users.noreply.github.com> Date: Mon, 20 Feb 2023 19:18:42 +0800 Subject: [PATCH 6/9] Update datafusion/common/src/dfschema.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniël Heres --- datafusion/common/src/dfschema.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 3ae61bb83752..d07d29935c75 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -338,10 +338,7 @@ impl DFSchema { /// Find if the field exists with the given qualified name pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str) -> bool { let matches = 
self.fields_with_qualified_name(qualifier, name); - match matches.len() { - 1 => true, - _ => false, - } +matches.len() == 1 } /// Find if the field exists with the given qualified column From 4f36109941e10d08ef1604d8f05d1dc3616fd2fb Mon Sep 17 00:00:00 2001 From: suxiaogang Date: Sun, 26 Feb 2023 20:11:34 +0800 Subject: [PATCH 7/9] format --- datafusion/common/src/dfschema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index d07d29935c75..21c750197d5c 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -338,7 +338,7 @@ impl DFSchema { /// Find if the field exists with the given qualified name pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str) -> bool { let matches = self.fields_with_qualified_name(qualifier, name); -matches.len() == 1 + matches.len() == 1 } /// Find if the field exists with the given qualified column From c3ee99989c19171d0d58f543e82f81a07fb55ccf Mon Sep 17 00:00:00 2001 From: suxiaogang Date: Tue, 28 Feb 2023 23:57:36 +0800 Subject: [PATCH 8/9] avoid the collect() --- datafusion/common/src/dfschema.rs | 24 +++++------------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 21c750197d5c..c2e0549bb503 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -270,21 +270,6 @@ impl DFSchema { .collect() } - /// Find all fields match the given qualified name - pub fn fields_with_qualified_name( - &self, - qualifier: &str, - name: &str, - ) -> Vec<&DFField> { - self.fields - .iter() - .filter(|field| { - field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false) - && field.name() == name - }) - .collect() - } - /// Find all fields match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { self.fields @@ -331,14 +316,15 @@ impl DFSchema { /// 
Find if the field exists with the given name pub fn has_column_with_unqualified_name(&self, name: &str) -> bool { - let matches = self.fields_with_unqualified_name(name); - matches.len() == 1 + self.fields().iter().any(|field| field.name() == name) } /// Find if the field exists with the given qualified name pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str) -> bool { - let matches = self.fields_with_qualified_name(qualifier, name); - matches.len() == 1 + self.fields().iter().any(|field| { + field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false) + && field.name() == name + }) } /// Find if the field exists with the given qualified column From 1479746839a0aec222d234e4e2dbe82211d16c08 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 2 Mar 2023 16:50:46 -0500 Subject: [PATCH 9/9] fix one more is_ok --- datafusion/optimizer/src/decorrelate_where_exists.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 72a68b3123b0..023629b97ee0 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -172,7 +172,7 @@ fn optimize_exists( let using_cols: Vec = expr .to_columns()? .into_iter() - .filter(|col| input_schema.field_from_column(col).is_ok()) + .filter(|col| input_schema.has_column(col)) .collect::<_>(); cols.extend(using_cols);