Skip to content

Commit

Permalink
Recursive unnest (#11062)
Browse files Browse the repository at this point in the history
* chore: fix map children of unnest

* adjust test

* remove debug

* chore: move test to unnest.slt

* chore: rename

* add some comment

* compile err

* more comment

* chore: address comment

* more coverage

* one more scenario
  • Loading branch information
duongcongtoai authored Jul 2, 2024
1 parent 43ea682 commit 58f79e1
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 76 deletions.
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ impl Unnest {
expr: Box::new(expr),
}
}

/// Create a new Unnest expression from an already boxed [`Expr`].
///
/// The given box is stored as-is, so no additional `Box` is allocated.
/// This is convenient for callers that already hold a `Box<Expr>`, such as
/// tree rewrites that transform the boxed child and then rebuild the node.
pub fn new_boxed(boxed: Box<Expr>) -> Self {
Self { expr: boxed }
}
}

/// Alias expression
Expand Down
1 change: 0 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,6 @@ pub fn project(
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
}
}

validate_unique_names("Projections", projected_expr.iter())?;

Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ impl TreeNode for Expr {
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
| Expr::Unnest(_)
| Expr::Literal(_) => Transformed::no(self),
Expr::Unnest(Unnest { expr, .. }) => transform_box(expr, &mut f)?
.update_data(|be| Expr::Unnest(Unnest::new_boxed(be))),
Expr::Alias(Alias {
expr,
relation,
Expand Down
98 changes: 56 additions & 42 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ use crate::planner::{
idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel,
};
use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr,
recursive_transform_unnest, resolve_aliases_to_exprs, resolve_columns,
resolve_positions_to_exprs,
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
};

use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
Expand Down Expand Up @@ -298,46 +297,61 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: LogicalPlan,
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
let mut unnest_columns = vec![];
// from which column used for projection, before the unnest happen
// including non unnest column and unnest column
let mut inner_projection_exprs = vec![];

// expr returned here maybe different from the originals in inner_projection_exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// - unnest(array_col) will be transformed into unnest(array_col).element
// - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
let outer_projection_exprs: Vec<Expr> = select_exprs
.into_iter()
.map(|expr| {
recursive_transform_unnest(
&input,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

// Do the final projection
if unnest_columns.is_empty() {
LogicalPlanBuilder::from(input)
.project(inner_projection_exprs)?
.build()
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
// Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
LogicalPlanBuilder::from(input)
.project(inner_projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.project(outer_projection_exprs)?
.build()
let mut intermediate_plan = input;
let mut intermediate_select_exprs = select_exprs;
// Each expr in select_exprs can contain multiple unnest stages.
// The transformation happens bottom up, one stage per iteration.
// Only exit the loop once no more unnest transformations are found.
for i in 0.. {
let mut unnest_columns = vec![];
// Columns used for the projection before the unnest happens,
// including both non-unnest columns and unnest columns
let mut inner_projection_exprs = vec![];

// exprs returned here may be different from the originals in inner_projection_exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// - unnest(array_col) will be transformed into unnest(array_col).element
// - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
.iter()
.map(|expr| {
transform_bottom_unnest(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

// No more unnest is possible
if unnest_columns.is_empty() {
// The original expr does not contain any unnest
if i == 0 {
return LogicalPlanBuilder::from(intermediate_plan)
.project(inner_projection_exprs)?
.build();
}
break;
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
// Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
let plan = LogicalPlanBuilder::from(intermediate_plan)
.project(inner_projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.build()?;
intermediate_plan = plan;
intermediate_select_exprs = outer_projection_exprs;
}
}
LogicalPlanBuilder::from(intermediate_plan)
.project(intermediate_select_exprs)?
.build()
}

fn plan_selection(
Expand Down
143 changes: 111 additions & 32 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use std::collections::HashMap;
use arrow_schema::{
DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE,
};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
};
Expand Down Expand Up @@ -267,11 +269,13 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
/// - For list column: unnest(col) with type list -> unnest(col) with type list::item
/// - For struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2
/// The transformed exprs will be used in the outer projection
pub(crate) fn recursive_transform_unnest(
/// If along the path from root to bottom, there are multiple unnest expressions, the transformation
/// is done only for the bottom expression
pub(crate) fn transform_bottom_unnest(
input: &LogicalPlan,
unnest_placeholder_columns: &mut Vec<String>,
inner_projection_exprs: &mut Vec<Expr>,
original_expr: Expr,
original_expr: &Expr,
) -> Result<Vec<Expr>> {
let mut transform =
|unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
Expand All @@ -298,35 +302,53 @@ pub(crate) fn recursive_transform_unnest(
.collect::<Vec<_>>();
Ok(expr)
};
// expr transformed maybe either the same, or different from the originals exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// This transformation is only done for list unnest.
// Struct unnest is done at the root level, at a later stage,
// because TreeNode only supports transforming a node into 1 Expr, while
// unnesting a struct produces multiple Exprs
// TODO: This can be resolved after this issue is resolved: https://github.com/apache/datafusion/issues/10102
//
// The transformation looks like:
// - unnest(array_col) will be transformed into unnest(array_col)
// - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1

// Specifically handle root level unnest expr, this is the only place
// unnest on struct can be handled
if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
return transform(&original_expr, arg);
}
let Transformed {
data: transformed_expr,
transformed,
tnr: _,
} = original_expr.transform_up(|expr: Expr| {
if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
let (data_type, _) = arg.data_type_and_nullable(input.schema())?;
if let DataType::Struct(_) = data_type {
return internal_err!("unnest on struct can ony be applied at the root level of select expression");
}
let transformed_exprs = transform(&expr, arg)?;
Ok(Transformed::yes(transformed_exprs[0].clone()))
} else {
Ok(Transformed::no(expr))
data: transformed_expr,
transformed,
tnr: _,
} = original_expr.clone().transform_up(|expr: Expr| {
let is_root_expr = &expr == original_expr;
// Root expr is transformed separately
if is_root_expr {
return Ok(Transformed::no(expr));
}
if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
let (data_type, _) = arg.data_type_and_nullable(input.schema())?;

if let DataType::Struct(_) = data_type {
return internal_err!("unnest on struct can ony be applied at the root level of select expression");
}
})?;

let mut transformed_exprs = transform(&expr, arg)?;
// root_expr.push(transformed_exprs[0].clone());
Ok(Transformed::new(
transformed_exprs.swap_remove(0),
true,
TreeNodeRecursion::Stop,
))
} else {
Ok(Transformed::no(expr))
}
})?;

if !transformed {
// Because the root expr needs to be transformed separately,
// unnest on struct is only possible here
// The transformation looks like
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
if let Expr::Unnest(Unnest { expr: ref arg }) = transformed_expr {
return transform(&transformed_expr, arg);
}

if matches!(&transformed_expr, Expr::Column(_)) {
inner_projection_exprs.push(transformed_expr.clone());
Ok(vec![transformed_expr])
Expand All @@ -351,12 +373,13 @@ mod tests {
use arrow_schema::Fields;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan};
use datafusion_functions::core::expr_ext::FieldAccessor;
use datafusion_functions_aggregate::expr_fn::count;

use crate::utils::{recursive_transform_unnest, resolve_positions_to_exprs};
use crate::utils::{resolve_positions_to_exprs, transform_bottom_unnest};

#[test]
fn test_recursive_transform_unnest() -> Result<()> {
fn test_transform_bottom_unnest() -> Result<()> {
let schema = Schema::new(vec![
Field::new(
"struct_col",
Expand Down Expand Up @@ -390,11 +413,11 @@ mod tests {

// unnest(struct_col)
let original_expr = unnest(col("struct_col"));
let transformed_exprs = recursive_transform_unnest(
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
original_expr,
&original_expr,
)?;
assert_eq!(
transformed_exprs,
Expand All @@ -413,11 +436,11 @@ mod tests {

// unnest(array_col) + 1
let original_expr = unnest(col("array_col")).add(lit(1i64));
let transformed_exprs = recursive_transform_unnest(
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
original_expr,
&original_expr,
)?;
assert_eq!(
unnest_placeholder_columns,
Expand All @@ -440,6 +463,62 @@ mod tests {
]
);

// a nested structure struct[[]]
let schema = Schema::new(vec![
Field::new(
"struct_col", // {matrix: [[1,2,3]]}
ArrowDataType::Struct(Fields::from(vec![Field::new(
"matrix",
ArrowDataType::List(Arc::new(Field::new(
"matrix_row",
ArrowDataType::List(Arc::new(Field::new(
"item",
ArrowDataType::Int64,
true,
))),
true,
))),
true,
)])),
false,
),
Field::new("int_col", ArrowDataType::Int32, false),
]);

let dfschema = DFSchema::try_from(schema)?;

let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(dfschema),
});

let mut unnest_placeholder_columns = vec![];
let mut inner_projection_exprs = vec![];

// An expr with multiple unnest
let original_expr = unnest(unnest(col("struct_col").field("matrix")));
let transformed_exprs = transform_bottom_unnest(
&input,
&mut unnest_placeholder_columns,
&mut inner_projection_exprs,
&original_expr,
)?;
// Only the innermost (bottom-most) unnest is transformed
assert_eq!(
transformed_exprs,
vec![unnest(col("unnest(struct_col[matrix])"))]
);
assert_eq!(
unnest_placeholder_columns,
vec!["unnest(struct_col[matrix])"]
);
assert_eq!(
inner_projection_exprs,
vec![col("struct_col")
.field("matrix")
.alias("unnest(struct_col[matrix])"),]
);

Ok(())
}

Expand Down
Loading

0 comments on commit 58f79e1

Please sign in to comment.