Commit a17ec47

Preserve input's equivalence properties in UnnestExec

1 parent 6d9b76e
2 files changed: +70, -6
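Previously, UnnestExec::compute_properties built its plan properties from an empty EquivalenceProperties::new(schema), discarding every ordering and equivalence the input plan carried. This commit instead projects the input's equivalence properties onto the columns that pass through the unnest untouched. A condensed sketch of the idea, using only the APIs that appear in the diff below (non_unnested_indices stands for the indices of columns that are neither list- nor struct-unnested):

    // Before: all input orderings and equivalences were dropped.
    let eq_properties = EquivalenceProperties::new(schema);

    // After (sketch): keep whatever still holds for the pass-through columns,
    // and drop constraints such as uniqueness, which unnesting can no longer
    // guarantee (one list row may expand into many output rows).
    let mapping = ProjectionMapping::from_indices(&non_unnested_indices, &schema)?;
    let eq_properties = input
        .equivalence_properties()
        .project(&mapping, Arc::clone(&schema))
        .with_constraints(Constraints::default());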

datafusion/physical-plan/src/unnest.rs

Lines changed: 40 additions & 6 deletions
@@ -43,16 +43,17 @@ use arrow::record_batch::RecordBatch;
 use arrow_ord::cmp::lt;
 use async_trait::async_trait;
 use datafusion_common::{
-    exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
+    exec_datafusion_err, exec_err, internal_err, Constraints, HashMap, HashSet, Result,
+    UnnestOptions,
 };
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::equivalence::ProjectionMapping;
 use futures::{Stream, StreamExt};
 use log::trace;
 
 /// Unnest the given columns (either with type struct or list)
-/// For list unnesting, each rows is vertically transformed into multiple rows
-/// For struct unnesting, each columns is horizontally transformed into multiple columns,
+/// For list unnesting, each row is vertically transformed into multiple rows
+/// For struct unnesting, each column is horizontally transformed into multiple columns,
 /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
 ///
 /// See [`UnnestOptions`] for more details and an example.
@@ -83,7 +84,12 @@ impl UnnestExec {
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
-        let cache = Self::compute_properties(&input, Arc::clone(&schema));
+        let cache = Self::compute_properties(
+            &input,
+            &list_column_indices,
+            &struct_column_indices,
+            Arc::clone(&schema),
+        );
 
         UnnestExec {
             input,
@@ -99,10 +105,38 @@ impl UnnestExec {
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
+        list_column_indices: &[ListUnnest],
+        struct_column_indices: &[usize],
         schema: SchemaRef,
     ) -> PlanProperties {
+        let list_column_indices: Vec<usize> = list_column_indices
+            .iter()
+            .map(|list_unnest| list_unnest.index_in_input_schema)
+            .collect();
+        let non_unnested_indices: Vec<usize> = input
+            .schema()
+            .fields()
+            .iter()
+            .enumerate()
+            .filter(|(idx, _)| {
+                !list_column_indices.contains(idx) && !struct_column_indices.contains(idx)
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+
+        // Create the unnest equivalence properties by copying the input plan's
+        // equivalence properties for the unaffected columns. Constraints are
+        // removed entirely, because the unnest operation invalidates any global
+        // uniqueness or primary-key constraints.
+        let input_eq_properties = input.equivalence_properties();
+        let projection_mapping =
+            ProjectionMapping::from_indices(&non_unnested_indices, &schema).unwrap();
+        let eq_properties = input_eq_properties
+            .project(&projection_mapping, schema.clone())
+            // Remove any existing constraints
+            .with_constraints(Constraints::default());
         PlanProperties::new(
-            EquivalenceProperties::new(schema),
+            eq_properties,
             input.output_partitioning().to_owned(),
             input.pipeline_behavior(),
             input.boundedness(),
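In effect, an ordering established on a column that is not being unnested, such as the ROW_NUMBER() output below, now survives the UnnestExec, letting downstream operators consume it instead of re-sorting. The new sqllogictest below exercises exactly this.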

datafusion/sqllogictest/test_files/unnest.slt

Lines changed: 30 additions & 0 deletions
@@ -941,3 +941,33 @@ where min_height * width1 = (
 )
 ----
 4 7 4 28
+
+## Unnest with ordering on unrelated column is preserved
+query TT
+EXPLAIN WITH unnested AS (SELECT
+ROW_NUMBER() OVER () AS generated_id,
+unnest(array[value]) as ar
+FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
+----
+logical_plan
+01)Projection: array_agg(unnested.ar)
+02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
+03)----SubqueryAlias: unnested
+04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
+05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
+06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
+07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+08)--------------TableScan: range() projection=[value]
+physical_plan
+01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
+08)--------------UnnestExec
+09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
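The physical plan shows the preserved ordering at work: the partial AggregateExec (step 06) runs with ordering_mode=Sorted directly above the projection over UnnestExec, with no SortExec between them, because the generated_id ordering produced by the BoundedWindowAggExec flows through the unnest. The lone SortExec (step 03) only re-establishes that order after the hash repartition.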
