
Commit 4093afb

Preserve input's equivalence properties in UnnestExec
1 parent: 6c81ca0

File tree

2 files changed: +307 -7 lines changed
datafusion/physical-plan/src/unnest.rs
datafusion/sqllogictest/test_files/unnest.slt

datafusion/physical-plan/src/unnest.rs

Lines changed: 68 additions & 7 deletions
@@ -43,16 +43,19 @@ use arrow::record_batch::RecordBatch;
 use arrow_ord::cmp::lt;
 use async_trait::async_trait;
 use datafusion_common::{
-    exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
+    exec_datafusion_err, exec_err, internal_err, Constraints, HashMap, HashSet, Result,
+    UnnestOptions,
 };
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::equivalence::ProjectionMapping;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::PhysicalExpr;
 use futures::{Stream, StreamExt};
 use log::trace;
 
 /// Unnest the given columns (either with type struct or list)
-/// For list unnesting, each rows is vertically transformed into multiple rows
-/// For struct unnesting, each columns is horizontally transformed into multiple columns,
+/// For list unnesting, each row is vertically transformed into multiple rows
+/// For struct unnesting, each column is horizontally transformed into multiple columns,
 /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
 ///
 /// See [`UnnestOptions`] for more details and an example.
@@ -83,7 +86,12 @@ impl UnnestExec {
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
-        let cache = Self::compute_properties(&input, Arc::clone(&schema));
+        let cache = Self::compute_properties(
+            &input,
+            &list_column_indices,
+            &struct_column_indices,
+            Arc::clone(&schema),
+        );
 
         UnnestExec {
             input,
@@ -99,11 +107,64 @@ impl UnnestExec {
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
+        list_column_indices: &[ListUnnest],
+        struct_column_indices: &[usize],
         schema: SchemaRef,
     ) -> PlanProperties {
+        let list_column_indices: Vec<usize> = list_column_indices
+            .iter()
+            .map(|list_unnest| list_unnest.index_in_input_schema)
+            .collect();
+        let non_unnested_indices: Vec<usize> = input
+            .schema()
+            .fields()
+            .iter()
+            .enumerate()
+            .filter(|(idx, _)| {
+                !list_column_indices.contains(idx) && !struct_column_indices.contains(idx)
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+
+        // Manually build a projection mapping from the non-unnested input columns
+        // to their positions in the output schema.
+        let input_schema = input.schema();
+        let projection_mapping: ProjectionMapping = non_unnested_indices
+            .iter()
+            .map(|&input_idx| {
+                // Find the index the input column has in the output schema
+                let input_field = input_schema.field(input_idx);
+                let output_idx = schema
+                    .fields()
+                    .iter()
+                    .position(|output_field| output_field.name() == input_field.name())
+                    .expect("Non-unnested column must exist in output schema");
+
+                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
+                    as Arc<dyn PhysicalExpr>;
+                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
+                    as Arc<dyn PhysicalExpr>;
+                // Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
+                let targets = vec![(target_col, output_idx)].into();
+                (input_col, targets)
+            })
+            .collect(); // Use FromIterator to collect the projection mapping
+
+        // Create the unnest's equivalence properties by projecting the input plan's
+        // equivalence properties onto the unaffected columns, except for the
+        // constraints, which are removed entirely because the unnest operation
+        // invalidates any global uniqueness or primary-key constraints.
+        let input_eq_properties = input.equivalence_properties();
+        let eq_properties = input_eq_properties
+            .project(&projection_mapping, Arc::clone(&schema))
+            .with_constraints(Constraints::default());
+
+        // Output partitioning must use the projection mapping as well
+        let output_partitioning = input
+            .output_partitioning()
+            .project(&projection_mapping, &eq_properties);
+
         PlanProperties::new(
-            EquivalenceProperties::new(schema),
-            input.output_partitioning().to_owned(),
+            eq_properties,
+            output_partitioning,
             input.pipeline_behavior(),
             input.boundedness(),
         )
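For intuition, the core of the change is an index remapping: each non-unnested input column is looked up by name in the output schema, and the resulting (input index, output index) pairs are what the ProjectionMapping encodes. Below is a minimal, dependency-free sketch of that remapping; the helper and field names are hypothetical, and the real code builds Arc<dyn PhysicalExpr> columns rather than string pairs.

/// Hypothetical helper mirroring the remapping in compute_properties:
/// pair each non-unnested input column with its position in the output schema.
fn remap_non_unnested(
    input_fields: &[&str],
    unnested: &[usize],
    output_fields: &[&str],
) -> Vec<(usize, usize)> {
    input_fields
        .iter()
        .enumerate()
        .filter(|(idx, _)| !unnested.contains(idx))
        .filter_map(|(input_idx, name)| {
            output_fields
                .iter()
                .position(|out| out == name)
                .map(|output_idx| (input_idx, output_idx))
        })
        .collect()
}

fn main() {
    // "col1" (index 0) is unnested; "col2" passes through unchanged.
    let mapping = remap_non_unnested(&["col1", "col2"], &[0], &["unnested_col1", "col2"]);
    // "col2" keeps its input index 1 and lands at output index 1.
    assert_eq!(mapping, vec![(1, 1)]);
    println!("{mapping:?}");
}

Since each pair identifies the same logical column before and after the unnest, orderings and equivalences known for the input column can be carried over to its output position instead of being discarded.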

datafusion/sqllogictest/test_files/unnest.slt

Lines changed: 239 additions & 0 deletions
@@ -941,3 +941,242 @@ where min_height * width1 = (
 )
 ----
 4 7 4 28
+
+## Unnest with ordering on unrelated column is preserved
+query TT
+EXPLAIN WITH unnested AS (SELECT
+ROW_NUMBER() OVER () AS generated_id,
+unnest(array[value]) as ar
+FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
+----
+logical_plan
+01)Projection: array_agg(unnested.ar)
+02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
+03)----SubqueryAlias: unnested
+04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
+05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
+06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
+07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+08)--------------TableScan: range() projection=[value]
+physical_plan
+01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
+08)--------------UnnestExec
+09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
+
+# Unnest array where data is already ordered by column2 (100, 200, 300, 400)
+statement ok
+COPY (
+SELECT * FROM VALUES
+([1,2,3], 100),
+([3], 200),
+([], 300),
+([3,1], 400)
+ORDER BY column2
+) TO 'test_files/scratch/unnest/ordered_array.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_array.parquet'
+WITH ORDER (column2)
+
+query ?I
+SELECT * FROM t;
+----
+[1, 2, 3] 100
+[3] 200
+[] 300
+[3, 1] 400
+
+# Data is sorted on column2 already, so no need to sort again
+query II
+SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+1 100
+2 100
+3 100
+3 200
+3 400
+1 400
+
+# Explain should not have a SortExec
+query TT
+EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+logical_plan
+01)Sort: t.column2 ASC NULLS LAST
+02)--Projection: __unnest_placeholder(t.column1,depth=1) AS UNNEST(t.column1), t.column2
+03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=1] structs[]
+04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
+05)--------TableScan: t projection=[column1, column2]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=1)@0 as UNNEST(t.column1), column2@1 as column2]
+02)--UnnestExec
+03)----ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as column2]
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_array.parquet]]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet
+
+# cleanup
+statement ok
+drop table t;
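These tests rely on the invariant that list unnesting replicates, but never reorders, the non-unnested columns, so an input ordering on column2 survives UnnestExec. A standalone sketch of that invariant, with illustrative names rather than DataFusion APIs:

// Sketch: list-unnesting replicates the non-unnested key per list element,
// so a column the input was sorted on stays sorted in the output.
fn unnest_rows(rows: &[(Vec<i32>, i32)]) -> Vec<(i32, i32)> {
    rows.iter()
        .flat_map(|(list, key)| list.iter().map(move |v| (*v, *key)))
        .collect()
}

fn main() {
    // Input sorted by the second column, mirroring the slt test data.
    let input = vec![
        (vec![1, 2, 3], 100),
        (vec![3], 200),
        (vec![], 300),
        (vec![3, 1], 400),
    ];
    let output = unnest_rows(&input);
    // The key column is still non-decreasing after unnesting.
    assert!(output.windows(2).all(|w| w[0].1 <= w[1].1));
    println!("{output:?}");
}

Note that the ([], 300) row vanishes entirely, matching the query results above, while the surviving key values remain non-decreasing.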
+
+# Unnest struct where data is already ordered by column2 (100, 200, 300, 400)
+statement ok
+COPY (
+SELECT * FROM VALUES
+(named_struct('s1', 1, 's2', 2, 's3', 3), 100),
+(named_struct('s1', 1, 's2', 3, 's3', 2), 200),
+(named_struct('s1', 2, 's2', 1, 's3', 3), 300),
+(named_struct('s1', 3, 's2', 2, 's3', 1), 400)
+ORDER BY column2
+) TO 'test_files/scratch/unnest/ordered_struct.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_struct.parquet'
+WITH ORDER (column2)
+
+query ?I
+SELECT * FROM t;
+----
+{s1: 1, s2: 2, s3: 3} 100
+{s1: 1, s2: 3, s3: 2} 200
+{s1: 2, s2: 1, s3: 3} 300
+{s1: 3, s2: 2, s3: 1} 400
+
+# data is sorted on column2 already, so no need to sort again
+query IIII
+SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+1 2 3 100
+1 3 2 200
+2 1 3 300
+3 2 1 400
+
+# Explain should not have a SortExec
+query TT
+EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+logical_plan
+01)Sort: t.column2 ASC NULLS LAST
+02)--Unnest: lists[] structs[__unnest_placeholder(t.column1)]
+03)----Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
+04)------TableScan: t projection=[column1, column2]
+physical_plan
+01)UnnestExec
+02)--ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as column2]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct.parquet]]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet
+
+# cleanup
+statement ok
+drop table t;
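Struct unnesting is the simpler case for this change: it widens each row horizontally without adding, dropping, or reordering rows, so an ordering on a sibling column like column2 holds trivially. A sketch under the same illustrative conventions:

// Sketch: struct unnesting is horizontal: each struct field becomes its own
// output column, but the row count and row order are untouched, so orderings
// on other columns carry over directly.
fn main() {
    let rows = vec![((1, 2, 3), 100), ((1, 3, 2), 200), ((2, 1, 3), 300)];
    let flattened: Vec<(i32, i32, i32, i32)> = rows
        .iter()
        .map(|((s1, s2, s3), key)| (*s1, *s2, *s3, *key))
        .collect();
    // Same number of rows, and the key column is still sorted.
    assert_eq!(rows.len(), flattened.len());
    assert!(flattened.windows(2).all(|w| w[0].3 <= w[1].3));
}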
+
+# Unnest nested array (unnesting twice), struct, and array, where data is already ordered by column4 (100, 200, 300, 400)
+statement ok
+COPY (
+SELECT * FROM VALUES
+([[1],[2],[3]], [1,2,3], named_struct('s1', 1, 's2', 2, 's3', 3), 100),
+([[1],[3],[2]], [3], named_struct('s1', 1, 's2', 3, 's3', 2), 200),
+([[2],[1],[3]], [], named_struct('s1', 2, 's2', 1, 's3', 3), 300),
+([[3],[2],[1]], [3,1], named_struct('s1', 3, 's2', 2, 's3', 1), 400)
+ORDER BY column4
+) TO 'test_files/scratch/unnest/ordered_struct_arrays.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_struct_arrays.parquet'
+WITH ORDER (column4)
+
+query ???I
+SELECT * FROM t;
+----
+[[1], [2], [3]] [1, 2, 3] {s1: 1, s2: 2, s3: 3} 100
+[[1], [3], [2]] [3] {s1: 1, s2: 3, s3: 2} 200
+[[2], [1], [3]] [] {s1: 2, s2: 1, s3: 3} 300
+[[3], [2], [1]] [3, 1] {s1: 3, s2: 2, s3: 1} 400
+
+# data is sorted on column4 already, so no need to sort again
+query IIIIII
+SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column4 FROM t ORDER BY column4;
+----
+1 1 1 2 3 100
+NULL 2 1 2 3 100
+NULL 3 1 2 3 100
+2 1 1 2 3 100
+NULL 2 1 2 3 100
+NULL 3 1 2 3 100
+3 1 1 2 3 100
+NULL 2 1 2 3 100
+NULL 3 1 2 3 100
+1 3 1 3 2 200
+3 3 1 3 2 200
+2 3 1 3 2 200
+2 NULL 2 1 3 300
+1 NULL 2 1 3 300
+3 NULL 2 1 3 300
+3 3 3 2 1 400
+NULL 1 3 2 1 400
+2 3 3 2 1 400
+NULL 1 3 2 1 400
+1 3 3 2 1 400
+NULL 1 3 2 1 400
+
+# Explain should not have a SortExec
+query TT
+EXPLAIN SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column4 FROM t ORDER BY column4;
+----
+logical_plan
+01)Sort: t.column4 ASC NULLS LAST
+02)--Projection: __unnest_placeholder(t.column1,depth=2) AS UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1) AS UNNEST(t.column2), __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3, t.column4
+03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=2, __unnest_placeholder(t.column2)|depth=1] structs[__unnest_placeholder(t.column3)]
+04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2 AS __unnest_placeholder(t.column2), t.column3 AS __unnest_placeholder(t.column3), t.column4
+05)--------TableScan: t projection=[column1, column2, column3, column4]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=2)@0 as UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1)@1 as UNNEST(t.column2), __unnest_placeholder(t.column3).s1@2 as __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2@3 as __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3@4 as __unnest_placeholder(t.column3).s3, column4@5 as column4]
+02)--UnnestExec
+03)----ProjectionExec: expr=[column1@0 as __unnest_placeholder(t.column1), column2@1 as __unnest_placeholder(t.column2), column3@2 as __unnest_placeholder(t.column3), column4@3 as column4]
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct_arrays.parquet]]}, projection=[column1, column2, column3, column4], output_ordering=[column4@3 ASC NULLS LAST], file_type=parquet
+
+# cleanup
+statement ok
+drop table t;
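The tests above exercise ordering; the other half of compute_properties, clearing constraints with with_constraints(Constraints::default()), follows from the same row expansion. A sketch of why a uniqueness or primary-key constraint cannot survive list unnesting, again with illustrative names rather than DataFusion APIs:

// Sketch: unnesting a list column can replicate values of a previously
// unique column, so uniqueness no longer holds downstream of UnnestExec.
fn main() {
    // (list to unnest, unique key)
    let rows = vec![(vec![10, 20], 1), (vec![30], 2)];
    let keys_after_unnest: Vec<i32> = rows
        .iter()
        .flat_map(|(list, key)| std::iter::repeat(*key).take(list.len()))
        .collect();
    // Key 1 now appears twice: the uniqueness constraint is broken.
    assert_eq!(keys_after_unnest, vec![1, 1, 2]);
}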
