Skip to content

Commit d496ae7

Browse files
EeshanBembi, Eeshan, ebembi-crdb, alamb
authored and committed
fix: prevent UnionExec panic with empty inputs (apache#17449)
* fix: prevent UnionExec panic with empty inputs (apache#17449)

  This commit fixes a panic in UnionExec when constructed with empty inputs.
  Previously, UnionExec::new(vec![]) would cause an index out of bounds panic
  at union.rs:542 when trying to access inputs[0].

  Changes:
  - Made UnionExec::new() return Result<Self> with proper validation
  - Made union_schema() return Result<SchemaRef> with empty input checks
  - Added descriptive error messages for empty input cases
  - Updated all call sites to handle the new Result return type
  - Added comprehensive tests for edge cases

  Error messages:
  - "UnionExec requires at least one input"
  - "Cannot create union schema from empty inputs"

  The fix maintains backward compatibility for valid inputs while preventing
  crashes and providing clear error messages for invalid usage.

  Fixes apache#17052

* refactor: address PR review comments for UnionExec empty inputs fix

  - Add new try_new method that returns Result<Arc<dyn ExecutionPlan>>
  - Deprecate existing new method in favor of try_new
  - Optimize single-input case: try_new returns the input directly
  - Remove redundant assert!(result.is_err()) from tests
  - Rename test_union_multiple_inputs_still_works to test_union_schema_multiple_inputs
  - Update all call sites to use appropriate API (try_new for new code,
    deprecated new for tests)

  This maintains backward compatibility while providing better error handling
  and optimization for single-input cases.

* Fix cargo fmt and clippy warnings

  - Add proper feature gates for parquet_encryption in datasource-parquet
  - Format code to pass cargo fmt checks
  - All tests passing

* Fix clippy

---------

Co-authored-by: Eeshan <eeshan@Eeshans-MacBook-Pro.local>
Co-authored-by: ebembi-crdb <ebembi@cockroachlabs.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
(cherry picked from commit b122a16)
1 parent 5d119c8 commit d496ae7

File tree

11 files changed

+138
-10
lines changed

11 files changed

+138
-10
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1243,7 +1243,7 @@ impl DefaultPhysicalPlanner {
12431243
}
12441244

12451245
// N Children
1246-
LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())),
1246+
LogicalPlan::Union(_) => UnionExec::try_new(children.vec())?,
12471247
LogicalPlan::Extension(Extension { node }) => {
12481248
let mut maybe_plan = None;
12491249
let children = children.vec();

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1783,6 +1783,7 @@ fn union_to_interleave() -> Result<()> {
17831783
);
17841784

17851785
// Union
1786+
#[allow(deprecated)]
17861787
let plan = Arc::new(UnionExec::new(vec![left, right]));
17871788

17881789
// final agg
@@ -1827,6 +1828,7 @@ fn union_not_to_interleave() -> Result<()> {
18271828
);
18281829

18291830
// Union
1831+
#[allow(deprecated)]
18301832
let plan = Arc::new(UnionExec::new(vec![left, right]));
18311833

18321834
// final agg

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ mod test {
356356
#[tokio::test]
357357
async fn test_statistic_by_partition_of_union() -> Result<()> {
358358
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
359+
#[allow(deprecated)]
359360
let union_exec: Arc<dyn ExecutionPlan> =
360361
Arc::new(UnionExec::new(vec![scan.clone(), scan]));
361362
let statistics = (0..union_exec.output_partitioning().partition_count())

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,7 @@ fn test_sort_preserving_after_projection() -> Result<()> {
15351535
#[test]
15361536
fn test_union_after_projection() -> Result<()> {
15371537
let csv = create_simple_csv_exec();
1538+
#[allow(deprecated)]
15381539
let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv]));
15391540
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
15401541
vec![

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ pub fn sort_preserving_merge_exec_with_fetch(
304304
}
305305

306306
pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
307+
#[allow(deprecated)]
307308
Arc::new(UnionExec::new(input))
308309
}
309310

datafusion/datasource-parquet/src/opener.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub(super) struct ParquetOpener {
9999
/// Coerce INT96 timestamps to specific TimeUnit
100100
pub coerce_int96: Option<TimeUnit>,
101101
/// Optional parquet FileDecryptionProperties
102+
#[cfg(feature = "parquet_encryption")]
102103
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
103104
/// Rewrite expressions in the context of the file schema
104105
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
@@ -155,10 +156,12 @@ impl FileOpener for ParquetOpener {
155156
let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);
156157

157158
let enable_page_index = self.enable_page_index;
159+
#[cfg(feature = "parquet_encryption")]
158160
let encryption_context = self.get_encryption_context();
159161
let max_predicate_cache_size = self.max_predicate_cache_size;
160162

161163
Ok(Box::pin(async move {
164+
#[cfg(feature = "parquet_encryption")]
162165
let file_decryption_properties = encryption_context
163166
.get_file_decryption_properties(&file_location)
164167
.await?;
@@ -544,6 +547,7 @@ where
544547
}
545548

546549
#[derive(Default)]
550+
#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))]
547551
struct EncryptionContext {
548552
#[cfg(feature = "parquet_encryption")]
549553
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -586,6 +590,7 @@ impl EncryptionContext {
586590
}
587591

588592
#[cfg(not(feature = "parquet_encryption"))]
593+
#[allow(dead_code)]
589594
impl EncryptionContext {
590595
async fn get_file_decryption_properties(
591596
&self,
@@ -605,6 +610,7 @@ impl ParquetOpener {
605610
}
606611

607612
#[cfg(not(feature = "parquet_encryption"))]
613+
#[allow(dead_code)]
608614
fn get_encryption_context(&self) -> EncryptionContext {
609615
EncryptionContext::default()
610616
}
@@ -861,6 +867,7 @@ mod test {
861867
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
862868
enable_row_group_stats_pruning: true,
863869
coerce_int96: None,
870+
#[cfg(feature = "parquet_encryption")]
864871
file_decryption_properties: None,
865872
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
866873
#[cfg(feature = "parquet_encryption")]
@@ -950,6 +957,7 @@ mod test {
950957
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
951958
enable_row_group_stats_pruning: true,
952959
coerce_int96: None,
960+
#[cfg(feature = "parquet_encryption")]
953961
file_decryption_properties: None,
954962
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
955963
#[cfg(feature = "parquet_encryption")]
@@ -1055,6 +1063,7 @@ mod test {
10551063
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
10561064
enable_row_group_stats_pruning: true,
10571065
coerce_int96: None,
1066+
#[cfg(feature = "parquet_encryption")]
10581067
file_decryption_properties: None,
10591068
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
10601069
#[cfg(feature = "parquet_encryption")]
@@ -1170,6 +1179,7 @@ mod test {
11701179
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
11711180
enable_row_group_stats_pruning: false, // note that this is false!
11721181
coerce_int96: None,
1182+
#[cfg(feature = "parquet_encryption")]
11731183
file_decryption_properties: None,
11741184
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
11751185
#[cfg(feature = "parquet_encryption")]
@@ -1286,6 +1296,7 @@ mod test {
12861296
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
12871297
enable_row_group_stats_pruning: true,
12881298
coerce_int96: None,
1299+
#[cfg(feature = "parquet_encryption")]
12891300
file_decryption_properties: None,
12901301
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
12911302
#[cfg(feature = "parquet_encryption")]

datafusion/datasource-parquet/src/source.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use datafusion_physical_plan::metrics::Count;
5252
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
5353
use datafusion_physical_plan::DisplayFormatType;
5454

55+
#[cfg(feature = "parquet_encryption")]
5556
use datafusion_common::encryption::map_config_decryption_to_decryption;
5657
#[cfg(feature = "parquet_encryption")]
5758
use datafusion_execution::parquet_encryption::EncryptionFactory;
@@ -547,6 +548,7 @@ impl FileSource for ParquetSource {
547548
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
548549
});
549550

551+
#[cfg(feature = "parquet_encryption")]
550552
let file_decryption_properties = self
551553
.table_parquet_options()
552554
.crypto
@@ -582,6 +584,7 @@ impl FileSource for ParquetSource {
582584
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
583585
schema_adapter_factory,
584586
coerce_int96,
587+
#[cfg(feature = "parquet_encryption")]
585588
file_decryption_properties,
586589
expr_adapter_factory,
587590
#[cfg(feature = "parquet_encryption")]

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1783,6 +1783,7 @@ mod test {
17831783
let source1 = sorted_memory_exec(&schema, sort_exprs.clone());
17841784
let source2 = sorted_memory_exec(&schema, sort_exprs);
17851785
// output has multiple partitions, and is sorted
1786+
#[allow(deprecated)]
17861787
let union = UnionExec::new(vec![source1, source2]);
17871788
let exec =
17881789
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
@@ -1825,6 +1826,7 @@ mod test {
18251826
let source1 = memory_exec(&schema);
18261827
let source2 = memory_exec(&schema);
18271828
// output has multiple partitions, but is not sorted
1829+
#[allow(deprecated)]
18281830
let union = UnionExec::new(vec![source1, source2]);
18291831
let exec =
18301832
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))

datafusion/physical-plan/src/union.rs

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,10 @@ pub struct UnionExec {
103103

104104
impl UnionExec {
105105
/// Create a new UnionExec
106+
#[deprecated(since = "44.0.0", note = "Use UnionExec::try_new instead")]
106107
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
107-
let schema = union_schema(&inputs);
108+
let schema =
109+
union_schema(&inputs).expect("UnionExec::new called with empty inputs");
108110
// The schema of the inputs and the union schema is consistent when:
109111
// - They have the same number of fields, and
110112
// - Their fields have same types at the same indices.
@@ -118,6 +120,37 @@ impl UnionExec {
118120
}
119121
}
120122

123+
/// Try to create a new UnionExec.
124+
///
125+
/// # Errors
126+
/// Returns an error if:
127+
/// - `inputs` is empty
128+
///
129+
/// # Optimization
130+
/// If there is only one input, returns that input directly rather than wrapping it in a UnionExec
131+
pub fn try_new(
132+
inputs: Vec<Arc<dyn ExecutionPlan>>,
133+
) -> Result<Arc<dyn ExecutionPlan>> {
134+
match inputs.len() {
135+
0 => exec_err!("UnionExec requires at least one input"),
136+
1 => Ok(inputs.into_iter().next().unwrap()),
137+
_ => {
138+
let schema = union_schema(&inputs)?;
139+
// The schema of the inputs and the union schema is consistent when:
140+
// - They have the same number of fields, and
141+
// - Their fields have same types at the same indices.
142+
// Here, we know that schemas are consistent and the call below can
143+
// not return an error.
144+
let cache = Self::compute_properties(&inputs, schema).unwrap();
145+
Ok(Arc::new(UnionExec {
146+
inputs,
147+
metrics: ExecutionPlanMetricsSet::new(),
148+
cache,
149+
}))
150+
}
151+
}
152+
}
153+
121154
/// Get inputs of the execution plan
122155
pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
123156
&self.inputs
@@ -226,7 +259,7 @@ impl ExecutionPlan for UnionExec {
226259
self: Arc<Self>,
227260
children: Vec<Arc<dyn ExecutionPlan>>,
228261
) -> Result<Arc<dyn ExecutionPlan>> {
229-
Ok(Arc::new(UnionExec::new(children)))
262+
UnionExec::try_new(children)
230263
}
231264

232265
fn execute(
@@ -321,6 +354,7 @@ impl ExecutionPlan for UnionExec {
321354
.map(|child| make_with_child(projection, child))
322355
.collect::<Result<Vec<_>>>()?;
323356

357+
#[allow(deprecated)]
324358
Ok(Some(Arc::new(UnionExec::new(new_children))))
325359
}
326360

@@ -384,7 +418,7 @@ impl InterleaveExec {
384418
"Not all InterleaveExec children have a consistent hash partitioning"
385419
);
386420
}
387-
let cache = Self::compute_properties(&inputs);
421+
let cache = Self::compute_properties(&inputs)?;
388422
Ok(InterleaveExec {
389423
inputs,
390424
metrics: ExecutionPlanMetricsSet::new(),
@@ -398,17 +432,17 @@ impl InterleaveExec {
398432
}
399433

400434
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
401-
fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> PlanProperties {
402-
let schema = union_schema(inputs);
435+
fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<PlanProperties> {
436+
let schema = union_schema(inputs)?;
403437
let eq_properties = EquivalenceProperties::new(schema);
404438
// Get output partitioning:
405439
let output_partitioning = inputs[0].output_partitioning().clone();
406-
PlanProperties::new(
440+
Ok(PlanProperties::new(
407441
eq_properties,
408442
output_partitioning,
409443
emission_type_from_children(inputs),
410444
boundedness_from_children(inputs),
411-
)
445+
))
412446
}
413447
}
414448

@@ -549,7 +583,11 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
549583
.all(|partition| partition == *reference)
550584
}
551585

552-
fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
586+
fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<SchemaRef> {
587+
if inputs.is_empty() {
588+
return exec_err!("Cannot create union schema from empty inputs");
589+
}
590+
553591
let first_schema = inputs[0].schema();
554592

555593
let fields = (0..first_schema.fields().len())
@@ -592,7 +630,10 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
592630
.flat_map(|i| i.schema().metadata().clone().into_iter())
593631
.collect();
594632

595-
Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))
633+
Ok(Arc::new(Schema::new_with_metadata(
634+
fields,
635+
all_metadata_merged,
636+
)))
596637
}
597638

598639
/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
@@ -721,6 +762,7 @@ mod tests {
721762
let csv = test::scan_partitioned(4);
722763
let csv2 = test::scan_partitioned(5);
723764

765+
#[allow(deprecated)]
724766
let union_exec = Arc::new(UnionExec::new(vec![csv, csv2]));
725767

726768
// Should have 9 partitions and 9 output batches
@@ -903,6 +945,7 @@ mod tests {
903945
let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema));
904946
union_expected_eq.add_orderings(union_expected_orderings);
905947

948+
#[allow(deprecated)]
906949
let union = UnionExec::new(vec![child1, child2]);
907950
let union_eq_properties = union.properties().equivalence_properties();
908951
let err_msg = format!(
@@ -927,4 +970,66 @@ mod tests {
927970
assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
928971
}
929972
}
973+
974+
#[test]
975+
fn test_union_empty_inputs() {
976+
// Test that UnionExec::try_new fails with empty inputs
977+
let result = UnionExec::try_new(vec![]);
978+
assert!(result
979+
.unwrap_err()
980+
.to_string()
981+
.contains("UnionExec requires at least one input"));
982+
}
983+
984+
#[test]
985+
fn test_union_schema_empty_inputs() {
986+
// Test that union_schema fails with empty inputs
987+
let result = union_schema(&[]);
988+
assert!(result
989+
.unwrap_err()
990+
.to_string()
991+
.contains("Cannot create union schema from empty inputs"));
992+
}
993+
994+
#[test]
995+
fn test_union_single_input() -> Result<()> {
996+
// Test that UnionExec::try_new returns the single input directly
997+
let schema = create_test_schema()?;
998+
let memory_exec: Arc<dyn ExecutionPlan> =
999+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1000+
let memory_exec_clone = Arc::clone(&memory_exec);
1001+
let result = UnionExec::try_new(vec![memory_exec])?;
1002+
1003+
// Check that the result is the same as the input (no UnionExec wrapper)
1004+
assert_eq!(result.schema(), schema);
1005+
// Verify it's the same execution plan
1006+
assert!(Arc::ptr_eq(&result, &memory_exec_clone));
1007+
1008+
Ok(())
1009+
}
1010+
1011+
#[test]
1012+
fn test_union_schema_multiple_inputs() -> Result<()> {
1013+
// Test that existing functionality with multiple inputs still works
1014+
let schema = create_test_schema()?;
1015+
let memory_exec1 =
1016+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1017+
let memory_exec2 =
1018+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1019+
1020+
let union_plan = UnionExec::try_new(vec![memory_exec1, memory_exec2])?;
1021+
1022+
// Downcast to verify it's a UnionExec
1023+
let union = union_plan
1024+
.as_any()
1025+
.downcast_ref::<UnionExec>()
1026+
.expect("Expected UnionExec");
1027+
1028+
// Check that schema is correct
1029+
assert_eq!(union.schema(), schema);
1030+
// Check that we have 2 inputs
1031+
assert_eq!(union.inputs().len(), 2);
1032+
1033+
Ok(())
1034+
}
9301035
}

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,7 @@ impl protobuf::PhysicalPlanNode {
14051405
for input in &union.inputs {
14061406
inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?);
14071407
}
1408+
#[allow(deprecated)]
14081409
Ok(Arc::new(UnionExec::new(inputs)))
14091410
}
14101411

0 commit comments

Comments (0)