diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 8087acfa33c8..00602474d621 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -219,17 +219,17 @@ impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan { let properties: PlanProperties = (plan.properties)(plan).try_into()?; let children_rvec = (plan.children)(plan); - let children: Result> = children_rvec + let children = children_rvec .iter() .map(ForeignExecutionPlan::try_from) .map(|child| child.map(|c| Arc::new(c) as Arc)) - .collect(); + .collect::>>()?; Ok(Self { name, plan: plan.clone(), properties, - children: children?, + children, }) } } @@ -281,6 +281,7 @@ impl ExecutionPlan for ForeignExecutionPlan { #[cfg(test)] mod tests { + use arrow::datatypes::{DataType, Field, Schema}; use datafusion::{ physical_plan::{ execution_plan::{Boundedness, EmissionType}, @@ -294,6 +295,7 @@ mod tests { #[derive(Debug)] pub struct EmptyExec { props: PlanProperties, + children: Vec>, } impl EmptyExec { @@ -305,6 +307,7 @@ mod tests { EmissionType::Incremental, Boundedness::Bounded, ), + children: Vec::default(), } } } @@ -333,14 +336,17 @@ mod tests { } fn children(&self) -> Vec<&Arc> { - vec![] + self.children.iter().collect() } fn with_new_children( self: Arc, - _: Vec>, + children: Vec>, ) -> Result> { - unimplemented!() + Ok(Arc::new(EmptyExec { + props: self.props.clone(), + children, + })) } fn execute( @@ -358,7 +364,6 @@ mod tests { #[test] fn test_round_trip_ffi_execution_plan() -> Result<()> { - use arrow::datatypes::{DataType, Field, Schema}; let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); let ctx = SessionContext::new(); @@ -372,6 +377,49 @@ mod tests { assert!(original_name == foreign_plan.name()); + let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new( + &foreign_plan, + ); + + let buf = display.one_line().to_string(); + assert_eq!(buf.trim(), "FFI_ExecutionPlan(number_of_children=0)"); + + Ok(()) + } + + #[test] + fn test_ffi_execution_plan_children() -> Result<()> { + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let ctx = SessionContext::new(); + + // Version 1: Adding child to the foreign plan + let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); + let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?); + + let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); + let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?); + + assert_eq!(parent_foreign.children().len(), 0); + assert_eq!(child_foreign.children().len(), 0); + + let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?; + assert_eq!(parent_foreign.children().len(), 1); + + // Version 2: Adding child to the local plan + let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); + let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?); + + let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let parent_plan = parent_plan.with_new_children(vec![child_foreign])?; + let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); + let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?); + + assert_eq!(parent_foreign.children().len(), 1); + Ok(()) } } diff --git a/datafusion/ffi/src/insert_op.rs b/datafusion/ffi/src/insert_op.rs index e44262377405..8e8693076cc0 100644 --- a/datafusion/ffi/src/insert_op.rs +++ b/datafusion/ffi/src/insert_op.rs @@ -47,3 +47,26 @@ impl From for FFI_InsertOp { } } } + +#[cfg(test)] +mod tests { + use datafusion::logical_expr::dml::InsertOp; + + use super::FFI_InsertOp; + + fn test_round_trip_insert_op(insert_op: InsertOp) { + let ffi_insert_op: FFI_InsertOp = insert_op.into(); + let round_trip: InsertOp = ffi_insert_op.into(); + + assert_eq!(insert_op, round_trip); + } + + /// This test ensures we have not accidentally mapped the FFI + /// enums to the wrong internal enums values. + #[test] + fn test_all_round_trip_insert_ops() { + test_round_trip_insert_op(InsertOp::Append); + test_round_trip_insert_op(InsertOp::Overwrite); + test_round_trip_insert_op(InsertOp::Replace); + } +} diff --git a/datafusion/ffi/src/table_source.rs b/datafusion/ffi/src/table_source.rs index a59836622ee6..418fdf16a564 100644 --- a/datafusion/ffi/src/table_source.rs +++ b/datafusion/ffi/src/table_source.rs @@ -85,3 +85,43 @@ impl From for FFI_TableType { } } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::error::Result; + + fn round_trip_filter_pushdown(pushdown: TableProviderFilterPushDown) -> Result<()> { + let ffi_pushdown: FFI_TableProviderFilterPushDown = (&pushdown).into(); + let round_trip: TableProviderFilterPushDown = (&ffi_pushdown).into(); + + assert_eq!(pushdown, round_trip); + Ok(()) + } + + #[test] + fn round_trip_all_filter_pushdowns() -> Result<()> { + round_trip_filter_pushdown(TableProviderFilterPushDown::Exact)?; + round_trip_filter_pushdown(TableProviderFilterPushDown::Inexact)?; + round_trip_filter_pushdown(TableProviderFilterPushDown::Unsupported)?; + + Ok(()) + } + + fn round_trip_table_type(table_type: TableType) -> Result<()> { + let ffi_type: FFI_TableType = table_type.into(); + let round_trip_type: TableType = ffi_type.into(); + + assert_eq!(table_type, round_trip_type); + Ok(()) + } + + #[test] + fn test_round_all_trip_table_type() -> Result<()> { + round_trip_table_type(TableType::Base)?; + round_trip_table_type(TableType::Temporary)?; + round_trip_table_type(TableType::View)?; + + Ok(()) + } +} diff --git a/datafusion/ffi/src/volatility.rs b/datafusion/ffi/src/volatility.rs index 8b565b91b76d..0aaf68a174cf 100644 --- a/datafusion/ffi/src/volatility.rs +++ b/datafusion/ffi/src/volatility.rs @@ -46,3 +46,24 @@ impl From<&FFI_Volatility> for Volatility { } } } + +#[cfg(test)] +mod tests { + use datafusion::logical_expr::Volatility; + + use super::FFI_Volatility; + + fn test_round_trip_volatility(volatility: Volatility) { + let ffi_volatility: FFI_Volatility = volatility.into(); + let round_trip: Volatility = (&ffi_volatility).into(); + + assert_eq!(volatility, round_trip); + } + + #[test] + fn test_all_round_trip_volatility() { + test_round_trip_volatility(Volatility::Immutable); + test_round_trip_volatility(Volatility::Stable); + test_round_trip_volatility(Volatility::Volatile); + } +}