Skip to content

Commit

Permalink
docstrings and some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Blizzara committed Jun 24, 2024
1 parent f54925a commit 87829ec
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,14 @@ pub async fn from_substrait_plan(
match plan {
// If the last node of the plan produces expressions, bake the renames into those expressions.
// This isn't necessary for correctness, but helps with roundtrip tests.
LogicalPlan::Projection(p) => Ok(LogicalPlan::Projection(Projection::try_new(rename_expressions(p.expr, p.input.schema(), renamed_schema)?, p.input)?)),
LogicalPlan::Projection(p) => Ok(LogicalPlan::Projection(Projection::try_new(rename_expressions(p.expr, p.input.schema(), &renamed_schema)?, p.input)?)),
LogicalPlan::Aggregate(a) => {
let new_aggr_exprs = rename_expressions(a.aggr_expr, a.input.schema(), renamed_schema)?;
let new_aggr_exprs = rename_expressions(a.aggr_expr, a.input.schema(), &renamed_schema)?;
Ok(LogicalPlan::Aggregate(Aggregate::try_new(a.input, a.group_expr, new_aggr_exprs)?))
},
// There are probably more plans where we could bake things in, can add them later as needed.
// Otherwise, add a new Project to handle the renaming.
_ => Ok(LogicalPlan::Projection(Projection::try_new(rename_expressions(plan.schema().columns().iter().map(|c| col(c.to_owned())), plan.schema(), renamed_schema)?, Arc::new(plan))?))
_ => Ok(LogicalPlan::Projection(Projection::try_new(rename_expressions(plan.schema().columns().iter().map(|c| col(c.to_owned())), plan.schema(), &renamed_schema)?, Arc::new(plan))?))
}
}
},
Expand Down Expand Up @@ -308,10 +308,14 @@ pub fn extract_projection(
}
}

/// Ensure the expressions have the right name(s) according to the new schema.
/// This includes the top-level (column) name, which will be renamed through aliasing if needed,
/// as well as nested names (if the expression produces any struct types), which will be renamed
/// through casting if needed.
fn rename_expressions(
exprs: impl IntoIterator<Item = Expr>,
input_schema: &DFSchema,
new_schema: DFSchemaRef,
new_schema: &DFSchema,
) -> Result<Vec<Expr>> {
exprs
.into_iter()
Expand All @@ -326,27 +330,24 @@ fn rename_expressions(
} else {
old_expr
};
// Alias column if needed
let new_name = new_field.name();
// Alias column if needed to fix the top-level name
match &new_expr {
Expr::Column(c) => {
// If expr is a column reference, alias_if_changed would cause an aliasing if the old expr has a qualifier
if &c.name == new_name {
Ok(new_expr)
} else {
Ok(new_expr.alias(new_name))
}
}
_ => new_expr.alias_if_changed(new_name.to_owned()),
// If expr is a column reference, alias_if_changed would cause an aliasing if the old expr has a qualifier
Expr::Column(c) if &c.name == new_field.name() => Ok(new_expr),
_ => new_expr.alias_if_changed(new_field.name().to_owned()),
}
})
.collect()
}

/// Produce a version of the given schema with names matching the given list of names.
/// Substrait doesn't deal with column (incl. nested struct field) names within the schema,
/// but it does give us the list of expected names at the end of the plan, so we use this
/// to rename the schema to match the expected names.
fn make_renamed_schema(
schema: &DFSchemaRef,
dfs_names: &Vec<String>,
) -> Result<DFSchemaRef> {
) -> Result<DFSchema> {
fn rename_inner_fields(
dtype: &DataType,
dfs_names: &Vec<String>,
Expand Down Expand Up @@ -412,10 +413,10 @@ fn make_renamed_schema(
dfs_names.len());
}

Ok(Arc::new(DFSchema::from_field_specific_qualified_schema(
Ok(DFSchema::from_field_specific_qualified_schema(
qualifiers,
&Arc::new(Schema::new(fields)),
)?))
)?)
}

/// Convert Substrait Rel to DataFusion DataFrame
Expand Down Expand Up @@ -861,15 +862,25 @@ pub async fn from_substrait_rel(
}
}

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
/// conflict with the columns from the other.
/// Substrait doesn't currently allow specifying aliases, either for columns or for tables. For
/// Substrait the names don't matter since it only refers to columns by indices. However, DataFusion
/// requires columns to be uniquely identifiable in some places (see e.g. DFSchema::check_names).
fn requalify_sides_if_needed(
left: LogicalPlanBuilder,
right: LogicalPlanBuilder,
) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder)> {
// If there are conflicting column names, alias the columns
// This may differ from the original plan, but is necessary as Substrait doesn't have a way to specify aliases
let left_cols = left.schema().columns();
let right_cols = right.schema().columns();
if left_cols.iter().any(|l| right_cols.contains(l)) {
if left_cols.iter().any(|l| {
right_cols.iter().any(|r| {
l == r || (l.name == r.name && (l.relation == None || r.relation == None))
})
}) {
// These names have no connection to the original plan, but they'll make the columns
// (mostly) unique. There may be cases where this still causes duplicates, if either left
// or right side itself contains duplicate names with different qualifiers.
Ok((
left.alias(TableReference::bare("left"))?,
right.alias(TableReference::bare("right"))?,
Expand Down

0 comments on commit 87829ec

Please sign in to comment.