Skip to content
6 changes: 2 additions & 4 deletions datafusion/core/benches/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {

data_frame = data_frame
.with_column_renamed(field_name, new_field_name)
.unwrap();
data_frame = data_frame
.unwrap()
.with_column(new_field_name, btrim(vec![col(new_field_name)]))
.unwrap();
}
Expand All @@ -68,8 +67,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
}

fn criterion_benchmark(c: &mut Criterion) {
// 500 takes far too long right now
for column_count in [10, 100, 200 /* 500 */] {
for column_count in [10, 100, 200, 500] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

let ctx = create_context(column_count).unwrap();

c.bench_function(&format!("with_column_{column_count}"), |b| {
Expand Down
67 changes: 57 additions & 10 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,22 @@ pub struct DataFrame {
// Box the (large) SessionState to reduce the size of DataFrame on the stack
session_state: Box<SessionState>,
plan: LogicalPlan,
// Whether projection ops can skip validation or not. This flag if false
// allows for an optimization in `with_column` and `with_column_renamed` functions
// where the recursive work required to columnize and normalize expressions can
// be skipped if set to false. Since these function calls are often chained or
// called many times in dataframe operations this can result in a significant
// performance gain.
//
// The conditions where this can be set to false is when the dataframe function
// call results in the last operation being a
// `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
// `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
// call. This requirement guarantees that the plan has had all columnization
// and normalization applied to existing expressions and only new expressions
// will require that work. Any operation that update the plan in any way
// via anything other than a `project` call should set this to true.
projection_requires_validation: bool,
}

impl DataFrame {
Expand All @@ -196,6 +212,7 @@ impl DataFrame {
Self {
session_state: Box::new(session_state),
plan,
projection_requires_validation: true,
}
}

Expand Down Expand Up @@ -333,6 +350,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
projection_requires_validation: false,
})
}

Expand Down Expand Up @@ -438,6 +456,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -478,6 +497,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -551,6 +571,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: !is_grouping_set,
})
}

Expand All @@ -563,6 +584,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -601,6 +623,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -638,6 +661,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -676,6 +700,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -707,6 +732,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -748,6 +774,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -948,6 +975,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -997,6 +1025,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -1064,6 +1093,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1123,6 +1153,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1158,6 +1189,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1429,6 +1461,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -1481,6 +1514,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1516,6 +1550,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1561,6 +1596,7 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
Expand Down Expand Up @@ -1630,6 +1666,7 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
Expand Down Expand Up @@ -1699,12 +1736,13 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
}

/// Add an additional column to the DataFrame.
/// Add or replace a column in the DataFrame.
///
/// # Example
/// ```
Expand Down Expand Up @@ -1732,33 +1770,36 @@ impl DataFrame {

let mut col_exists = false;
let new_column = expr.alias(name);
let mut fields: Vec<Expr> = plan
let mut fields: Vec<(Expr, bool)> = plan
.schema()
.iter()
.filter_map(|(qualifier, field)| {
if field.name() == name {
col_exists = true;
Some(new_column.clone())
Some((new_column.clone(), true))
} else {
let e = col(Column::from((qualifier, field)));
window_fn_str
.as_ref()
.filter(|s| *s == &e.to_string())
.is_none()
.then_some(e)
.then_some((e, self.projection_requires_validation))
}
})
.collect();

if !col_exists {
fields.push(new_column);
fields.push((new_column, true));
}

let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?;
let project_plan = LogicalPlanBuilder::from(plan)
.project_with_validation(fields)?
.build()?;

Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
projection_requires_validation: false,
})
}

Expand Down Expand Up @@ -1815,19 +1856,23 @@ impl DataFrame {
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name)
(
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name),
false,
)
} else {
col(Column::from((qualifier, field)))
(col(Column::from((qualifier, field))), false)
}
})
.collect::<Vec<_>>();
let project_plan = LogicalPlanBuilder::from(self.plan)
.project(projection)?
.project_with_validation(projection)?
.build()?;
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
projection_requires_validation: false,
})
}

Expand Down Expand Up @@ -1893,6 +1938,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -1928,6 +1974,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
Expand Down
33 changes: 31 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,15 @@ impl LogicalPlanBuilder {
project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
}

/// Apply a projection without alias with optional validation
/// (true to validate, false to not validate)
pub fn project_with_validation(
self,
expr: Vec<(impl Into<Expr>, bool)>,
) -> Result<Self> {
project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
}

/// Select the given column indices
pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
let exprs: Vec<_> = indices
Expand Down Expand Up @@ -1647,13 +1656,33 @@ pub fn union_by_name(
pub fn project(
plan: LogicalPlan,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<LogicalPlan> {
project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
}

/// Create Projection. Similar to project except that the expressions
/// passed in have a flag to indicate if that expression requires
/// validation (normalize & columnize) (true) or not (false)
/// # Errors
/// This function errors under any of the following conditions:
/// * Two or more expressions have the same name
/// * An invalid expression is used (e.g. a `sort` expression)
fn project_with_validation(
plan: LogicalPlan,
expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>,
) -> Result<LogicalPlan> {
let mut projected_expr = vec![];
for e in expr {
for (e, validate) in expr {
let e = e.into();
match e {
Expr::Wildcard { .. } => projected_expr.push(e),
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
_ => {
if validate {
projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
} else {
projected_expr.push(e)
}
}
}
}
validate_unique_names("Projections", projected_expr.iter())?;
Expand Down