Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logical vs physical schema mismatch for UNION where some inputs are constants #12954

Merged
merged 6 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,35 +468,41 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
}

fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
let first_schema = inputs[0].schema();

let fields = (0..first_schema.fields().len())
.map(|i| {
inputs
.iter()
.filter_map(|input| {
if input.schema().fields().len() > i {
let field = input.schema().field(i).clone();
let right_hand_metdata = inputs
.get(1)
.map(|right_input| {
right_input.schema().field(i).metadata().clone()
})
.unwrap_or_default();
let mut metadata = field.metadata().clone();
metadata.extend(right_hand_metdata);
Some(field.with_metadata(metadata))
} else {
None
}
.enumerate()
.map(|(input_idx, input)| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to recommend extracting this logic into a function that is commented to help explain what it is doing -- specifically I think it is trying to get the first non-null metadata from any previous input

Reading it more closely, though, doesn't this code assume there are exactly 2 inputs to the Union? What if there are more than 2 inputs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that. It would be helpful to add some inline comments, or possibly extract it into another function

let field = input.schema().field(i).clone();
let mut metadata = field.metadata().clone();

let other_metadatas = inputs
.iter()
.enumerate()
.filter(|(other_idx, _)| *other_idx != input_idx)
.flat_map(|(_, other_input)| {
other_input.schema().field(i).metadata().clone().into_iter()
});

metadata.extend(other_metadatas);
field.with_metadata(metadata)
})
.find_or_first(|f| f.is_nullable())
.find_or_first(Field::is_nullable)
// We can unwrap this because if inputs was empty, this would've already panic'ed when we
// indexed into inputs[0].
.unwrap()
})
.collect::<Vec<_>>();

let all_metadata_merged = inputs
.iter()
.flat_map(|i| i.schema().metadata().clone().into_iter())
.collect();

Arc::new(Schema::new_with_metadata(
fields,
inputs[0].schema().metadata().clone(),
))
Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))
}

/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
Expand Down
21 changes: 18 additions & 3 deletions datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,17 +319,27 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
String::from("metadata_key"),
String::from("the l_name field"),
)]));

let ts = Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
.with_metadata(HashMap::from([(
String::from("metadata_key"),
String::from("ts non-nullable field"),
)]));

let schema =
Schema::new(vec![id, name, l_name, ts]).with_metadata(HashMap::from([(
let nonnull_name =
Field::new("nonnull_name", DataType::Utf8, false).with_metadata(HashMap::from([
(
String::from("metadata_key"),
String::from("the nonnull_name field"),
),
]));

let schema = Schema::new(vec![id, name, l_name, ts, nonnull_name]).with_metadata(
HashMap::from([(
String::from("metadata_key"),
String::from("the entire schema"),
)]));
)]),
);

let batch = RecordBatch::try_new(
Arc::new(schema),
Expand All @@ -342,6 +352,11 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
1599572549190855123,
1599572549190855123,
])) as _,
Arc::new(StringArray::from(vec![
Some("no_foo"),
Some("no_bar"),
Some("no_baz"),
])) as _,
],
)
.unwrap();
Expand Down
27 changes: 27 additions & 0 deletions datafusion/sqllogictest/test_files/metadata.slt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,34 @@ ORDER BY id, name, l_name;
NULL bar NULL
NULL NULL l_bar

# Regression test: missing field metadata from left side of the union when right side is chosen
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that this query fails without the code changes in this PR:

External error: query failed: DataFusion error: Schema error: No field named nonnull_name. Valid fields are table_with_metadata.id, table_with_metadata.name, table_with_metadata.l_name.
[SQL] select name from (
  SELECT nonnull_name as name FROM "table_with_metadata"
  UNION ALL
  SELECT NULL::string as name
) group by name order by name;
at test_files/metadata.slt:127

Error: Execution("1 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

query T
select name from (
SELECT nonnull_name as name FROM "table_with_metadata"
UNION ALL
SELECT NULL::string as name
) group by name order by name;
----
no_bar
no_baz
no_foo
NULL

# Regression test: missing schema metadata from union when schema with metadata isn't the first one
# and also ensure it works fine with multiple unions
query T
select name from (
SELECT NULL::string as name
UNION ALL
SELECT nonnull_name as name FROM "table_with_metadata"
UNION ALL
SELECT NULL::string as name
) group by name order by name;
----
no_bar
no_baz
no_foo
NULL

query P rowsort
SELECT ts
Expand Down
Loading