-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Labels
bugSomething isn't workingSomething isn't working
Description
Describe the bug
The sanity checker for physical plans fails with does not satisfy order requirements
under these very specific conditions (also see reproducer below):
- you have node that requires input ordering (e.g. a SortPreservingMergeExec (SPM)) over a
UnionExec
- the
UnionExec
has at least 2 children - the children may have 1 column that is actually data to-be-sorted (let's call this column
a
), and two constants (calledconst_1
andconst_2
) - the constant values for both are identical in the first child, but differ for the 2nd column for the second child (e.g.
[const_1=foo, const_2=foo], [const_1=foo, const_2=bar]
) - the children are sorted (by
SortExec
) but only for the cola
(because for the constants the sorting is note required)
The resulting error is (newlines added for clarity):
does not satisfy order requirements
[const_1@0 ASC NULLS LAST, const_2@1 ASC NULLS LAST, a@2 ASC NULLS LAST].
Child-0 order: [[a@2 ASC NULLS LAST]]
To Reproduce
You may add this to datafusion/core/tests/physical_optimizer/enforce_sorting.rs
(or a better file, I just originally thought that this is related to the enforce-sorting pass but it's not)
#[tokio::test]
async fn test_kaputt() -> Result<()> {
let schema_in = create_test_schema3().unwrap();
let proj_exprs_1 = vec![
(
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_owned())))) as _,
"const_1".to_owned(),
),
(
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_owned())))) as _,
"const_2".to_owned(),
),
(col("a", &schema_in).unwrap(), "a".to_owned()),
];
let proj_exprs_2 = vec![
(
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_owned())))) as _,
"const_1".to_owned(),
),
(
Arc::new(Literal::new(ScalarValue::Utf8(Some("bar".to_owned())))) as _,
"const_2".to_owned(),
),
(col("a", &schema_in).unwrap(), "a".to_owned()),
];
let source_1 = memory_exec(&schema_in);
let source_1 = projection_exec(proj_exprs_1.clone(), source_1).unwrap();
let schema_sources = source_1.schema();
let ordering_sources: LexOrdering =
[sort_expr("a", &schema_sources).nulls_last()].into();
let source_1 = sort_exec(ordering_sources.clone(), source_1);
let source_2 = memory_exec(&schema_in);
let source_2 = projection_exec(proj_exprs_2, source_2).unwrap();
let source_2 = sort_exec(ordering_sources.clone(), source_2);
let plan = union_exec(vec![source_1, source_2]);
let schema_out = plan.schema();
let ordering_out: LexOrdering = [
sort_expr("const_1", &schema_out).nulls_last(),
sort_expr("const_2", &schema_out).nulls_last(),
sort_expr("a", &schema_out).nulls_last(),
]
.into();
let plan = sort_preserving_merge_exec(ordering_out, plan);
println!("{}", get_plan_string(&plan).join("\n"));
SanityCheckPlan::new()
.optimize(plan.clone(), &Default::default())
.unwrap();
Ok(())
}
This produces this plan:
SortPreservingMergeExec: [const_1@0 ASC NULLS LAST, const_2@1 ASC NULLS LAST, a@2 ASC NULLS LAST]
UnionExec
SortExec: expr=[a@2 ASC NULLS LAST], preserve_partitioning=[false]
ProjectionExec: expr=[foo as const_1, foo as const_2, a@0 as a]
DataSourceExec: partitions=1, partition_sizes=[0]
SortExec: expr=[a@2 ASC NULLS LAST], preserve_partitioning=[false]
ProjectionExec: expr=[foo as const_1, bar as const_2, a@0 as a]
DataSourceExec: partitions=1, partition_sizes=[0]
which will fail the sanity checker as described above.
Expected behavior
From what I can tell this plan is sound.
Additional context
Tested on da2f9e1 .
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't working