Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve recursive unnest options API #12836

Merged
merged 14 commits into from
Oct 20, 2024
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::SchemaReference;
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use unnest::UnnestOptions;
pub use unnest::{RecursionUnnestOption, UnnestOptions};
pub use utils::project_schema;

// These are hidden from docs purely to avoid polluting the public view of what this crate exports.
Expand Down
21 changes: 20 additions & 1 deletion datafusion/common/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`UnnestOptions`] for unnesting structured types
use crate::Column;

/// Options for unnesting a column that contains a list type,
/// replicating values in the other, non nested rows.
Expand Down Expand Up @@ -64,13 +64,24 @@
pub struct UnnestOptions {
/// Should nulls in the input be preserved? Defaults to true
pub preserve_nulls: bool,
/// Without explicit recursions, all
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please document what the Vec means in this context? Is the option for each of the columns in the schema? If so can you say it is expected to have the same number of elements?

If possible it would also be great to update the documentation on this struct to explain what is going on with recursive options

/// Datafusion will infer the unesting type with depth = 1
pub recursions: Option<Vec<RecursionUnnestOption>>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct RecursionUnnestOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add documentation to this structure as it is public?

pub input_column: Column,
pub output_column: Column,
pub depth: usize,
}

impl Default for UnnestOptions {
fn default() -> Self {
Self {
// default to true to maintain backwards compatible behavior
preserve_nulls: true,
recursions: None,
}
}
}
Expand All @@ -87,4 +98,12 @@ impl UnnestOptions {
self.preserve_nulls = preserve_nulls;
self
}

/// Set the recursions for the unnest operation
pub fn with_recursions(mut self, recursions: Vec<RecursionUnnestOption>) -> Self {
if !recursions.is_empty() {
self.recursions = Some(recursions);
}
self
}
}
207 changes: 90 additions & 117 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datafusion_common::{
TableReference, ToDFSchema, UnnestOptions,
};

use super::plan::{ColumnUnnestList, ColumnUnnestType};
use super::plan::ColumnUnnestList;

/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";
Expand Down Expand Up @@ -1185,7 +1185,7 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
vec![(column.into(), ColumnUnnestType::Inferred)],
vec![column.into()],
options,
)
.map(Self::new)
Expand All @@ -1196,26 +1196,6 @@ impl LogicalPlanBuilder {
self,
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
columns
.into_iter()
.map(|c| (c, ColumnUnnestType::Inferred))
.collect(),
options,
)
.map(Self::new)
}

/// Unnest the given columns with the given [`UnnestOptions`]
/// if one column is a list type, it can be recursively and simultaneously
/// unnested into the desired recursion levels
/// e.g select unnest(list_col,depth=1), unnest(list_col,depth=2)
pub fn unnest_columns_recursive_with_options(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

self,
columns: Vec<(Column, ColumnUnnestType)>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
.map(Self::new)
Expand Down Expand Up @@ -1593,14 +1573,13 @@ impl TableSource for LogicalTableSource {

/// Create a [`LogicalPlan::Unnest`] plan
pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
let unnestings = columns
.into_iter()
.map(|c| (c, ColumnUnnestType::Inferred))
.collect();
let unnestings = columns.into_iter().map(|c| c).collect();
unnest_with_options(input, unnestings, UnnestOptions::default())
}

pub fn get_unnested_list_datatype_recursive(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this method, recursion specification can be declared using UnnestOptions

// get the data type of a multi-dimensional type after unnesting it
// with a given depth
fn get_unnested_list_datatype_recursive(
data_type: &DataType,
depth: usize,
) -> Result<DataType> {
Expand All @@ -1619,27 +1598,6 @@ pub fn get_unnested_list_datatype_recursive(
internal_err!("trying to unnest on invalid data type {:?}", data_type)
}

/// Infer the unnest type based on the data type:
/// - list type: infer to unnest(list(col, depth=1))
/// - struct type: infer to unnest(struct)
fn infer_unnest_type(
col_name: &String,
data_type: &DataType,
) -> Result<ColumnUnnestType> {
match data_type {
DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
Ok(ColumnUnnestType::List(vec![ColumnUnnestList {
output_column: Column::from_name(col_name),
depth: 1,
}]))
}
DataType::Struct(_) => Ok(ColumnUnnestType::Struct),
_ => {
internal_err!("trying to unnest on invalid data type {:?}", data_type)
}
}
}

pub fn get_struct_unnested_columns(
col_name: &String,
inner_fields: &Fields,
Expand Down Expand Up @@ -1728,20 +1686,15 @@ pub fn get_unnested_columns(
/// ```
pub fn unnest_with_options(
input: LogicalPlan,
columns_to_unnest: Vec<(Column, ColumnUnnestType)>,
columns_to_unnest: Vec<Column>,
options: UnnestOptions,
) -> Result<LogicalPlan> {
let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
let mut struct_columns = vec![];
let indices_to_unnest = columns_to_unnest
.iter()
.map(|col_unnesting| {
Ok((
input.schema().index_of_column(&col_unnesting.0)?,
col_unnesting,
))
})
.collect::<Result<HashMap<usize, &(Column, ColumnUnnestType)>>>()?;
.map(|c| Ok((input.schema().index_of_column(c)?, c)))
.collect::<Result<HashMap<usize, &Column>>>()?;

let input_schema = input.schema();

Expand All @@ -1766,51 +1719,75 @@ pub fn unnest_with_options(
.enumerate()
.map(|(index, (original_qualifier, original_field))| {
match indices_to_unnest.get(&index) {
Some((column_to_unnest, unnest_type)) => {
let mut inferred_unnest_type = unnest_type.clone();
if let ColumnUnnestType::Inferred = unnest_type {
inferred_unnest_type = infer_unnest_type(
&column_to_unnest.name,
original_field.data_type(),
)?;
}
let transformed_columns: Vec<(Column, Arc<Field>)> =
match inferred_unnest_type {
ColumnUnnestType::Struct => {
struct_columns.push(index);
get_unnested_columns(
&column_to_unnest.name,
original_field.data_type(),
1,
)?
Some(column_to_unnest) => {
let transformed_columns;
let maybe_explicit_recursion = match options.recursions {
None => None,
Some(ref recursions) => {
let list_recursions_on_column = recursions
.iter()
.filter(|p| -> bool {
&p.input_column == *column_to_unnest
})
.collect::<Vec<_>>();
if list_recursions_on_column.is_empty() {
None
} else {
Some(list_recursions_on_column)
}
ColumnUnnestType::List(unnest_lists) => {
list_columns.extend(
unnest_lists
.iter()
.map(|ul| (index, ul.to_owned().clone())),
);
unnest_lists
.iter()
.map(
|ColumnUnnestList {
output_column,
depth,
}| {
get_unnested_columns(
&output_column.name,
original_field.data_type(),
*depth,
)
}
};
match maybe_explicit_recursion {
Some(explicit_recursion) => {
transformed_columns = explicit_recursion
.iter()
.map(|r| {
list_columns.push((
index,
ColumnUnnestList {
output_column: r.output_column.clone(),
depth: r.depth,
},
));
get_unnested_columns(
&r.output_column.name,
original_field.data_type(),
r.depth,
)
.collect::<Result<Vec<Vec<(Column, Arc<Field>)>>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>()
}
_ => return internal_err!("Invalid unnest type"),
};
})
.collect::<Result<Vec<Vec<(Column, Arc<Field>)>>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>();
}
None => {
transformed_columns = get_unnested_columns(
&column_to_unnest.name,
original_field.data_type(),
1,
)?;
match original_field.data_type() {
DataType::Struct(_) => {
struct_columns.push(index);
}
DataType::List(_)
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_) => {
list_columns.push((
index,
ColumnUnnestList {
output_column: Column::from_name(
&column_to_unnest.name,
),
depth: 1,
},
));
}
_ => {}
};
}
}

// new columns dependent on the same original index
dependency_indices
.extend(std::iter::repeat(index).take(transformed_columns.len()));
Expand Down Expand Up @@ -1859,7 +1836,7 @@ mod tests {
use crate::logical_plan::StringifiedPlan;
use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};

use datafusion_common::SchemaError;
use datafusion_common::{RecursionUnnestOption, SchemaError};

#[test]
fn plan_builder_simple() -> Result<()> {
Expand Down Expand Up @@ -2267,24 +2244,20 @@ mod tests {

// Simultaneously unnesting a list (with different depth) and a struct column
let plan = nested_table_scan("test_table")?
.unnest_columns_recursive_with_options(
vec![
(
"stringss".into(),
ColumnUnnestType::List(vec![
ColumnUnnestList {
output_column: Column::from_name("stringss_depth_1"),
depth: 1,
},
ColumnUnnestList {
output_column: Column::from_name("stringss_depth_2"),
depth: 2,
},
]),
),
("struct_singular".into(), ColumnUnnestType::Inferred),
],
UnnestOptions::default(),
.unnest_columns_with_options(
vec!["stringss".into(), "struct_singular".into()],
UnnestOptions::default().with_recursions(vec![
RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_1".into(),
depth: 1,
},
RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_2".into(),
depth: 2,
},
]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is another potential way for this API to look would be to take individual options rather than a single Vec

I don't feel strongly about this

Suggested change
UnnestOptions::default().with_recursions(vec![
RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_1".into(),
depth: 1,
},
RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_2".into(),
depth: 2,
},
]),
UnnestOptions::new()
.with_recursion(
RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_1".into(),
depth: 1,
})
.with_recusion(
RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_2".into(),
depth: 2,
},
),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you, i refactored so that the recursion is optional at column level, and since None can be replaced by an empty vector, i simplified the type of this field from Option to Vec

)?
.build()?;

Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, ColumnUnnestType, CrossJoin,
DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join,
projection_schema, Aggregate, Analyze, ColumnUnnestList, CrossJoin, DescribeTable,
Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Expand Down
Loading
Loading