From 9b1ab42b28189a126814826ccfca41192e9822ef Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 5 Oct 2024 12:55:19 +0200 Subject: [PATCH 01/12] refactor --- datafusion/common/src/lib.rs | 2 +- datafusion/common/src/unnest.rs | 23 +++ datafusion/expr/src/logical_plan/builder.rs | 143 ++++++++++-------- datafusion/expr/src/logical_plan/plan.rs | 4 +- datafusion/expr/src/logical_plan/tree_node.rs | 2 +- datafusion/sql/src/select.rs | 56 ++++++- datafusion/sql/src/utils.rs | 56 +++---- 7 files changed, 169 insertions(+), 117 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 10541e01914a..8323f5efc86d 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -70,7 +70,7 @@ pub use scalar::{ScalarType, ScalarValue}; pub use schema_reference::SchemaReference; pub use stats::{ColumnStatistics, Statistics}; pub use table_reference::{ResolvedTableReference, TableReference}; -pub use unnest::UnnestOptions; +pub use unnest::{RecursionUnnestOption, UnnestOptions}; pub use utils::project_schema; // These are hidden from docs purely to avoid polluting the public view of what this crate exports. diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index fd92267f9b4c..aba3e9e41726 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -17,6 +17,10 @@ //! [`UnnestOptions`] for unnesting structured types +use hashbrown::HashMap; + +use crate::Column; + /// Options for unnesting a column that contains a list type, /// replicating values in the other, non nested rows. /// @@ -64,6 +68,16 @@ pub struct UnnestOptions { /// Should nulls in the input be preserved? Defaults to true pub preserve_nulls: bool, + /// Without explicit recursions, all + /// Datafusion will infer the unesting type with depth = 1 + pub recursions: Option>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)] +pub struct RecursionUnnestOption { + pub input_column: Column, + pub output_column: Column, + pub depth: usize, } impl Default for UnnestOptions { @@ -71,6 +85,7 @@ impl Default for UnnestOptions { Self { // default to true to maintain backwards compatible behavior preserve_nulls: true, + recursions: None, } } } @@ -87,4 +102,12 @@ impl UnnestOptions { self.preserve_nulls = preserve_nulls; self } + + /// Set the recursions for the unnest operation + pub fn with_recursions(mut self, recursions: Vec) -> Self { + if !recursions.is_empty() { + self.recursions = Some(recursions); + } + self + } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ad96f6a85d0e..f879d9d64c4b 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1185,7 +1185,7 @@ impl LogicalPlanBuilder { ) -> Result { unnest_with_options( Arc::unwrap_or_clone(self.plan), - vec![(column.into(), ColumnUnnestType::Inferred)], + vec![column.into()], options, ) .map(Self::new) @@ -1197,15 +1197,8 @@ impl LogicalPlanBuilder { columns: Vec, options: UnnestOptions, ) -> Result { - unnest_with_options( - Arc::unwrap_or_clone(self.plan), - columns - .into_iter() - .map(|c| (c, ColumnUnnestType::Inferred)) - .collect(), - options, - ) - .map(Self::new) + unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options) + .map(Self::new) } /// Unnest the given columns with the given [`UnnestOptions`] @@ -1214,7 +1207,7 @@ impl LogicalPlanBuilder { /// e.g select unnest(list_col,depth=1), unnest(list_col,depth=2) pub fn unnest_columns_recursive_with_options( self, - columns: Vec<(Column, ColumnUnnestType)>, + columns: Vec, options: UnnestOptions, ) -> Result { unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options) @@ -1593,14 +1586,13 @@ impl TableSource for LogicalTableSource { /// Create a [`LogicalPlan::Unnest`] plan pub fn unnest(input: LogicalPlan, columns: Vec) -> Result { - let unnestings = columns - .into_iter() - .map(|c| (c, ColumnUnnestType::Inferred)) - .collect(); + let unnestings = columns.into_iter().map(|c| c).collect(); unnest_with_options(input, unnestings, UnnestOptions::default()) } -pub fn get_unnested_list_datatype_recursive( +// get the data type of a multi-dimensional type after unnesting it +// with a given depth +fn get_unnested_list_datatype_recursive( data_type: &DataType, depth: usize, ) -> Result { @@ -1728,20 +1720,15 @@ pub fn get_unnested_columns( /// ``` pub fn unnest_with_options( input: LogicalPlan, - columns_to_unnest: Vec<(Column, ColumnUnnestType)>, + columns_to_unnest: Vec, options: UnnestOptions, ) -> Result { let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![]; let mut struct_columns = vec![]; let indices_to_unnest = columns_to_unnest .iter() - .map(|col_unnesting| { - Ok(( - input.schema().index_of_column(&col_unnesting.0)?, - col_unnesting, - )) - }) - .collect::>>()?; + .map(|c| Ok((input.schema().index_of_column(c)?, c))) + .collect::>>()?; let input_schema = input.schema(); @@ -1766,51 +1753,75 @@ pub fn unnest_with_options( .enumerate() .map(|(index, (original_qualifier, original_field))| { match indices_to_unnest.get(&index) { - Some((column_to_unnest, unnest_type)) => { - let mut inferred_unnest_type = unnest_type.clone(); - if let ColumnUnnestType::Inferred = unnest_type { - inferred_unnest_type = infer_unnest_type( - &column_to_unnest.name, - original_field.data_type(), - )?; - } - let transformed_columns: Vec<(Column, Arc)> = - match inferred_unnest_type { - ColumnUnnestType::Struct => { - struct_columns.push(index); - get_unnested_columns( - &column_to_unnest.name, - original_field.data_type(), - 1, - )? + Some(column_to_unnest) => { + let transformed_columns; + let maybe_explicit_recursion = match options.recursions { + None => None, + Some(ref recursions) => { + let list_recursions_on_column = recursions + .iter() + .filter(|p| -> bool { + &p.input_column == *column_to_unnest + }) + .collect::>(); + if list_recursions_on_column.len() == 0 { + None + } else { + Some(list_recursions_on_column) } - ColumnUnnestType::List(unnest_lists) => { - list_columns.extend( - unnest_lists - .iter() - .map(|ul| (index, ul.to_owned().clone())), - ); - unnest_lists - .iter() - .map( - |ColumnUnnestList { - output_column, - depth, - }| { - get_unnested_columns( - &output_column.name, - original_field.data_type(), - *depth, - ) + } + }; + match maybe_explicit_recursion { + Some(explicit_recursion) => { + transformed_columns = explicit_recursion + .iter() + .map(|r| { + list_columns.push(( + index, + ColumnUnnestList { + output_column: r.output_column.clone(), + depth: r.depth, }, + )); + get_unnested_columns( + &r.output_column.name, + original_field.data_type(), + r.depth, ) - .collect::)>>>>()? - .into_iter() - .flatten() - .collect::>() - } - _ => return internal_err!("Invalid unnest type"), - }; + }) + .collect::)>>>>()? + .into_iter() + .flatten() + .collect::>(); + } + None => { + transformed_columns = get_unnested_columns( + &column_to_unnest.name, + original_field.data_type(), + 1, + )?; + match original_field.data_type() { + DataType::Struct(_) => { + struct_columns.push(index); + } + DataType::List(_) + | DataType::FixedSizeList(_, _) + | DataType::LargeList(_) => { + list_columns.push(( + index, + ColumnUnnestList { + output_column: Column::from_name( + &column_to_unnest.name, + ), + depth: 1, + }, + )); + } + _ => {} + }; + } + } + // new columns dependent on the same original index dependency_indices .extend(std::iter::repeat(index).take(transformed_columns.len())); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 443d23804adb..a553113495af 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -3371,7 +3371,7 @@ pub struct Unnest { /// The incoming logical plan pub input: Arc, /// Columns to run unnest on, can be a list of (List/Struct) columns - pub exec_columns: Vec<(Column, ColumnUnnestType)>, + pub exec_columns: Vec, /// refer to the indices(in the input schema) of columns /// that have type list to run unnest on pub list_type_columns: Vec<(usize, ColumnUnnestList)>, @@ -3395,7 +3395,7 @@ impl PartialOrd for Unnest { /// The incoming logical plan pub input: &'a Arc, /// Columns to run unnest on, can be a list of (List/Struct) columns - pub exec_columns: &'a Vec<(Column, ColumnUnnestType)>, + pub exec_columns: &'a Vec, /// refer to the indices(in the input schema) of columns /// that have type list to run unnest on pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>, diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 8ba68697bd4d..0964fb601879 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -497,7 +497,7 @@ impl LogicalPlan { let exprs = columns .iter() - .map(|(c, _)| Expr::Column(c.clone())) + .map(|c| Expr::Column(c.clone())) .collect::>(); exprs.iter().apply_until_stop(f) } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index c93d9e6fc435..e888d15d7458 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; @@ -25,8 +25,8 @@ use crate::utils::{ }; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::UnnestOptions; use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; +use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, @@ -306,7 +306,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // The transformation happen bottom up, one at a time for each iteration // Only exaust the loop if no more unnest transformation is found for i in 0.. { - let mut unnest_columns = vec![]; + let mut unnest_columns = HashMap::new(); // from which column used for projection, before the unnest happen // including non unnest column and unnest column let mut inner_projection_exprs = vec![]; @@ -336,11 +336,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL let unnest_options = UnnestOptions::new().with_preserve_nulls(false); + let mut list_recursions = vec![]; + let unnest_col_vec = unnest_columns + .into_iter() + .map(|(col, maybe_list_unnest)| { + match maybe_list_unnest { + None => {} + Some(list_unnest) => { + list_recursions.extend(list_unnest.into_iter().map( + |unnest_list| RecursionUnnestOption { + input_column: col.clone(), + output_column: unnest_list.output_column, + depth: unnest_list.depth, + }, + )); + } + } + col + }) + .collect::>(); + let plan = LogicalPlanBuilder::from(intermediate_plan) .project(inner_projection_exprs)? .unnest_columns_recursive_with_options( - unnest_columns, - unnest_options, + unnest_col_vec, + unnest_options.with_recursions(list_recursions), )? .build()?; intermediate_plan = plan; @@ -410,7 +430,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut intermediate_select_exprs = group_expr; loop { - let mut unnest_columns = vec![]; + let mut unnest_columns = HashMap::new(); let mut inner_projection_exprs = vec![]; let outer_projection_exprs = rewrite_recursive_unnests_bottom_up( @@ -445,11 +465,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; projection_exprs.extend(inner_projection_exprs); + let mut list_recursions = vec![]; + let unnest_col_vec = unnest_columns + .into_iter() + .map(|(col, maybe_list_unnest)| { + match maybe_list_unnest { + None => {} + Some(list_unnest) => { + list_recursions.extend(list_unnest.into_iter().map( + |unnest_list| RecursionUnnestOption { + input_column: col.clone(), + output_column: unnest_list.output_column, + depth: unnest_list.depth, + }, + )); + } + } + col + }) + .collect::>(); + intermediate_plan = LogicalPlanBuilder::from(intermediate_plan) .project(projection_exprs)? .unnest_columns_recursive_with_options( - unnest_columns, - unnest_options, + unnest_col_vec, + unnest_options.with_recursions(list_recursions), )? .build()?; diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 656e4b851aa8..f304d23c10a2 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -18,6 +18,7 @@ //! SQL Utility Functions use std::collections::HashMap; +use std::hash::Hash; use std::vec; use arrow_schema::{ @@ -295,7 +296,7 @@ pub(crate) fn value_to_string(value: &Value) -> Option { pub(crate) fn rewrite_recursive_unnests_bottom_up( input: &LogicalPlan, - unnest_placeholder_columns: &mut Vec<(Column, ColumnUnnestType)>, + unnest_placeholder_columns: &mut HashMap>>, inner_projection_exprs: &mut Vec, original_exprs: &[Expr], ) -> Result> { @@ -326,7 +327,7 @@ struct RecursiveUnnestRewriter<'a> { top_most_unnest: Option, consecutive_unnest: Vec>, inner_projection_exprs: &'a mut Vec, - columns_unnestings: &'a mut Vec<(Column, ColumnUnnestType)>, + columns_unnestings: &'a mut HashMap>>, transformed_root_exprs: Option>, } impl<'a> RecursiveUnnestRewriter<'a> { @@ -380,10 +381,8 @@ impl<'a> RecursiveUnnestRewriter<'a> { self.inner_projection_exprs, expr_in_unnest.clone().alias(placeholder_name.clone()), ); - self.columns_unnestings.push(( - Column::from_name(placeholder_name.clone()), - ColumnUnnestType::Struct, - )); + self.columns_unnestings + .insert(Column::from_name(placeholder_name.clone()), None); Ok( get_struct_unnested_columns(&placeholder_name, &inner_fields) .into_iter() @@ -401,37 +400,17 @@ impl<'a> RecursiveUnnestRewriter<'a> { // let post_unnest_column = Column::from_name(post_unnest_name); let post_unnest_expr = col(post_unnest_name.clone()).alias(alias_name); - match self + let list_unnesting = self .columns_unnestings - .iter_mut() - .find(|(inner_col, _)| inner_col == &placeholder_column) - { - // there is not unnesting done on this column yet - None => { - self.columns_unnestings.push(( - Column::from_name(placeholder_name.clone()), - ColumnUnnestType::List(vec![ColumnUnnestList { - output_column: Column::from_name(post_unnest_name), - depth: level, - }]), - )); - } - // some unnesting(at some level) has been done on this column - // e.g select unnest(column3), unnest(unnest(column3)) - Some((_, unnesting)) => match unnesting { - ColumnUnnestType::List(list) => { - let unnesting = ColumnUnnestList { - output_column: Column::from_name(post_unnest_name), - depth: level, - }; - if !list.contains(&unnesting) { - list.push(unnesting); - } - } - _ => { - return internal_err!("not reached"); - } - }, + .entry(placeholder_column) + .or_insert(Some(vec![])); + let unnesting = ColumnUnnestList { + output_column: Column::from_name(post_unnest_name), + depth: level, + }; + let list_unnestings = list_unnesting.as_mut().unwrap(); + if !list_unnestings.contains(&unnesting) { + list_unnestings.push(unnesting); } Ok(vec![post_unnest_expr]) } @@ -478,8 +457,7 @@ impl<'a> TreeNodeRewriter for RecursiveUnnestRewriter<'a> { } /// The rewriting only happens when the traversal has reached the top-most unnest expr - /// within a sequence of consecutive unnest exprs. - /// node, for example given a stack of expr + /// within a sequence of consecutive unnest exprs node /// /// For example an expr of **unnest(unnest(column1)) + unnest(unnest(unnest(column2)))** /// ```text @@ -589,7 +567,7 @@ fn push_projection_dedupl(projection: &mut Vec, expr: Expr) { /// is done only for the bottom expression pub(crate) fn rewrite_recursive_unnest_bottom_up( input: &LogicalPlan, - unnest_placeholder_columns: &mut Vec<(Column, ColumnUnnestType)>, + unnest_placeholder_columns: &mut HashMap>>, inner_projection_exprs: &mut Vec, original_expr: &Expr, ) -> Result> { From a16799b88c1efeaff6f9a6c899db718e540fb005 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 6 Oct 2024 14:20:10 +0200 Subject: [PATCH 02/12] refactor unnest options --- datafusion/common/src/unnest.rs | 4 -- datafusion/expr/src/logical_plan/builder.rs | 59 +++++----------- datafusion/expr/src/logical_plan/mod.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 33 --------- datafusion/sql/src/utils.rs | 74 +++++++++++---------- 5 files changed, 57 insertions(+), 117 deletions(-) diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index aba3e9e41726..2a6af09a2c08 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -15,10 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! [`UnnestOptions`] for unnesting structured types - -use hashbrown::HashMap; - use crate::Column; /// Options for unnesting a column that contains a list type, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index f879d9d64c4b..cf07a8845782 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -54,7 +54,7 @@ use datafusion_common::{ TableReference, ToDFSchema, UnnestOptions, }; -use super::plan::{ColumnUnnestList, ColumnUnnestType}; +use super::plan::ColumnUnnestList; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -1611,27 +1611,6 @@ fn get_unnested_list_datatype_recursive( internal_err!("trying to unnest on invalid data type {:?}", data_type) } -/// Infer the unnest type based on the data type: -/// - list type: infer to unnest(list(col, depth=1)) -/// - struct type: infer to unnest(struct) -fn infer_unnest_type( - col_name: &String, - data_type: &DataType, -) -> Result { - match data_type { - DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => { - Ok(ColumnUnnestType::List(vec![ColumnUnnestList { - output_column: Column::from_name(col_name), - depth: 1, - }])) - } - DataType::Struct(_) => Ok(ColumnUnnestType::Struct), - _ => { - internal_err!("trying to unnest on invalid data type {:?}", data_type) - } - } -} - pub fn get_struct_unnested_columns( col_name: &String, inner_fields: &Fields, @@ -1764,7 +1743,7 @@ pub fn unnest_with_options( &p.input_column == *column_to_unnest }) .collect::>(); - if list_recursions_on_column.len() == 0 { + if list_recursions_on_column.is_empty() { None } else { Some(list_recursions_on_column) @@ -1870,7 +1849,7 @@ mod tests { use crate::logical_plan::StringifiedPlan; use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery}; - use datafusion_common::SchemaError; + use datafusion_common::{RecursionUnnestOption, SchemaError}; #[test] fn plan_builder_simple() -> Result<()> { @@ -2278,24 +2257,20 @@ mod tests { // Simultaneously unnesting a list (with different depth) and a struct column let plan = nested_table_scan("test_table")? - .unnest_columns_recursive_with_options( - vec![ - ( - "stringss".into(), - ColumnUnnestType::List(vec![ - ColumnUnnestList { - output_column: Column::from_name("stringss_depth_1"), - depth: 1, - }, - ColumnUnnestList { - output_column: Column::from_name("stringss_depth_2"), - depth: 2, - }, - ]), - ), - ("struct_singular".into(), ColumnUnnestType::Inferred), - ], - UnnestOptions::default(), + .unnest_columns_with_options( + vec!["stringss".into(), "struct_singular".into()], + UnnestOptions::default().with_recursions(vec![ + RecursionUnnestOption { + input_column: "stringss".into(), + output_column: "stringss_depth_1".into(), + depth: 1, + }, + RecursionUnnestOption { + input_column: "stringss".into(), + output_column: "stringss_depth_2".into(), + depth: 2, + }, + ]), )? .build()?; diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index a189d4635e00..da44cfb010d7 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -35,8 +35,8 @@ pub use ddl::{ }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ - projection_schema, Aggregate, Analyze, ColumnUnnestList, ColumnUnnestType, CrossJoin, - DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, + projection_schema, Aggregate, Analyze, ColumnUnnestList, CrossJoin, DescribeTable, + Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index a553113495af..1b333d8e21d9 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -3300,39 +3300,6 @@ pub enum Partitioning { DistributeBy(Vec), } -/// Represents the unnesting operation on a column based on the context (a known struct -/// column, a list column, or let the planner infer the unnesting type). -/// -/// The inferred unnesting type works for both struct and list column, but the unnesting -/// will only be done once (depth = 1). In case recursion is needed on a multi-dimensional -/// list type, use [`ColumnUnnestList`] -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)] -pub enum ColumnUnnestType { - // Unnesting a list column, a vector of ColumnUnnestList is used because - // a column can be unnested at different levels, resulting different output columns - List(Vec), - // for struct, there can only be one unnest performed on one column at a time - Struct, - // Infer the unnest type based on column schema - // If column is a list column, the unnest depth will be 1 - // This value is to support sugar syntax of old api in Dataframe (unnest(either_list_or_struct_column)) - Inferred, -} - -impl fmt::Display for ColumnUnnestType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ColumnUnnestType::List(lists) => { - let list_strs: Vec = - lists.iter().map(|list| list.to_string()).collect(); - write!(f, "List([{}])", list_strs.join(", ")) - } - ColumnUnnestType::Struct => write!(f, "Struct"), - ColumnUnnestType::Inferred => write!(f, "Inferred"), - } - } -} - /// Represent the unnesting operation on a list column, such as the recursion depth and /// the output column name after unnesting /// diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index f304d23c10a2..6bb226f7eb70 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -18,7 +18,6 @@ //! SQL Utility Functions use std::collections::HashMap; -use std::hash::Hash; use std::vec; use arrow_schema::{ @@ -35,8 +34,7 @@ use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction}; use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; use datafusion_expr::{ - col, expr_vec_fmt, ColumnUnnestList, ColumnUnnestType, Expr, ExprSchemable, - LogicalPlan, + col, expr_vec_fmt, ColumnUnnestList, Expr, ExprSchemable, LogicalPlan, }; use sqlparser::ast::{Ident, Value}; @@ -361,13 +359,11 @@ impl<'a> RecursiveUnnestRewriter<'a> { // Full context, we are trying to plan the execution as InnerProjection->Unnest->OuterProjection // inside unnest execution, each column inside the inner projection // will be transformed into new columns. Thus we need to keep track of these placeholding column names - // let placeholder_name = unnest_expr.display_name()?; let placeholder_name = format!("unnest_placeholder({})", inner_expr_name); let post_unnest_name = format!("unnest_placeholder({},depth={})", inner_expr_name, level); // This is due to the fact that unnest transformation should keep the original // column name as is, to comply with group by and order by - // let post_unnest_alias = print_unnest(&inner_expr_name, level); let placeholder_column = Column::from_name(placeholder_name.clone()); let (data_type, _) = expr_in_unnest.data_type_and_nullable(self.input_schema)?; @@ -398,7 +394,6 @@ impl<'a> RecursiveUnnestRewriter<'a> { expr_in_unnest.clone().alias(placeholder_name.clone()), ); - // let post_unnest_column = Column::from_name(post_unnest_name); let post_unnest_expr = col(post_unnest_name.clone()).alias(alias_name); let list_unnesting = self .columns_unnestings @@ -538,7 +533,7 @@ impl<'a> TreeNodeRewriter for RecursiveUnnestRewriter<'a> { // For column exprs that are not descendants of any unnest node // retain their projection // e.g given expr tree unnest(col_a) + col_b, we have to retain projection of col_b - // this condition can be checked by maintaining an Option + // this condition can be checked by maintaining an Option if matches!(&expr, Expr::Column(_)) && self.top_most_unnest.is_none() { push_projection_dedupl(self.inner_projection_exprs, expr.clone()); } @@ -588,8 +583,8 @@ pub(crate) fn rewrite_recursive_unnest_bottom_up( // TODO: This can be resolved after this issue is resolved: https://github.com/apache/datafusion/issues/10102 // // The transformation looks like: - // - unnest(array_col) will be transformed into unnest(array_col) - // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1 + // - unnest(array_col) will be transformed into Column("unnest_place_holder(array_col)") + // - unnest(array_col) + 1 will be transformed into Column("unnest_place_holder(array_col) + 1") let Transformed { data: transformed_expr, transformed, @@ -617,23 +612,37 @@ pub(crate) fn rewrite_recursive_unnest_bottom_up( #[cfg(test)] mod tests { - use std::{ops::Add, sync::Arc}; + use std::{collections::HashMap, ops::Add, sync::Arc}; use arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; use arrow_schema::Fields; use datafusion_common::{Column, DFSchema, Result}; use datafusion_expr::{ - col, lit, unnest, ColumnUnnestType, EmptyRelation, LogicalPlan, + col, lit, unnest, ColumnUnnestList, EmptyRelation, LogicalPlan, }; use datafusion_functions::core::expr_ext::FieldAccessor; use datafusion_functions_aggregate::expr_fn::count; use crate::utils::{resolve_positions_to_exprs, rewrite_recursive_unnest_bottom_up}; - fn column_unnests_eq(l: Vec<(&str, &str)>, r: &[(Column, ColumnUnnestType)]) { - let r_formatted: Vec = - r.iter().map(|i| format!("{}|{}", i.0, i.1)).collect(); - let l_formatted: Vec = - l.iter().map(|i| format!("{}|{}", i.0, i.1)).collect(); + fn column_unnests_eq( + l: Vec<&str>, + r: &HashMap>>, + ) { + let r_formatted: Vec = r + .iter() + .map(|i| match i.1 { + None => format!("{}", i.0), + Some(vec) => format!( + "{}=>[{}]", + i.0, + vec.iter() + .map(|i| format!("{}", i)) + .collect::>() + .join(", ") + ), + }) + .collect(); + let l_formatted: Vec = l.iter().map(|i| i.to_string()).collect(); assert_eq!(l_formatted, r_formatted); } @@ -663,7 +672,7 @@ mod tests { schema: Arc::new(dfschema), }); - let mut unnest_placeholder_columns = vec![]; + let mut unnest_placeholder_columns = HashMap::new(); let mut inner_projection_exprs = vec![]; // unnest(unnest(3d_col)) + unnest(unnest(3d_col)) @@ -688,10 +697,9 @@ mod tests { .add(col("i64_col"))] ); column_unnests_eq( - vec![( - "unnest_placeholder(3d_col)", - "List([unnest_placeholder(3d_col,depth=2)|depth=2])", - )], + vec![ + "unnest_placeholder(3d_col)=>[unnest_placeholder(3d_col,depth=2)|depth=2]", + ], &unnest_placeholder_columns, ); @@ -722,9 +730,7 @@ mod tests { ] ); column_unnests_eq( - vec![("unnest_placeholder(3d_col)", - "List([unnest_placeholder(3d_col,depth=2)|depth=2, unnest_placeholder(3d_col,depth=1)|depth=1])"), - ], + vec!["unnest_placeholder(3d_col)=>[unnest_placeholder(3d_col,depth=2)|depth=2, unnest_placeholder(3d_col,depth=1)|depth=1]"], &unnest_placeholder_columns, ); // still reference struct_col in original schema but with alias, @@ -770,7 +776,7 @@ mod tests { schema: Arc::new(dfschema), }); - let mut unnest_placeholder_columns = vec![]; + let mut unnest_placeholder_columns = HashMap::new(); let mut inner_projection_exprs = vec![]; // unnest(struct_col) @@ -789,7 +795,7 @@ mod tests { ] ); column_unnests_eq( - vec![("unnest_placeholder(struct_col)", "Struct")], + vec!["unnest_placeholder(struct_col)"], &unnest_placeholder_columns, ); // still reference struct_col in original schema but with alias, @@ -809,11 +815,8 @@ mod tests { )?; column_unnests_eq( vec![ - ("unnest_placeholder(struct_col)", "Struct"), - ( - "unnest_placeholder(array_col)", - "List([unnest_placeholder(array_col,depth=1)|depth=1])", - ), + "unnest_placeholder(struct_col)", + "unnest_placeholder(array_col)=>[unnest_placeholder(array_col,depth=1)|depth=1]", ], &unnest_placeholder_columns, ); @@ -865,7 +868,7 @@ mod tests { schema: Arc::new(dfschema), }); - let mut unnest_placeholder_columns = vec![]; + let mut unnest_placeholder_columns = HashMap::new(); let mut inner_projection_exprs = vec![]; // An expr with multiple unnest @@ -886,10 +889,9 @@ mod tests { // unnest -> field access -> unnest column_unnests_eq( - vec![( - "unnest_placeholder(struct_col[matrix])", - "List([unnest_placeholder(struct_col[matrix],depth=2)|depth=2])", - )], + vec![ + "unnest_placeholder(struct_col[matrix])=>[unnest_placeholder(struct_col[matrix],depth=2)|depth=2]", + ], &unnest_placeholder_columns, ); From e43bc66891866d38664f97630324c8a752cc40d0 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 6 Oct 2024 14:55:42 +0200 Subject: [PATCH 03/12] more test --- datafusion/expr/src/logical_plan/builder.rs | 13 --- datafusion/sql/src/select.rs | 4 +- datafusion/sql/src/utils.rs | 99 ++++++++++++++++----- 3 files changed, 77 insertions(+), 39 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index cf07a8845782..4b0a4554d9d6 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1200,19 +1200,6 @@ impl LogicalPlanBuilder { unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options) .map(Self::new) } - - /// Unnest the given columns with the given [`UnnestOptions`] - /// if one column is a list type, it can be recursively and simultaneously - /// unnested into the desired recursion levels - /// e.g select unnest(list_col,depth=1), unnest(list_col,depth=2) - pub fn unnest_columns_recursive_with_options( - self, - columns: Vec, - options: UnnestOptions, - ) -> Result { - unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options) - .map(Self::new) - } } impl From for LogicalPlanBuilder { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index e888d15d7458..0ddc89ff0d33 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -358,7 +358,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = LogicalPlanBuilder::from(intermediate_plan) .project(inner_projection_exprs)? - .unnest_columns_recursive_with_options( + .unnest_columns_with_options( unnest_col_vec, unnest_options.with_recursions(list_recursions), )? @@ -487,7 +487,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { intermediate_plan = LogicalPlanBuilder::from(intermediate_plan) .project(projection_exprs)? - .unnest_columns_recursive_with_options( + .unnest_columns_with_options( unnest_col_vec, unnest_options.with_recursions(list_recursions), )? diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 6bb226f7eb70..a97234c881d6 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -839,24 +839,44 @@ mod tests { ] ); - // a nested structure struct[[]] + Ok(()) + } + + // unnest -> field access -> unnest + #[test] + fn test_transform_non_consecutive_unnests() -> Result<()> { + // list of struct + // [struct{'subfield1':list(i64), 'subfield2':list(utf8)}] let schema = Schema::new(vec![ Field::new( - "struct_col", // {array_col: [1,2,3]} - ArrowDataType::Struct(Fields::from(vec![Field::new( - "matrix", - ArrowDataType::List(Arc::new(Field::new( - "matrix_row", - ArrowDataType::List(Arc::new(Field::new( - "item", - ArrowDataType::Int64, + "struct_list", + ArrowDataType::List(Arc::new(Field::new( + "element", + ArrowDataType::Struct(Fields::from(vec![ + Field::new( + // list of i64 + "subfield1", + ArrowDataType::List(Arc::new(Field::new( + "i64_element", + ArrowDataType::Int64, + true, + ))), true, - ))), - true, - ))), + ), + Field::new( + // list of utf8 + "subfield2", + ArrowDataType::List(Arc::new(Field::new( + "utf8_element", + ArrowDataType::Utf8, + true, + ))), + true, + ), + ])), true, - )])), - false, + ))), + true, ), Field::new("int_col", ArrowDataType::Int32, false), ]); @@ -872,34 +892,65 @@ mod tests { let mut inner_projection_exprs = vec![]; // An expr with multiple unnest - let original_expr = unnest(unnest(col("struct_col").field("matrix"))); + let select_expr1 = unnest(unnest(col("struct_list")).field("subfield1")); let transformed_exprs = rewrite_recursive_unnest_bottom_up( &input, &mut unnest_placeholder_columns, &mut inner_projection_exprs, - &original_expr, + &select_expr1, + )?; + // Only the inner most/ bottom most unnest is transformed + assert_eq!( + transformed_exprs, + vec![unnest( + col("unnest_placeholder(struct_list,depth=1)") + .alias("UNNEST(struct_list)") + .field("subfield1") + )] + ); + + column_unnests_eq( + vec![ + "unnest_placeholder(struct_list)=>[unnest_placeholder(struct_list,depth=1)|depth=1]", + ], + &unnest_placeholder_columns, + ); + + assert_eq!( + inner_projection_exprs, + vec![col("struct_list").alias("unnest_placeholder(struct_list)")] + ); + + // continue rewrite another expr in select + let select_expr2 = unnest(unnest(col("struct_list")).field("subfield2")); + let transformed_exprs = rewrite_recursive_unnest_bottom_up( + &input, + &mut unnest_placeholder_columns, + &mut inner_projection_exprs, + &select_expr2, )?; // Only the inner most/ bottom most unnest is transformed assert_eq!( transformed_exprs, - vec![col("unnest_placeholder(struct_col[matrix],depth=2)") - .alias("UNNEST(UNNEST(struct_col[matrix]))")] + vec![unnest( + col("unnest_placeholder(struct_list,depth=1)") + .alias("UNNEST(struct_list)") + .field("subfield2") + )] ); - // TODO: add a test case where - // unnest -> field access -> unnest + // unnest place holder columns remain the same + // because expr1 and expr2 derive from the same unnest result column_unnests_eq( vec![ - "unnest_placeholder(struct_col[matrix])=>[unnest_placeholder(struct_col[matrix],depth=2)|depth=2]", + "unnest_placeholder(struct_list)=>[unnest_placeholder(struct_list,depth=1)|depth=1]", ], &unnest_placeholder_columns, ); assert_eq!( inner_projection_exprs, - vec![col("struct_col") - .field("matrix") - .alias("unnest_placeholder(struct_col[matrix])"),] + vec![col("struct_list").alias("unnest_placeholder(struct_list)")] ); Ok(()) From 7d4ae95365791fb78ce9452c990fc09784ef6f9a Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Thu, 17 Oct 2024 21:18:49 +0200 Subject: [PATCH 04/12] resolve comments --- datafusion/physical-plan/src/unnest.rs | 70 +++++++++++++------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 50af6b4960a5..a24e3c9e0aeb 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -905,12 +905,10 @@ fn repeat_arrs_from_indices( #[cfg(test)] mod tests { use super::*; - use arrow::{ - datatypes::{Field, Int32Type}, - util::pretty::pretty_format_batches, - }; + use arrow::datatypes::{Field, Int32Type}; use arrow_array::{GenericListArray, OffsetSizeTrait, StringArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; + use datafusion_common::assert_batches_eq; // Create a GenericListArray with the following list values: // [A, B, C], [], NULL, [D], NULL, [NULL, F] @@ -1092,38 +1090,37 @@ mod tests { &HashSet::default(), &UnnestOptions { preserve_nulls: true, + recursions: None, }, )?; - let actual = - format!("{}", pretty_format_batches(vec![ret].as_ref())?).to_lowercase(); - let expected = r#" -+---------------------------------+---------------------------------+---------------------------------+ -| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 | -+---------------------------------+---------------------------------+---------------------------------+ -| [1, 2, 3] | 1 | a | -| | 2 | b | -| [4, 5] | 3 | | -| [1, 2, 3] | | a | -| | | b | -| [4, 5] | | | -| [1, 2, 3] | 4 | a | -| | 5 | b | -| [4, 5] | | | -| [7, 8, 9, 10] | 7 | c | -| | 8 | d | -| [11, 12, 13] | 9 | | -| | 10 | | -| [7, 8, 9, 10] | | c | -| | | d | -| [11, 12, 13] | | | -| [7, 8, 9, 10] | 11 | c | -| | 12 | d | -| [11, 12, 13] | 13 | | -| | | e | -+---------------------------------+---------------------------------+---------------------------------+ - "# - .trim(); - assert_eq!(actual, expected); + + let expected = &[ +"+---------------------------------+---------------------------------+---------------------------------+", +"| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |", +"+---------------------------------+---------------------------------+---------------------------------+", +"| [1, 2, 3] | 1 | a |", +"| | 2 | b |", +"| [4, 5] | 3 | |", +"| [1, 2, 3] | | a |", +"| | | b |", +"| [4, 5] | | |", +"| [1, 2, 3] | 4 | a |", +"| | 5 | b |", +"| [4, 5] | | |", +"| [7, 8, 9, 10] | 7 | c |", +"| | 8 | d |", +"| [11, 12, 13] | 9 | |", +"| | 10 | |", +"| [7, 8, 9, 10] | | c |", +"| | | d |", +"| [11, 12, 13] | | |", +"| [7, 8, 9, 10] | 11 | c |", +"| | 12 | d |", +"| [11, 12, 13] | 13 | |", +"| | | e |", +"+---------------------------------+---------------------------------+---------------------------------+", + ]; + assert_batches_eq!(expected, &[ret]); Ok(()) } @@ -1177,7 +1174,10 @@ mod tests { preserve_nulls: bool, expected: Vec, ) -> datafusion_common::Result<()> { - let options = UnnestOptions { preserve_nulls }; + let options = UnnestOptions { + preserve_nulls, + recursions: None, + }; let longest_length = find_longest_length(list_arrays, &options)?; let expected_array = Int64Array::from(expected); assert_eq!( From 8e824a9cdb71dc5151fa7827727ecaee9dd11bc5 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Thu, 17 Oct 2024 21:23:03 +0200 Subject: [PATCH 05/12] add back doc --- datafusion/common/src/unnest.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index 2a6af09a2c08..9c67df6fa7fe 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! [`UnnestOptions`] for unnesting structured types + use crate::Column; /// Options for unnesting a column that contains a list type, From 6ea0c9306eaaa78d6e0b33d2eac08d3e2621dc45 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Thu, 17 Oct 2024 22:39:04 +0200 Subject: [PATCH 06/12] fix proto --- datafusion/proto/proto/datafusion.proto | 18 +- datafusion/proto/src/generated/pbjson.rs | 285 +++++++++--------- datafusion/proto/src/generated/prost.rs | 32 +- .../proto/src/logical_plan/from_proto.rs | 17 +- datafusion/proto/src/logical_plan/mod.rs | 63 +--- datafusion/proto/src/logical_plan/to_proto.rs | 12 + 6 files changed, 198 insertions(+), 229 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 5256f7473c95..ac13b8585732 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -264,7 +264,7 @@ message CopyToNode { message UnnestNode { LogicalPlanNode input = 1; - repeated ColumnUnnestExec exec_columns = 2; + repeated datafusion_common.Column exec_columns = 2; repeated ColumnUnnestListItem list_type_columns = 3; repeated uint64 struct_type_columns = 4; repeated uint64 dependency_indices = 5; @@ -285,17 +285,15 @@ message ColumnUnnestListRecursion { uint32 depth = 2; } -message ColumnUnnestExec { - datafusion_common.Column column = 1; - oneof UnnestType { - ColumnUnnestListRecursions list = 2; - datafusion_common.EmptyMessage struct = 3; - datafusion_common.EmptyMessage inferred = 4; - } -} - message UnnestOptions { bool preserve_nulls = 1; + repeated RecursionUnnestOption recursions = 2; +} + +message RecursionUnnestOption { + datafusion_common.Column output_column = 1; + datafusion_common.Column input_column = 2; + uint32 depth = 3; } message UnionNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e876008e853f..68911984020e 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -2312,145 +2312,6 @@ impl<'de> serde::Deserialize<'de> for ColumnIndex { deserializer.deserialize_struct("datafusion.ColumnIndex", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for ColumnUnnestExec { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if self.column.is_some() { - len += 1; - } - if self.unnest_type.is_some() { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion.ColumnUnnestExec", len)?; - if let Some(v) = self.column.as_ref() { - struct_ser.serialize_field("column", v)?; - } - if let Some(v) = self.unnest_type.as_ref() { - match v { - column_unnest_exec::UnnestType::List(v) => { - struct_ser.serialize_field("list", v)?; - } - column_unnest_exec::UnnestType::Struct(v) => { - struct_ser.serialize_field("struct", v)?; - } - column_unnest_exec::UnnestType::Inferred(v) => { - struct_ser.serialize_field("inferred", v)?; - } - } - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for ColumnUnnestExec { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "column", - "list", - "struct", - "inferred", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - Column, - List, - Struct, - Inferred, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "column" => Ok(GeneratedField::Column), - "list" => Ok(GeneratedField::List), - "struct" => Ok(GeneratedField::Struct), - "inferred" => Ok(GeneratedField::Inferred), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = ColumnUnnestExec; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.ColumnUnnestExec") - } - - fn visit_map(self, mut map_: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut column__ = None; - let mut unnest_type__ = None; - while let Some(k) = map_.next_key()? { - match k { - GeneratedField::Column => { - if column__.is_some() { - return Err(serde::de::Error::duplicate_field("column")); - } - column__ = map_.next_value()?; - } - GeneratedField::List => { - if unnest_type__.is_some() { - return Err(serde::de::Error::duplicate_field("list")); - } - unnest_type__ = map_.next_value::<::std::option::Option<_>>()?.map(column_unnest_exec::UnnestType::List) -; - } - GeneratedField::Struct => { - if unnest_type__.is_some() { - return Err(serde::de::Error::duplicate_field("struct")); - } - unnest_type__ = map_.next_value::<::std::option::Option<_>>()?.map(column_unnest_exec::UnnestType::Struct) -; - } - GeneratedField::Inferred => { - if unnest_type__.is_some() { - return Err(serde::de::Error::duplicate_field("inferred")); - } - unnest_type__ = map_.next_value::<::std::option::Option<_>>()?.map(column_unnest_exec::UnnestType::Inferred) -; - } - } - } - Ok(ColumnUnnestExec { - column: column__, - unnest_type: unnest_type__, - }) - } - } - deserializer.deserialize_struct("datafusion.ColumnUnnestExec", FIELDS, GeneratedVisitor) - } -} impl serde::Serialize for ColumnUnnestListItem { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -17495,6 +17356,135 @@ impl<'de> serde::Deserialize<'de> for ProjectionNode { deserializer.deserialize_struct("datafusion.ProjectionNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for RecursionUnnestOption { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.output_column.is_some() { + len += 1; + } + if self.input_column.is_some() { + len += 1; + } + if self.depth != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.RecursionUnnestOption", len)?; + if let Some(v) = self.output_column.as_ref() { + struct_ser.serialize_field("outputColumn", v)?; + } + if let Some(v) = self.input_column.as_ref() { + struct_ser.serialize_field("inputColumn", v)?; + } + if self.depth != 0 { + struct_ser.serialize_field("depth", &self.depth)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for RecursionUnnestOption { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "output_column", + "outputColumn", + "input_column", + "inputColumn", + "depth", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + OutputColumn, + InputColumn, + Depth, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "outputColumn" | "output_column" => Ok(GeneratedField::OutputColumn), + "inputColumn" | "input_column" => Ok(GeneratedField::InputColumn), + "depth" => Ok(GeneratedField::Depth), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = RecursionUnnestOption; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.RecursionUnnestOption") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut output_column__ = None; + let mut input_column__ = None; + let mut depth__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::OutputColumn => { + if output_column__.is_some() { + return Err(serde::de::Error::duplicate_field("outputColumn")); + } + output_column__ = map_.next_value()?; + } + GeneratedField::InputColumn => { + if input_column__.is_some() { + return Err(serde::de::Error::duplicate_field("inputColumn")); + } + input_column__ = map_.next_value()?; + } + GeneratedField::Depth => { + if depth__.is_some() { + return Err(serde::de::Error::duplicate_field("depth")); + } + depth__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(RecursionUnnestOption { + output_column: output_column__, + input_column: input_column__, + depth: depth__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.RecursionUnnestOption", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for RepartitionExecNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -20417,10 +20407,16 @@ impl serde::Serialize for UnnestOptions { if self.preserve_nulls { len += 1; } + if !self.recursions.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.UnnestOptions", len)?; if self.preserve_nulls { struct_ser.serialize_field("preserveNulls", &self.preserve_nulls)?; } + if !self.recursions.is_empty() { + struct_ser.serialize_field("recursions", &self.recursions)?; + } struct_ser.end() } } @@ -20433,11 +20429,13 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { const FIELDS: &[&str] = &[ "preserve_nulls", "preserveNulls", + "recursions", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { PreserveNulls, + Recursions, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -20460,6 +20458,7 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { { match value { "preserveNulls" | "preserve_nulls" => Ok(GeneratedField::PreserveNulls), + "recursions" => Ok(GeneratedField::Recursions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -20480,6 +20479,7 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { V: serde::de::MapAccess<'de>, { let mut preserve_nulls__ = None; + let mut recursions__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::PreserveNulls => { @@ -20488,10 +20488,17 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { } preserve_nulls__ = Some(map_.next_value()?); } + GeneratedField::Recursions => { + if recursions__.is_some() { + return Err(serde::de::Error::duplicate_field("recursions")); + } + recursions__ = Some(map_.next_value()?); + } } } Ok(UnnestOptions { preserve_nulls: preserve_nulls__.unwrap_or_default(), + recursions: recursions__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 2aa14f7e80b0..a016099b530b 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -400,7 +400,7 @@ pub struct UnnestNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, repeated, tag = "2")] - pub exec_columns: ::prost::alloc::vec::Vec, + pub exec_columns: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "3")] pub list_type_columns: ::prost::alloc::vec::Vec, #[prost(uint64, repeated, tag = "4")] @@ -432,28 +432,20 @@ pub struct ColumnUnnestListRecursion { pub depth: u32, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ColumnUnnestExec { - #[prost(message, optional, tag = "1")] - pub column: ::core::option::Option, - #[prost(oneof = "column_unnest_exec::UnnestType", tags = "2, 3, 4")] - pub unnest_type: ::core::option::Option, -} -/// Nested message and enum types in `ColumnUnnestExec`. -pub mod column_unnest_exec { - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum UnnestType { - #[prost(message, tag = "2")] - List(super::ColumnUnnestListRecursions), - #[prost(message, tag = "3")] - Struct(super::super::datafusion_common::EmptyMessage), - #[prost(message, tag = "4")] - Inferred(super::super::datafusion_common::EmptyMessage), - } -} -#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct UnnestOptions { #[prost(bool, tag = "1")] pub preserve_nulls: bool, + #[prost(message, repeated, tag = "2")] + pub recursions: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RecursionUnnestOption { + #[prost(message, optional, tag = "1")] + pub output_column: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub input_column: ::core::option::Option, + #[prost(uint32, tag = "3")] + pub depth: u32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct UnionNode { diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 32e1b68203ce..7865876cc9e5 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use datafusion::execution::registry::FunctionRegistry; use datafusion_common::{ - exec_datafusion_err, internal_err, plan_datafusion_err, Result, ScalarValue, - TableReference, UnnestOptions, + exec_datafusion_err, internal_err, plan_datafusion_err, RecursionUnnestOption, + Result, ScalarValue, TableReference, UnnestOptions, }; use datafusion_expr::expr::{Alias, Placeholder, Sort}; use datafusion_expr::expr::{Unnest, WildcardOptions}; @@ -56,6 +56,19 @@ impl From<&protobuf::UnnestOptions> for UnnestOptions { fn from(opts: &protobuf::UnnestOptions) -> Self { Self { preserve_nulls: opts.preserve_nulls, + recursions: match opts.recursions.len() { + 0 => None, + _ => Some( + opts.recursions + .iter() + .map(|r| RecursionUnnestOption { + input_column: r.input_column.as_ref().unwrap().into(), + output_column: r.output_column.as_ref().unwrap().into(), + depth: r.depth as usize, + }) + .collect::>(), + ), + }, } } } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 6061a7a0619a..f57910b09ade 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -19,11 +19,10 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; -use crate::protobuf::column_unnest_exec::UnnestType; use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan; use crate::protobuf::{ - ColumnUnnestExec, ColumnUnnestListItem, ColumnUnnestListRecursion, - ColumnUnnestListRecursions, CustomTableScanNode, SortExprNodeCollection, + ColumnUnnestListItem, ColumnUnnestListRecursion, CustomTableScanNode, + SortExprNodeCollection, }; use crate::{ convert_required, into_required, @@ -69,8 +68,7 @@ use datafusion_expr::{ DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, WindowUDF, }; -use datafusion_expr::{AggregateUDF, ColumnUnnestList, ColumnUnnestType, Unnest}; -use datafusion_proto_common::EmptyMessage; +use datafusion_expr::{AggregateUDF, ColumnUnnestList, Unnest}; use self::to_proto::{serialize_expr, serialize_exprs}; use crate::logical_plan::to_proto::serialize_sorts; @@ -875,33 +873,7 @@ impl AsLogicalPlan for LogicalPlanNode { into_logical_plan!(unnest.input, ctx, extension_codec)?; Ok(datafusion_expr::LogicalPlan::Unnest(Unnest { input: Arc::new(input), - exec_columns: unnest - .exec_columns - .iter() - .map(|c| { - ( - c.column.as_ref().unwrap().to_owned().into(), - match c.unnest_type.as_ref().unwrap() { - UnnestType::Inferred(_) => ColumnUnnestType::Inferred, - UnnestType::Struct(_) => ColumnUnnestType::Struct, - UnnestType::List(l) => ColumnUnnestType::List( - l.recursions - .iter() - .map(|ul| ColumnUnnestList { - output_column: ul - .output_column - .as_ref() - .unwrap() - .to_owned() - .into(), - depth: ul.depth as usize, - }) - .collect(), - ), - }, - ) - }) - .collect(), + exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(), list_type_columns: unnest .list_type_columns .iter() @@ -1610,32 +1582,7 @@ impl AsLogicalPlan for LogicalPlanNode { input: Some(Box::new(input)), exec_columns: exec_columns .iter() - .map(|(col, unnesting)| ColumnUnnestExec { - column: Some(col.into()), - unnest_type: Some(match unnesting { - ColumnUnnestType::Inferred => { - UnnestType::Inferred(EmptyMessage {}) - } - ColumnUnnestType::Struct => { - UnnestType::Struct(EmptyMessage {}) - } - ColumnUnnestType::List(list) => { - UnnestType::List(ColumnUnnestListRecursions { - recursions: list - .iter() - .map(|ul| ColumnUnnestListRecursion { - output_column: Some( - ul.output_column - .to_owned() - .into(), - ), - depth: ul.depth as _, - }) - .collect(), - }) - } - }), - }) + .map(|col| col.into()) .collect(), list_type_columns: proto_unnest_list_items, struct_type_columns: struct_type_columns diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 07823b422f71..8e8d9fbf5ce7 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -30,6 +30,7 @@ use datafusion_expr::{ WindowFrameUnits, WindowFunctionDefinition, }; +use crate::protobuf::RecursionUnnestOption; use crate::protobuf::{ self, plan_type::PlanTypeEnum::{ @@ -49,6 +50,17 @@ impl From<&UnnestOptions> for protobuf::UnnestOptions { fn from(opts: &UnnestOptions) -> Self { Self { preserve_nulls: opts.preserve_nulls, + recursions: match opts.recursions.as_ref() { + None => vec![], + Some(recursions) => recursions + .iter() + .map(|r| RecursionUnnestOption { + input_column: Some((&r.input_column).into()), + output_column: Some((&r.output_column).into()), + depth: r.depth as u32, + }) + .collect(), + }, } } } From 1f92614c785ed8b2ddf04fb37539f8bafe0d06dc Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Thu, 17 Oct 2024 22:52:39 +0200 Subject: [PATCH 07/12] flaky test --- datafusion/sql/src/utils.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 3a4870c56e89..23e990a28009 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -630,7 +630,7 @@ mod tests { l: Vec<&str>, r: &HashMap>>, ) { - let r_formatted: Vec = r + let mut r_formatted: Vec = r .iter() .map(|i| match i.1 { None => format!("{}", i.0), @@ -644,7 +644,9 @@ mod tests { ), }) .collect(); - let l_formatted: Vec = l.iter().map(|i| i.to_string()).collect(); + let mut l_formatted: Vec = l.iter().map(|i| i.to_string()).collect(); + r_formatted.sort(); + l_formatted.sort(); assert_eq!(l_formatted, r_formatted); } From 2d19955a05691ddb34f81715538c5ff89d9dabdd Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Thu, 17 Oct 2024 23:12:52 +0200 Subject: [PATCH 08/12] clippy --- datafusion/expr/src/logical_plan/builder.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 53466d9f8c8f..159ce45aa006 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1568,8 +1568,7 @@ impl TableSource for LogicalTableSource { /// Create a [`LogicalPlan::Unnest`] plan pub fn unnest(input: LogicalPlan, columns: Vec) -> Result { - let unnestings = columns.into_iter().map(|c| c).collect(); - unnest_with_options(input, unnestings, UnnestOptions::default()) + unnest_with_options(input, columns, UnnestOptions::default()) } // Get the data type of a multi-dimensional type after unnesting it From aa9327ac968f3cae26e471e4689a8939335cf94a Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 20 Oct 2024 10:48:20 +0200 Subject: [PATCH 09/12] use indexmap --- datafusion/common/src/unnest.rs | 21 +-- datafusion/expr/src/logical_plan/builder.rs | 125 ++++++++---------- datafusion/physical-plan/src/unnest.rs | 2 +- .../proto/src/logical_plan/from_proto.rs | 22 ++- datafusion/proto/src/logical_plan/to_proto.rs | 20 ++- datafusion/sql/Cargo.toml | 1 + datafusion/sql/src/select.rs | 95 ++++++------- datafusion/sql/src/utils.rs | 25 ++-- 8 files changed, 142 insertions(+), 169 deletions(-) diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index 9c67df6fa7fe..7ae1d06a68e3 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -62,15 +62,22 @@ use crate::Column; /// └─────────┘ └─────┘ └─────────┘ └─────┘ /// c1 c2 c1 c2 /// ``` +/// +/// `recursions` instruct how a column should be unnested (e.g unnesting a column multiple +/// time, with depth = 1 and depth = 2). Any unnested column not being mentioned inside this +/// options is inferred to be unnested with depth = 1 #[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)] pub struct UnnestOptions { /// Should nulls in the input be preserved? Defaults to true pub preserve_nulls: bool, - /// Without explicit recursions, all - /// Datafusion will infer the unesting type with depth = 1 - pub recursions: Option>, + /// If specific columns need to be unnested multiple times (e.g at different depth), + /// declare them here. Any unnested columns not being mentioned inside this option + /// will be unnested with depth = 1 + pub recursions: Vec, } +/// Instruction on how to unnest a column (mostly with a list type) +/// such as how to name the output, and how many times it should be unnested #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)] pub struct RecursionUnnestOption { pub input_column: Column, @@ -83,7 +90,7 @@ impl Default for UnnestOptions { Self { // default to true to maintain backwards compatible behavior preserve_nulls: true, - recursions: None, + recursions: vec![], } } } @@ -102,10 +109,8 @@ impl UnnestOptions { } /// Set the recursions for the unnest operation - pub fn with_recursions(mut self, recursions: Vec) -> Self { - if !recursions.is_empty() { - self.recursions = Some(recursions); - } + pub fn with_recursions(mut self, recursion: RecursionUnnestOption) -> Self { + self.recursions.push(recursion); self } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 159ce45aa006..1b33e4385608 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1714,72 +1714,56 @@ pub fn unnest_with_options( .map(|(index, (original_qualifier, original_field))| { match indices_to_unnest.get(&index) { Some(column_to_unnest) => { - let transformed_columns; - let maybe_explicit_recursion = match options.recursions { - None => None, - Some(ref recursions) => { - let list_recursions_on_column = recursions - .iter() - .filter(|p| -> bool { - &p.input_column == *column_to_unnest - }) - .collect::>(); - if list_recursions_on_column.is_empty() { - None - } else { - Some(list_recursions_on_column) - } - } - }; - match maybe_explicit_recursion { - Some(explicit_recursion) => { - transformed_columns = explicit_recursion - .iter() - .map(|r| { - list_columns.push(( - index, - ColumnUnnestList { - output_column: r.output_column.clone(), - depth: r.depth, - }, - )); - get_unnested_columns( - &r.output_column.name, - original_field.data_type(), - r.depth, - ) - }) - .collect::)>>>>()? - .into_iter() - .flatten() - .collect::>(); - } - None => { - transformed_columns = get_unnested_columns( - &column_to_unnest.name, + let recursions_on_column = options + .recursions + .iter() + .filter(|p| -> bool { &p.input_column == *column_to_unnest }) + .collect::>(); + let mut transformed_columns = recursions_on_column + .iter() + .map(|r| { + list_columns.push(( + index, + ColumnUnnestList { + output_column: r.output_column.clone(), + depth: r.depth, + }, + )); + Ok(get_unnested_columns( + &r.output_column.name, original_field.data_type(), - 1, - )?; - match original_field.data_type() { - DataType::Struct(_) => { - struct_columns.push(index); - } - DataType::List(_) - | DataType::FixedSizeList(_, _) - | DataType::LargeList(_) => { - list_columns.push(( - index, - ColumnUnnestList { - output_column: Column::from_name( - &column_to_unnest.name, - ), - depth: 1, - }, - )); - } - _ => {} - }; - } + r.depth, + )? + .into_iter() + .next() + .unwrap()) // because unnesting a list column always result into one result + }) + .collect::)>>>()?; + if transformed_columns.is_empty() { + transformed_columns = get_unnested_columns( + &column_to_unnest.name, + original_field.data_type(), + 1, + )?; + match original_field.data_type() { + DataType::Struct(_) => { + struct_columns.push(index); + } + DataType::List(_) + | DataType::FixedSizeList(_, _) + | DataType::LargeList(_) => { + list_columns.push(( + index, + ColumnUnnestList { + output_column: Column::from_name( + &column_to_unnest.name, + ), + depth: 1, + }, + )); + } + _ => {} + }; } // new columns dependent on the same original index @@ -2240,18 +2224,17 @@ mod tests { let plan = nested_table_scan("test_table")? .unnest_columns_with_options( vec!["stringss".into(), "struct_singular".into()], - UnnestOptions::default().with_recursions(vec![ - RecursionUnnestOption { + UnnestOptions::default() + .with_recursions(RecursionUnnestOption { input_column: "stringss".into(), output_column: "stringss_depth_1".into(), depth: 1, - }, - RecursionUnnestOption { + }) + .with_recursions(RecursionUnnestOption { input_column: "stringss".into(), output_column: "stringss_depth_2".into(), depth: 2, - }, - ]), + }), )? .build()?; diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index a24e3c9e0aeb..9135b526582c 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -1090,7 +1090,7 @@ mod tests { &HashSet::default(), &UnnestOptions { preserve_nulls: true, - recursions: None, + recursions: vec![], }, )?; diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index bf8f14f04594..99b11939e95b 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -56,19 +56,15 @@ impl From<&protobuf::UnnestOptions> for UnnestOptions { fn from(opts: &protobuf::UnnestOptions) -> Self { Self { preserve_nulls: opts.preserve_nulls, - recursions: match opts.recursions.len() { - 0 => None, - _ => Some( - opts.recursions - .iter() - .map(|r| RecursionUnnestOption { - input_column: r.input_column.as_ref().unwrap().into(), - output_column: r.output_column.as_ref().unwrap().into(), - depth: r.depth as usize, - }) - .collect::>(), - ), - }, + recursions: opts + .recursions + .iter() + .map(|r| RecursionUnnestOption { + input_column: r.input_column.as_ref().unwrap().into(), + output_column: r.output_column.as_ref().unwrap().into(), + depth: r.depth as usize, + }) + .collect::>(), } } } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 5ecca320d205..a34a220e490c 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -50,17 +50,15 @@ impl From<&UnnestOptions> for protobuf::UnnestOptions { fn from(opts: &UnnestOptions) -> Self { Self { preserve_nulls: opts.preserve_nulls, - recursions: match opts.recursions.as_ref() { - None => vec![], - Some(recursions) => recursions - .iter() - .map(|r| RecursionUnnestOption { - input_column: Some((&r.input_column).into()), - output_column: Some((&r.output_column).into()), - depth: r.depth as u32, - }) - .collect(), - }, + recursions: opts + .recursions + .iter() + .map(|r| RecursionUnnestOption { + input_column: Some((&r.input_column).into()), + output_column: Some((&r.output_column).into()), + depth: r.depth as u32, + }) + .collect(), } } } diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index 5c4b83fe38e1..b29e3e9468dd 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -50,6 +50,7 @@ log = { workspace = true } regex = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } +indexmap = { workspace = true } [dev-dependencies] ctor = { workspace = true } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index ecd3d142133d..80a08da5e35d 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; @@ -38,6 +38,7 @@ use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, }; +use indexmap::IndexMap; use sqlparser::ast::{ Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, WildcardAdditionalOptions, WindowType, @@ -301,7 +302,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // The transformation happen bottom up, one at a time for each iteration // Only exhaust the loop if no more unnest transformation is found for i in 0.. { - let mut unnest_columns = HashMap::new(); + let mut unnest_columns = IndexMap::new(); // from which column used for projection, before the unnest happen // including non unnest column and unnest column let mut inner_projection_exprs = vec![]; @@ -329,34 +330,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { break; } else { // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL - let unnest_options = UnnestOptions::new().with_preserve_nulls(false); - - let mut list_recursions = vec![]; - let unnest_col_vec = unnest_columns - .into_iter() - .map(|(col, maybe_list_unnest)| { - match maybe_list_unnest { - None => {} - Some(list_unnest) => { - list_recursions.extend(list_unnest.into_iter().map( - |unnest_list| RecursionUnnestOption { - input_column: col.clone(), - output_column: unnest_list.output_column, - depth: unnest_list.depth, - }, - )); - } - } - col - }) - .collect::>(); - + let mut unnest_options = UnnestOptions::new().with_preserve_nulls(false); + let mut unnest_col_vec = vec![]; + + for (col, maybe_list_unnest) in unnest_columns.into_iter() { + if let Some(list_unnest) = maybe_list_unnest { + unnest_options = list_unnest.into_iter().fold( + unnest_options, + |options, unnest_list| { + options.with_recursions(RecursionUnnestOption { + input_column: col.clone(), + output_column: unnest_list.output_column, + depth: unnest_list.depth, + }) + }, + ); + } + unnest_col_vec.push(col); + } let plan = LogicalPlanBuilder::from(intermediate_plan) .project(inner_projection_exprs)? - .unnest_columns_with_options( - unnest_col_vec, - unnest_options.with_recursions(list_recursions), - )? + .unnest_columns_with_options(unnest_col_vec, unnest_options)? .build()?; intermediate_plan = plan; intermediate_select_exprs = outer_projection_exprs; @@ -425,7 +419,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut intermediate_select_exprs = group_expr; loop { - let mut unnest_columns = HashMap::new(); + let mut unnest_columns = IndexMap::new(); let mut inner_projection_exprs = vec![]; let outer_projection_exprs = rewrite_recursive_unnests_bottom_up( @@ -438,7 +432,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if unnest_columns.is_empty() { break; } else { - let unnest_options = UnnestOptions::new().with_preserve_nulls(false); + let mut unnest_options = UnnestOptions::new().with_preserve_nulls(false); let mut projection_exprs = match &aggr_expr_using_columns { Some(exprs) => (*exprs).clone(), @@ -460,32 +454,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; projection_exprs.extend(inner_projection_exprs); - let mut list_recursions = vec![]; - let unnest_col_vec = unnest_columns - .into_iter() - .map(|(col, maybe_list_unnest)| { - match maybe_list_unnest { - None => {} - Some(list_unnest) => { - list_recursions.extend(list_unnest.into_iter().map( - |unnest_list| RecursionUnnestOption { - input_column: col.clone(), - output_column: unnest_list.output_column, - depth: unnest_list.depth, - }, - )); - } - } - col - }) - .collect::>(); + let mut unnest_col_vec = vec![]; + + for (col, maybe_list_unnest) in unnest_columns.into_iter() { + if let Some(list_unnest) = maybe_list_unnest { + unnest_options = list_unnest.into_iter().fold( + unnest_options, + |options, unnest_list| { + options.with_recursions(RecursionUnnestOption { + input_column: col.clone(), + output_column: unnest_list.output_column, + depth: unnest_list.depth, + }) + }, + ); + } + unnest_col_vec.push(col); + } intermediate_plan = LogicalPlanBuilder::from(intermediate_plan) .project(projection_exprs)? - .unnest_columns_with_options( - unnest_col_vec, - unnest_options.with_recursions(list_recursions), - )? + .unnest_columns_with_options(unnest_col_vec, unnest_options)? .build()?; intermediate_select_exprs = outer_projection_exprs; diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 23e990a28009..14436de01843 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -36,6 +36,7 @@ use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; use datafusion_expr::{ col, expr_vec_fmt, ColumnUnnestList, Expr, ExprSchemable, LogicalPlan, }; +use indexmap::IndexMap; use sqlparser::ast::{Ident, Value}; /// Make a best-effort attempt at resolving all columns in the expression tree @@ -294,7 +295,7 @@ pub(crate) fn value_to_string(value: &Value) -> Option { pub(crate) fn rewrite_recursive_unnests_bottom_up( input: &LogicalPlan, - unnest_placeholder_columns: &mut HashMap>>, + unnest_placeholder_columns: &mut IndexMap>>, inner_projection_exprs: &mut Vec, original_exprs: &[Expr], ) -> Result> { @@ -325,7 +326,7 @@ struct RecursiveUnnestRewriter<'a> { top_most_unnest: Option, consecutive_unnest: Vec>, inner_projection_exprs: &'a mut Vec, - columns_unnestings: &'a mut HashMap>>, + columns_unnestings: &'a mut IndexMap>>, transformed_root_exprs: Option>, } impl<'a> RecursiveUnnestRewriter<'a> { @@ -562,7 +563,7 @@ fn push_projection_dedupl(projection: &mut Vec, expr: Expr) { /// is done only for the bottom expression pub(crate) fn rewrite_recursive_unnest_bottom_up( input: &LogicalPlan, - unnest_placeholder_columns: &mut HashMap>>, + unnest_placeholder_columns: &mut IndexMap>>, inner_projection_exprs: &mut Vec, original_expr: &Expr, ) -> Result> { @@ -614,7 +615,7 @@ pub(crate) fn rewrite_recursive_unnest_bottom_up( #[cfg(test)] mod tests { - use std::{collections::HashMap, ops::Add, sync::Arc}; + use std::{ops::Add, sync::Arc}; use arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; use arrow_schema::Fields; @@ -624,13 +625,15 @@ mod tests { }; use datafusion_functions::core::expr_ext::FieldAccessor; use datafusion_functions_aggregate::expr_fn::count; + use indexmap::IndexMap; use crate::utils::{resolve_positions_to_exprs, rewrite_recursive_unnest_bottom_up}; + fn column_unnests_eq( l: Vec<&str>, - r: &HashMap>>, + r: &IndexMap>>, ) { - let mut r_formatted: Vec = r + let r_formatted: Vec = r .iter() .map(|i| match i.1 { None => format!("{}", i.0), @@ -644,9 +647,7 @@ mod tests { ), }) .collect(); - let mut l_formatted: Vec = l.iter().map(|i| i.to_string()).collect(); - r_formatted.sort(); - l_formatted.sort(); + let l_formatted: Vec = l.iter().map(|i| i.to_string()).collect(); assert_eq!(l_formatted, r_formatted); } @@ -676,7 +677,7 @@ mod tests { schema: Arc::new(dfschema), }); - let mut unnest_placeholder_columns = HashMap::new(); + let mut unnest_placeholder_columns = IndexMap::new(); let mut inner_projection_exprs = vec![]; // unnest(unnest(3d_col)) + unnest(unnest(3d_col)) @@ -780,7 +781,7 @@ mod tests { schema: Arc::new(dfschema), }); - let mut unnest_placeholder_columns = HashMap::new(); + let mut unnest_placeholder_columns = IndexMap::new(); let mut inner_projection_exprs = vec![]; // unnest(struct_col) @@ -892,7 +893,7 @@ mod tests { schema: Arc::new(dfschema), }); - let mut unnest_placeholder_columns = HashMap::new(); + let mut unnest_placeholder_columns = IndexMap::new(); let mut inner_projection_exprs = vec![]; // An expr with multiple unnest From e6aef1f09453f34a6c43649565c884fc7a011d90 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 20 Oct 2024 10:55:13 +0200 Subject: [PATCH 10/12] chore: compile err --- datafusion/physical-plan/src/unnest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 9135b526582c..2311541816f3 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -1176,7 +1176,7 @@ mod tests { ) -> datafusion_common::Result<()> { let options = UnnestOptions { preserve_nulls, - recursions: None, + recursions: vec![], }; let longest_length = find_longest_length(list_arrays, &options)?; let expected_array = Int64Array::from(expected); From c9eb970e97201fb0184c449b8d6f7d9a45c79ea1 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 20 Oct 2024 11:06:14 +0200 Subject: [PATCH 11/12] chore: update cargo --- datafusion-cli/Cargo.lock | 1 + datafusion/common/src/unnest.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index dfd07a7658ff..08d5d4843c62 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1571,6 +1571,7 @@ dependencies = [ "arrow-schema", "datafusion-common", "datafusion-expr", + "indexmap", "log", "regex", "sqlparser", diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index 7ae1d06a68e3..db48edd06160 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -77,7 +77,7 @@ pub struct UnnestOptions { } /// Instruction on how to unnest a column (mostly with a list type) -/// such as how to name the output, and how many times it should be unnested +/// such as how to name the output, and how many level it should be unnested #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)] pub struct RecursionUnnestOption { pub input_column: Column, From 52db541a012a668c52d0b67e5c40ab0416cf021f Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 20 Oct 2024 11:44:31 +0200 Subject: [PATCH 12/12] chore: fmt cargotoml --- datafusion/sql/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index b29e3e9468dd..90be576a884e 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -46,11 +46,11 @@ arrow-array = { workspace = true } arrow-schema = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +indexmap = { workspace = true } log = { workspace = true } regex = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } -indexmap = { workspace = true } [dev-dependencies] ctor = { workspace = true }