Skip to content

Commit 7527f16

Browse files
committed
Preserve input's equivalence properties in UnnestExec
1 parent aab44fd commit 7527f16

File tree

3 files changed

+79
-3
lines changed

3 files changed

+79
-3
lines changed

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ impl OrderingEquivalenceClass {
7474
result
7575
}
7676

77+
/// Returns the orderings in this ordering equivalence class.
78+
pub fn orderings(&self) -> &[LexOrdering] {
79+
&self.orderings
80+
}
81+
7782
/// Extend this ordering equivalence class with the given orderings.
7883
pub fn extend(&mut self, orderings: impl IntoIterator<Item = LexOrdering>) {
7984
self.orderings.extend(orderings);

datafusion/physical-plan/src/unnest.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ use futures::{Stream, StreamExt};
5151
use log::trace;
5252

5353
/// Unnest the given columns (either with type struct or list)
54-
/// For list unnesting, each rows is vertically transformed into multiple rows
55-
/// For struct unnesting, each columns is horizontally transformed into multiple columns,
54+
/// For list unnesting, each row is vertically transformed into multiple rows
55+
/// For struct unnesting, each column is horizontally transformed into multiple columns,
5656
/// Thus the original RecordBatch with dimensions (n x m) may have new dimensions (n' x m').
5757
///
5858
/// See [`UnnestOptions`] for more details and an example.
@@ -101,8 +101,22 @@ impl UnnestExec {
101101
input: &Arc<dyn ExecutionPlan>,
102102
schema: SchemaRef,
103103
) -> PlanProperties {
104+
// Extract equivalence properties from the input plan
105+
let input_eq_properties = input.equivalence_properties();
106+
let input_oeq_class = input_eq_properties.oeq_class();
107+
let orderings = input_oeq_class.orderings().to_vec();
108+
let eq_group = input_eq_properties.eq_group();
109+
let constraints = input_eq_properties.constraints();
110+
111+
// Create new equivalence properties for the unnest plan based on the input plan
112+
let mut eq_properties = EquivalenceProperties::new(schema);
113+
eq_properties.add_orderings(orderings);
114+
eq_properties
115+
.add_equivalence_group(eq_group.to_owned())
116+
.unwrap(); // We can unwrap this because we know this is a valid equivalence group
117+
eq_properties = eq_properties.with_constraints(constraints.to_owned());
104118
PlanProperties::new(
105-
EquivalenceProperties::new(schema),
119+
eq_properties,
106120
input.output_partitioning().to_owned(),
107121
input.pipeline_behavior(),
108122
input.boundedness(),

datafusion/sqllogictest/test_files/unnest.slt

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,3 +941,60 @@ where min_height * width1 = (
941941
)
942942
----
943943
4 7 4 28
944+
945+
## Unnest with ordering on unrelated column is preserved
946+
query TT
947+
EXPLAIN WITH unnested AS (SELECT
948+
ROW_NUMBER() OVER () AS generated_id,
949+
unnest(array[value]) as ar
950+
FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
951+
----
952+
logical_plan
953+
01)Projection: array_agg(unnested.ar)
954+
02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
955+
03)----SubqueryAlias: unnested
956+
04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
957+
05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
958+
06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
959+
07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
960+
08)--------------TableScan: range() projection=[value]
961+
physical_plan
962+
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
963+
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
964+
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
965+
04)------CoalesceBatchesExec: target_batch_size=8192
966+
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
967+
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
968+
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
969+
08)--------------UnnestExec
970+
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
971+
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
972+
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
973+
12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
974+
975+
## Unnest without ordering
976+
query TT
977+
EXPLAIN WITH unnested AS (SELECT
978+
random() AS generated_id,
979+
unnest(array[value]) as ar
980+
FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
981+
----
982+
logical_plan
983+
01)Projection: array_agg(unnested.ar)
984+
02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
985+
03)----SubqueryAlias: unnested
986+
04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
987+
05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
988+
06)----------Projection: random() AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
989+
07)------------TableScan: range() projection=[value]
990+
physical_plan
991+
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
992+
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
993+
03)----CoalesceBatchesExec: target_batch_size=8192
994+
04)------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
995+
05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
996+
06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
997+
07)------------UnnestExec
998+
08)--------------ProjectionExec: expr=[random() as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
999+
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
1000+
10)------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

0 commit comments

Comments
 (0)