Skip to content

Commit 92c5607

Browse files
authored
fix: UnnestExec preserves relevant equivalence properties of input (#16985)
## Which issue does this PR close? - Closes #15231. ## What changes are included in this PR? - In `UnnestExec`'s `compute_properties` we now construct its`EquivalenceProperties` using what we can from the input plan, so that we preserve sort ordering of unrelated columns (and avoid unnecessary sorts further up in the plan). ## Are these changes tested? - Adds test cases to the sqllogictests for `UnnestExec` in `unnest.slt` ## Are there any user-facing changes? No ## Explanation Given a struct or array value `col`, `unnest(col)` takes the N elements of `col` and "spreads" these onto N rows, where all other columns in the statement are preserved. Said another way, when we unnest a column we are inserting a lateral cross-join against its elements, which by construction: - Duplicates every other column once for each array/map element - Replaces the original collection column with one (or more) “element” columns - Expands one input row into zero (if empty) or many output rows E.g. (from `unnest.slt`): https://github.com/apache/datafusion/blob/6d9b76e4a30f6234ffa3f8100b5d4c2735558ca6/datafusion/sqllogictest/test_files/unnest.slt#L699-L712 The [`EquivalenceProperties` struct](https://github.com/apache/datafusion/blob/66d6995b8f626f28f811489bd2cb552b6c64a85f/datafusion/physical-expr/src/equivalence/properties/mod.rs#L133-L146) has three types of properties: 1. equivalence groups (expressions with the same value) 2. ordering equivalence classes (expressions that define the same ordering) 3. table constraints - a set of columns that form a primary key or a unique key In this PR we construct the `UnnestExec` node's `EquivalenceProperties` by using the input plan's equivalence properties for the columns that are not transformed - except for table constraints, which we discard entirely. The reasoning for discarding constraints is that because we're duplicating the other columns across rows, we are invalidating any uniqueness or primary-key constraint. We also need to some twiddling with the mapping of the projection (indices change due to the unnesting).
1 parent 408e1e4 commit 92c5607

File tree

5 files changed

+357
-19
lines changed

5 files changed

+357
-19
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ impl DefaultPhysicalPlanner {
989989
struct_type_columns.clone(),
990990
schema,
991991
options.clone(),
992-
))
992+
)?)
993993
}
994994

995995
// 2 Children

datafusion/physical-plan/src/unnest.rs

Lines changed: 82 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use crate::{
3232
};
3333

3434
use arrow::array::{
35-
new_null_array, Array, ArrayRef, AsArray, FixedSizeListArray, Int64Array,
36-
LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
35+
new_null_array, Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray,
36+
Int64Array, LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
3737
};
3838
use arrow::compute::kernels::length::length;
3939
use arrow::compute::kernels::zip::zip;
@@ -43,16 +43,19 @@ use arrow::record_batch::RecordBatch;
4343
use arrow_ord::cmp::lt;
4444
use async_trait::async_trait;
4545
use datafusion_common::{
46-
exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
46+
exec_datafusion_err, exec_err, internal_err, Constraints, HashMap, HashSet, Result,
47+
UnnestOptions,
4748
};
4849
use datafusion_execution::TaskContext;
49-
use datafusion_physical_expr::EquivalenceProperties;
50+
use datafusion_physical_expr::equivalence::ProjectionMapping;
51+
use datafusion_physical_expr::expressions::Column;
52+
use datafusion_physical_expr::PhysicalExpr;
5053
use futures::{Stream, StreamExt};
5154
use log::trace;
5255

5356
/// Unnest the given columns (either with type struct or list)
54-
/// For list unnesting, each rows is vertically transformed into multiple rows
55-
/// For struct unnesting, each columns is horizontally transformed into multiple columns,
57+
/// For list unnesting, each row is vertically transformed into multiple rows
58+
/// For struct unnesting, each column is horizontally transformed into multiple columns,
5659
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
5760
///
5861
/// See [`UnnestOptions`] for more details and an example.
@@ -82,31 +85,94 @@ impl UnnestExec {
8285
struct_column_indices: Vec<usize>,
8386
schema: SchemaRef,
8487
options: UnnestOptions,
85-
) -> Self {
86-
let cache = Self::compute_properties(&input, Arc::clone(&schema));
88+
) -> Result<Self> {
89+
let cache = Self::compute_properties(
90+
&input,
91+
&list_column_indices,
92+
&struct_column_indices,
93+
Arc::clone(&schema),
94+
)?;
8795

88-
UnnestExec {
96+
Ok(UnnestExec {
8997
input,
9098
schema,
9199
list_column_indices,
92100
struct_column_indices,
93101
options,
94102
metrics: Default::default(),
95103
cache,
96-
}
104+
})
97105
}
98106

99107
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
100108
fn compute_properties(
101109
input: &Arc<dyn ExecutionPlan>,
110+
list_column_indices: &[ListUnnest],
111+
struct_column_indices: &[usize],
102112
schema: SchemaRef,
103-
) -> PlanProperties {
104-
PlanProperties::new(
105-
EquivalenceProperties::new(schema),
106-
input.output_partitioning().to_owned(),
113+
) -> Result<PlanProperties> {
114+
// Find out which indices are not unnested, such that they can be copied over from the input plan
115+
let input_schema = input.schema();
116+
let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
117+
unnested_indices.append_n(input_schema.fields().len(), false);
118+
for list_unnest in list_column_indices {
119+
unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
120+
}
121+
for struct_unnest in struct_column_indices {
122+
unnested_indices.set_bit(*struct_unnest, true)
123+
}
124+
let unnested_indices = unnested_indices.finish();
125+
let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
126+
.filter(|idx| !unnested_indices.value(*idx))
127+
.collect();
128+
129+
// Manually build projection mapping from non-unnested input columns to their positions in the output
130+
let input_schema = input.schema();
131+
let projection_mapping: ProjectionMapping = non_unnested_indices
132+
.iter()
133+
.map(|&input_idx| {
134+
// Find what index the input column has in the output schema
135+
let input_field = input_schema.field(input_idx);
136+
let output_idx = schema
137+
.fields()
138+
.iter()
139+
.position(|output_field| output_field.name() == input_field.name())
140+
.ok_or_else(|| {
141+
exec_datafusion_err!(
142+
"Non-unnested column '{}' must exist in output schema",
143+
input_field.name()
144+
)
145+
})?;
146+
147+
let input_col = Arc::new(Column::new(input_field.name(), input_idx))
148+
as Arc<dyn PhysicalExpr>;
149+
let target_col = Arc::new(Column::new(input_field.name(), output_idx))
150+
as Arc<dyn PhysicalExpr>;
151+
// Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
152+
let targets = vec![(target_col, output_idx)].into();
153+
Ok((input_col, targets))
154+
})
155+
.collect::<Result<ProjectionMapping>>()?;
156+
157+
// Create the unnest's equivalence properties by copying the input plan's equivalence properties
158+
// for the unaffected columns. Except for the constraints, which are removed entirely because
159+
// the unnest operation invalidates any global uniqueness or primary-key constraints.
160+
let input_eq_properties = input.equivalence_properties();
161+
let eq_properties = input_eq_properties
162+
.project(&projection_mapping, Arc::clone(&schema))
163+
.with_constraints(Constraints::default());
164+
165+
// Output partitioning must use the projection mapping
166+
let output_partitioning = input
167+
.output_partitioning()
168+
.project(&projection_mapping, &eq_properties);
169+
170+
Ok(PlanProperties::new(
171+
eq_properties,
172+
output_partitioning,
107173
input.pipeline_behavior(),
108174
input.boundedness(),
109-
)
175+
))
110176
}
111177

112178
/// Input execution plan
@@ -173,7 +239,7 @@ impl ExecutionPlan for UnnestExec {
173239
self.struct_column_indices.clone(),
174240
Arc::clone(&self.schema),
175241
self.options.clone(),
176-
)))
242+
)?))
177243
}
178244

179245
fn required_input_distribution(&self) -> Vec<Distribution> {

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1759,7 +1759,7 @@ impl protobuf::PhysicalPlanNode {
17591759
unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
17601760
Arc::new(convert_required!(unnest.schema)?),
17611761
into_required!(unnest.options)?,
1762-
)))
1762+
)?))
17631763
}
17641764

17651765
fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1716,7 +1716,7 @@ fn roundtrip_unnest() -> Result<()> {
17161716
vec![2, 4],
17171717
output_schema,
17181718
options,
1719-
);
1719+
)?;
17201720
roundtrip_test(Arc::new(unnest))
17211721
}
17221722

0 commit comments

Comments
 (0)