From 9662e1a9f65aea3beb114b45ecfbaa9c8c81ae88 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Sun, 7 Jan 2024 20:57:03 +0800 Subject: [PATCH 1/3] fix: failed to create ValuesExec with non-nullable schema --- datafusion/physical-plan/src/values.rs | 34 ++++++++++++------- datafusion/sqllogictest/test_files/select.slt | 8 +++++ 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index b624fb362e65..84e4d0e5920a 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -27,9 +27,8 @@ use crate::{ PhysicalExpr, }; -use arrow::array::new_null_array; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::{internal_err, plan_err, DataFusionError, Result, ScalarValue}; use datafusion_execution::TaskContext; @@ -53,15 +52,14 @@ impl ValuesExec { } let n_row = data.len(); let n_col = schema.fields().len(); - // we have this single row, null, typed batch as a placeholder to satisfy evaluation argument - let batch = RecordBatch::try_new( - schema.clone(), - schema - .fields() - .iter() - .map(|field| new_null_array(field.data_type(), 1)) - .collect::>(), + // we have this single row batch as a placeholder to satisfy evaluation argument + // and generate a single output row + let batch = RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), )?; + let arr = (0..n_col) .map(|j| { (0..n_row) @@ -71,7 +69,7 @@ impl ValuesExec { match r { Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar), Ok(ColumnarValue::Array(a)) if a.len() == 1 => { - Ok(ScalarValue::List(a)) + ScalarValue::try_from_array(&a, 0) } Ok(ColumnarValue::Array(a)) => { plan_err!( @@ -201,6 +199,7 @@ impl ExecutionPlan for ValuesExec { #[cfg(test)] mod tests { use super::*; + use crate::expressions::lit; use crate::test::{self, make_partition}; use arrow_schema::{DataType, Field, Schema}; @@ -240,4 +239,15 @@ mod tests { ])); let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); } + + // Test issue: https://github.com/apache/arrow-datafusion/issues/8763 + #[test] + fn new_exec_with_non_nullable_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let _ = ValuesExec::try_new(schema, vec![vec![lit(1u32)]]).unwrap(); + } } diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index ea570b99d4dd..132bcdd246fe 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -114,6 +114,14 @@ VALUES (1,2,3,4,5,6,7,8,9,10,11,12,13,NULL,'F',3.5) ---- 1 2 3 4 5 6 7 8 9 10 11 12 13 NULL F 3.5 +# Test non-literal expressions in VALUES +query II +VALUES (1, CASE WHEN RANDOM() > 0.5 THEN 1 ELSE 1 END), + (2, CASE WHEN RANDOM() > 0.5 THEN 2 ELSE 2 END); +---- +1 1 +2 2 + query IT SELECT * FROM (VALUES (1,'a'),(2,NULL)) AS t(c1, c2) ---- From d15b0ef12b8ac8d80eccb9cb04cf753fd752d91d Mon Sep 17 00:00:00 2001 From: jonahgao Date: Sun, 7 Jan 2024 21:36:42 +0800 Subject: [PATCH 2/3] update test --- datafusion/physical-plan/src/values.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 84e4d0e5920a..28c4965e0eca 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -248,6 +248,9 @@ mod tests { DataType::UInt32, false, )])); - let _ = ValuesExec::try_new(schema, vec![vec![lit(1u32)]]).unwrap(); + let _ = ValuesExec::try_new(schema.clone(), vec![vec![lit(1u32)]]).unwrap(); + // Test that a null value is rejected + let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) + .unwrap_err(); } } From b0bd2bcc758cc25e9dc4dc14520eeb0635011c5c Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 8 Jan 2024 14:57:54 +0800 Subject: [PATCH 3/3] fix incorrect comment --- datafusion/physical-plan/src/values.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 28c4965e0eca..f82f7ea2f869 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -172,7 +172,7 @@ impl ExecutionPlan for ValuesExec { partition: usize, _context: Arc, ) -> Result { - // GlobalLimitExec has a single output partition + // ValuesExec has a single output partition if 0 != partition { return internal_err!( "ValuesExec invalid partition {partition} (expected 0)"