Skip to content

Commit 6a8297e

Browse files
committed
merge
2 parents d256caa + d7eebc9 commit 6a8297e

File tree

5 files changed

+169
-29
lines changed

5 files changed

+169
-29
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5274,7 +5274,7 @@ async fn test_dataframe_placeholder_column_parameter() -> Result<()> {
52745274
assert_snapshot!(
52755275
actual,
52765276
@r"
5277-
Projection: Int32(3) AS $1 [$1:Null;N]
5277+
Projection: Int32(3) AS $1 [$1:Int32]
52785278
EmptyRelation: rows=1 []
52795279
"
52805280
);

datafusion/core/tests/sql/select.rs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use std::collections::HashMap;
1919

2020
use super::*;
21-
use datafusion::assert_batches_eq;
2221
use datafusion_common::{metadata::ScalarAndMetadata, ParamValues, ScalarValue};
2322
use insta::assert_snapshot;
2423

@@ -343,26 +342,53 @@ async fn test_query_parameters_with_metadata() -> Result<()> {
343342
]))
344343
.unwrap();
345344

346-
// df_with_params_replaced.schema() is not correct here
347-
// https://github.com/apache/datafusion/issues/18102
348-
let batches = df_with_params_replaced.clone().collect().await.unwrap();
349-
let schema = batches[0].schema();
350-
345+
let schema = df_with_params_replaced.schema();
351346
assert_eq!(schema.field(0).data_type(), &DataType::UInt32);
352347
assert_eq!(schema.field(0).metadata(), &metadata1);
353348
assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
354349
assert_eq!(schema.field(1).metadata(), &metadata2);
355350

356-
assert_batches_eq!(
357-
[
358-
"+----+-----+",
359-
"| $1 | $2 |",
360-
"+----+-----+",
361-
"| 1 | two |",
362-
"+----+-----+",
363-
],
364-
&batches
365-
);
351+
let batches = df_with_params_replaced.collect().await.unwrap();
352+
assert_snapshot!(batches_to_sort_string(&batches), @r"
353+
+----+-----+
354+
| $1 | $2 |
355+
+----+-----+
356+
| 1 | two |
357+
+----+-----+
358+
");
359+
360+
Ok(())
361+
}
362+
363+
/// Test for https://github.com/apache/datafusion/issues/18102
364+
#[tokio::test]
365+
async fn test_query_parameters_in_values_list_relation() -> Result<()> {
366+
let ctx = SessionContext::new();
367+
368+
let df = ctx
369+
.sql("SELECT a, b FROM (VALUES ($1, $2)) AS t(a, b)")
370+
.await
371+
.unwrap();
372+
373+
let df_with_params_replaced = df
374+
.with_param_values(ParamValues::List(vec![
375+
ScalarAndMetadata::new(ScalarValue::UInt32(Some(1)), None),
376+
ScalarAndMetadata::new(ScalarValue::Utf8(Some("two".to_string())), None),
377+
]))
378+
.unwrap();
379+
380+
let schema = df_with_params_replaced.schema();
381+
assert_eq!(schema.field(0).data_type(), &DataType::UInt32);
382+
assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
383+
384+
let batches = df_with_params_replaced.collect().await.unwrap();
385+
assert_snapshot!(batches_to_sort_string(&batches), @r"
386+
+---+-----+
387+
| a | b |
388+
+---+-----+
389+
| 1 | two |
390+
+---+-----+
391+
");
366392

367393
Ok(())
368394
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,9 @@ impl LogicalPlanBuilder {
216216
if values.is_empty() {
217217
return plan_err!("Values list cannot be empty");
218218
}
219+
220+
// values list can have no columns, see: https://github.com/apache/datafusion/pull/12339
219221
let n_cols = values[0].len();
220-
if n_cols == 0 {
221-
return plan_err!("Values list cannot be zero length");
222-
}
223222
for (i, row) in values.iter().enumerate() {
224223
if row.len() != n_cols {
225224
return plan_err!(

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -633,8 +633,43 @@ impl LogicalPlan {
633633
LogicalPlan::Dml(_) => Ok(self),
634634
LogicalPlan::Copy(_) => Ok(self),
635635
LogicalPlan::Values(Values { schema, values }) => {
636-
// todo it isn't clear why the schema is not recomputed here
637-
Ok(LogicalPlan::Values(Values { schema, values }))
636+
// `values` alone is not enough to compute the correct schema for the plan. For example:
637+
// Projection: col_1, col_2
638+
// Values: (Float32(1), Float32(10)), (Float32(100), Float32(10))
639+
//
640+
// Thus, we need to recompute a new schema from `values` and retain some
641+
// information from the original schema.
642+
let new_plan = LogicalPlanBuilder::values(values.clone())?.build()?;
643+
644+
let qualified_fields = schema
645+
.iter()
646+
.zip(new_plan.schema().fields())
647+
.map(|((table_ref, old_field), new_field)| {
648+
// `old_field`'s data type is unknown but `new_field`'s is known
649+
if old_field.data_type().is_null()
650+
&& !new_field.data_type().is_null()
651+
{
652+
let field = old_field
653+
.as_ref()
654+
.clone()
655+
.with_data_type(new_field.data_type().clone());
656+
(table_ref.cloned(), Arc::new(field))
657+
} else {
658+
(table_ref.cloned(), Arc::clone(old_field))
659+
}
660+
})
661+
.collect::<Vec<_>>();
662+
663+
let schema = DFSchema::new_with_metadata(
664+
qualified_fields,
665+
schema.metadata().clone(),
666+
)?
667+
.with_functional_dependencies(schema.functional_dependencies().clone())?;
668+
669+
Ok(LogicalPlan::Values(Values {
670+
schema: Arc::new(schema),
671+
values,
672+
}))
638673
}
639674
LogicalPlan::Filter(Filter { predicate, input }) => {
640675
Filter::try_new(predicate, input).map(LogicalPlan::Filter)
@@ -1471,7 +1506,10 @@ impl LogicalPlan {
14711506
// Preserve name to avoid breaking column references to this expression
14721507
Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
14731508
}
1474-
})
1509+
})?
1510+
// always recompute the schema so that changes to the schema's fields are
1511+
// propagated to the plan's parent
1512+
.map_data(|plan| plan.recompute_schema())
14751513
})
14761514
.map(|res| res.data)
14771515
}
@@ -4247,6 +4285,7 @@ mod tests {
42474285
use super::*;
42484286
use crate::builder::LogicalTableSource;
42494287
use crate::logical_plan::table_scan;
4288+
use crate::select_expr::SelectExpr;
42504289
use crate::test::function_stub::{count, count_udaf};
42514290
use crate::{
42524291
binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
@@ -4825,6 +4864,85 @@ mod tests {
48254864
.expect_err("prepared field metadata mismatch unexpectedly succeeded");
48264865
}
48274866

4867+
#[test]
4868+
fn test_replace_placeholder_values_relation_valid_schema() {
4869+
// SELECT a, b, c, d FROM (VALUES (1, $1, $2, $3 + $4)) AS t(a, b, c, d);
4870+
let plan = LogicalPlanBuilder::values(vec![vec![
4871+
lit(1),
4872+
placeholder("$1"),
4873+
placeholder("$2"),
4874+
binary_expr(placeholder("$3"), Operator::Plus, placeholder("$4")),
4875+
]])
4876+
.unwrap()
4877+
.project(vec![
4878+
col("column1").alias("a"),
4879+
col("column2").alias("b"),
4880+
col("column3").alias("c"),
4881+
col("column4").alias("d"),
4882+
])
4883+
.unwrap()
4884+
.alias("t")
4885+
.unwrap()
4886+
.project(vec![col("a"), col("b"), col("c"), col("d")])
4887+
.unwrap()
4888+
.build()
4889+
.unwrap();
4890+
4891+
// original
4892+
assert_snapshot!(plan.display_indent_schema(), @r"
4893+
Projection: t.a, t.b, t.c, t.d [a:Int32;N, b:Null;N, c:Null;N, d:Int64;N]
4894+
SubqueryAlias: t [a:Int32;N, b:Null;N, c:Null;N, d:Int64;N]
4895+
Projection: column1 AS a, column2 AS b, column3 AS c, column4 AS d [a:Int32;N, b:Null;N, c:Null;N, d:Int64;N]
4896+
Values: (Int32(1), $1, $2, $3 + $4) [column1:Int32;N, column2:Null;N, column3:Null;N, column4:Int64;N]
4897+
");
4898+
4899+
let plan = plan
4900+
.with_param_values(vec![
4901+
ScalarValue::from(1i32),
4902+
ScalarValue::from("s"),
4903+
ScalarValue::from(3),
4904+
ScalarValue::from(4),
4905+
])
4906+
.unwrap();
4907+
4908+
// replaced
4909+
assert_snapshot!(plan.display_indent_schema(), @r#"
4910+
Projection: t.a, t.b, t.c, t.d [a:Int32;N, b:Int32;N, c:Utf8;N, d:Int64;N]
4911+
SubqueryAlias: t [a:Int32;N, b:Int32;N, c:Utf8;N, d:Int64;N]
4912+
Projection: column1 AS a, column2 AS b, column3 AS c, column4 AS d [a:Int32;N, b:Int32;N, c:Utf8;N, d:Int64;N]
4913+
Values: (Int32(1), Int32(1) AS $1, Utf8("s") AS $2, Int32(3) + Int32(4) AS $3 + $4) [column1:Int32;N, column2:Int32;N, column3:Utf8;N, column4:Int64;N]
4914+
"#);
4915+
}
4916+
4917+
#[test]
4918+
fn test_replace_placeholder_empty_relation_valid_schema() {
4919+
// SELECT $1, $2;
4920+
let plan = LogicalPlanBuilder::empty(false)
4921+
.project(vec![
4922+
SelectExpr::from(placeholder("$1")),
4923+
SelectExpr::from(placeholder("$2")),
4924+
])
4925+
.unwrap()
4926+
.build()
4927+
.unwrap();
4928+
4929+
// original
4930+
assert_snapshot!(plan.display_indent_schema(), @r"
4931+
Projection: $1, $2 [$1:Null;N, $2:Null;N]
4932+
EmptyRelation: rows=0 []
4933+
");
4934+
4935+
let plan = plan
4936+
.with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
4937+
.unwrap();
4938+
4939+
// replaced
4940+
assert_snapshot!(plan.display_indent_schema(), @r#"
4941+
Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
4942+
EmptyRelation: rows=0 []
4943+
"#);
4944+
}
4945+
48284946
#[test]
48294947
fn test_nullable_schema_after_grouping_set() {
48304948
let schema = Schema::new(vec![

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use datafusion::execution::registry::SerializerRegistry;
3232
use datafusion::execution::runtime_env::RuntimeEnv;
3333
use datafusion::execution::session_state::SessionStateBuilder;
3434
use datafusion::logical_expr::{
35-
Extension, InvariantLevel, LogicalPlan, PartitionEvaluator, Repartition,
36-
UserDefinedLogicalNode, Values, Volatility,
35+
Extension, InvariantLevel, LogicalPlan, LogicalPlanBuilder, PartitionEvaluator,
36+
Repartition, UserDefinedLogicalNode, Volatility,
3737
};
3838
use datafusion::optimizer::simplify_expressions::expr_simplifier::THRESHOLD_INLINE_INLIST;
3939
use datafusion::prelude::*;
@@ -1258,10 +1258,7 @@ async fn roundtrip_values() -> Result<()> {
12581258
async fn roundtrip_values_no_columns() -> Result<()> {
12591259
let ctx = create_context().await?;
12601260
// "VALUES ()" is not yet supported by the SQL parser, so we construct the plan manually
1261-
let plan = LogicalPlan::Values(Values {
1262-
values: vec![vec![], vec![]], // two rows, no columns
1263-
schema: DFSchemaRef::new(DFSchema::empty()),
1264-
});
1261+
let plan = LogicalPlanBuilder::values(vec![vec![], vec![]])?.build()?;
12651262
roundtrip_logical_plan_with_ctx(plan, ctx).await?;
12661263
Ok(())
12671264
}

0 commit comments

Comments
 (0)