diff --git a/datafusion/core/benches/dataframe.rs b/datafusion/core/benches/dataframe.rs index 087764883a33..03078e05e105 100644 --- a/datafusion/core/benches/dataframe.rs +++ b/datafusion/core/benches/dataframe.rs @@ -56,8 +56,7 @@ fn run(column_count: u32, ctx: Arc) { data_frame = data_frame .with_column_renamed(field_name, new_field_name) - .unwrap(); - data_frame = data_frame + .unwrap() .with_column(new_field_name, btrim(vec![col(new_field_name)])) .unwrap(); } @@ -68,8 +67,7 @@ fn run(column_count: u32, ctx: Arc) { } fn criterion_benchmark(c: &mut Criterion) { - // 500 takes far too long right now - for column_count in [10, 100, 200 /* 500 */] { + for column_count in [10, 100, 200, 500] { let ctx = create_context(column_count).unwrap(); c.bench_function(&format!("with_column_{column_count}"), |b| { diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index b6949d2eea9c..f4bf055eea8b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -184,6 +184,22 @@ pub struct DataFrame { // Box the (large) SessionState to reduce the size of DataFrame on the stack session_state: Box, plan: LogicalPlan, + // Whether projection ops can skip validation or not. This flag if false + // allows for an optimization in `with_column` and `with_column_renamed` functions + // where the recursive work required to columnize and normalize expressions can + // be skipped if set to false. Since these function calls are often chained or + // called many times in dataframe operations this can result in a significant + // performance gain. + // + // The conditions where this can be set to false is when the dataframe function + // call results in the last operation being a + // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or + // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()` + // call. This requirement guarantees that the plan has had all columnization + // and normalization applied to existing expressions and only new expressions + // will require that work. Any operation that update the plan in any way + // via anything other than a `project` call should set this to true. + projection_requires_validation: bool, } impl DataFrame { @@ -196,6 +212,7 @@ impl DataFrame { Self { session_state: Box::new(session_state), plan, + projection_requires_validation: true, } } @@ -333,6 +350,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan: project_plan, + projection_requires_validation: false, }) } @@ -438,6 +456,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -478,6 +497,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -551,6 +571,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: !is_grouping_set, }) } @@ -563,6 +584,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -601,6 +623,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -638,6 +661,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -676,6 +700,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -707,6 +732,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -748,6 +774,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -948,6 +975,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -997,6 +1025,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -1064,6 +1093,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1123,6 +1153,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1158,6 +1189,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1429,6 +1461,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -1481,6 +1514,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1516,6 +1550,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: true, }) } @@ -1561,6 +1596,7 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await @@ -1630,6 +1666,7 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await @@ -1699,12 +1736,13 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await } - /// Add an additional column to the DataFrame. + /// Add or replace a column in the DataFrame. /// /// # Example /// ``` @@ -1732,33 +1770,36 @@ impl DataFrame { let mut col_exists = false; let new_column = expr.alias(name); - let mut fields: Vec = plan + let mut fields: Vec<(Expr, bool)> = plan .schema() .iter() .filter_map(|(qualifier, field)| { if field.name() == name { col_exists = true; - Some(new_column.clone()) + Some((new_column.clone(), true)) } else { let e = col(Column::from((qualifier, field))); window_fn_str .as_ref() .filter(|s| *s == &e.to_string()) .is_none() - .then_some(e) + .then_some((e, self.projection_requires_validation)) } }) .collect(); if !col_exists { - fields.push(new_column); + fields.push((new_column, true)); } - let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; + let project_plan = LogicalPlanBuilder::from(plan) + .project_with_validation(fields)? + .build()?; Ok(DataFrame { session_state: self.session_state, plan: project_plan, + projection_requires_validation: false, }) } @@ -1815,19 +1856,23 @@ impl DataFrame { .iter() .map(|(qualifier, field)| { if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename { - col(Column::from((qualifier, field))) - .alias_qualified(qualifier.cloned(), new_name) + ( + col(Column::from((qualifier, field))) + .alias_qualified(qualifier.cloned(), new_name), + false, + ) } else { - col(Column::from((qualifier, field))) + (col(Column::from((qualifier, field))), false) } }) .collect::>(); let project_plan = LogicalPlanBuilder::from(self.plan) - .project(projection)? + .project_with_validation(projection)? .build()?; Ok(DataFrame { session_state: self.session_state, plan: project_plan, + projection_requires_validation: false, }) } @@ -1893,6 +1938,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } @@ -1928,6 +1974,7 @@ impl DataFrame { Ok(DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, }) } } diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 1dd4d68fca6b..1bb5444ca009 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -93,6 +93,7 @@ impl DataFrame { DataFrame { session_state: self.session_state, plan, + projection_requires_validation: self.projection_requires_validation, } .collect() .await diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 4d825c6bfe49..2bb15da21863 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -528,6 +528,15 @@ impl LogicalPlanBuilder { project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new) } + /// Apply a projection without alias with optional validation + /// (true to validate, false to not validate) + pub fn project_with_validation( + self, + expr: Vec<(impl Into, bool)>, + ) -> Result { + project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new) + } + /// Select the given column indices pub fn select(self, indices: impl IntoIterator) -> Result { let exprs: Vec<_> = indices @@ -1647,13 +1656,33 @@ pub fn union_by_name( pub fn project( plan: LogicalPlan, expr: impl IntoIterator>, +) -> Result { + project_with_validation(plan, expr.into_iter().map(|e| (e, true))) +} + +/// Create Projection. Similar to project except that the expressions +/// passed in have a flag to indicate if that expression requires +/// validation (normalize & columnize) (true) or not (false) +/// # Errors +/// This function errors under any of the following conditions: +/// * Two or more expressions have the same name +/// * An invalid expression is used (e.g. a `sort` expression) +fn project_with_validation( + plan: LogicalPlan, + expr: impl IntoIterator, bool)>, ) -> Result { let mut projected_expr = vec![]; - for e in expr { + for (e, validate) in expr { let e = e.into(); match e { Expr::Wildcard { .. } => projected_expr.push(e), - _ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?), + _ => { + if validate { + projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?) + } else { + projected_expr.push(e) + } + } } } validate_unique_names("Projections", projected_expr.iter())?;