Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 55 additions & 7 deletions datafusion/ffi/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,17 @@ impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
let properties: PlanProperties = (plan.properties)(plan).try_into()?;

let children_rvec = (plan.children)(plan);
let children: Result<Vec<_>> = children_rvec
let children = children_rvec
.iter()
.map(ForeignExecutionPlan::try_from)
.map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
.collect();
.collect::<Result<Vec<_>>>()?;

Ok(Self {
name,
plan: plan.clone(),
properties,
children: children?,
children,
})
}
}
Expand Down Expand Up @@ -281,6 +281,7 @@ impl ExecutionPlan for ForeignExecutionPlan {

#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::{
physical_plan::{
execution_plan::{Boundedness, EmissionType},
Expand All @@ -294,6 +295,7 @@ mod tests {
#[derive(Debug)]
pub struct EmptyExec {
props: PlanProperties,
children: Vec<Arc<dyn ExecutionPlan>>,
}

impl EmptyExec {
Expand All @@ -305,6 +307,7 @@ mod tests {
EmissionType::Incremental,
Boundedness::Bounded,
),
children: Vec::default(),
}
}
}
Expand Down Expand Up @@ -333,14 +336,17 @@ mod tests {
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
self.children.iter().collect()
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
Ok(Arc::new(EmptyExec {
props: self.props.clone(),
children,
}))
}

fn execute(
Expand All @@ -358,7 +364,6 @@ mod tests {

#[test]
fn test_round_trip_ffi_execution_plan() -> Result<()> {
use arrow::datatypes::{DataType, Field, Schema};
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
let ctx = SessionContext::new();
Expand All @@ -372,6 +377,49 @@ mod tests {

assert!(original_name == foreign_plan.name());

let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
&foreign_plan,
);

let buf = display.one_line().to_string();
assert_eq!(buf.trim(), "FFI_ExecutionPlan(number_of_children=0)");

Ok(())
}

#[test]
fn test_ffi_execution_plan_children() -> Result<()> {
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
let ctx = SessionContext::new();

// Version 1: Adding child to the foreign plan
let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

assert_eq!(parent_foreign.children().len(), 0);
assert_eq!(child_foreign.children().len(), 0);

let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
assert_eq!(parent_foreign.children().len(), 1);

// Version 2: Adding child to the local plan
let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

assert_eq!(parent_foreign.children().len(), 1);

Ok(())
}
}
23 changes: 23 additions & 0 deletions datafusion/ffi/src/insert_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,26 @@ impl From<InsertOp> for FFI_InsertOp {
}
}
}

#[cfg(test)]
mod tests {
use datafusion::logical_expr::dml::InsertOp;

use super::FFI_InsertOp;

fn test_round_trip_insert_op(insert_op: InsertOp) {
let ffi_insert_op: FFI_InsertOp = insert_op.into();
let round_trip: InsertOp = ffi_insert_op.into();

assert_eq!(insert_op, round_trip);
}

/// This test ensures we have not accidentally mapped the FFI
/// enums to the wrong internal enums values.
#[test]
fn test_all_round_trip_insert_ops() {
test_round_trip_insert_op(InsertOp::Append);
test_round_trip_insert_op(InsertOp::Overwrite);
test_round_trip_insert_op(InsertOp::Replace);
}
}
40 changes: 40 additions & 0 deletions datafusion/ffi/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,43 @@ impl From<TableType> for FFI_TableType {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use datafusion::error::Result;

fn round_trip_filter_pushdown(pushdown: TableProviderFilterPushDown) -> Result<()> {
let ffi_pushdown: FFI_TableProviderFilterPushDown = (&pushdown).into();
let round_trip: TableProviderFilterPushDown = (&ffi_pushdown).into();

assert_eq!(pushdown, round_trip);
Ok(())
}

#[test]
fn round_trip_all_filter_pushdowns() -> Result<()> {
round_trip_filter_pushdown(TableProviderFilterPushDown::Exact)?;
round_trip_filter_pushdown(TableProviderFilterPushDown::Inexact)?;
round_trip_filter_pushdown(TableProviderFilterPushDown::Unsupported)?;

Ok(())
}

fn round_trip_table_type(table_type: TableType) -> Result<()> {
let ffi_type: FFI_TableType = table_type.into();
let round_trip_type: TableType = ffi_type.into();

assert_eq!(table_type, round_trip_type);
Ok(())
}

#[test]
fn test_round_all_trip_table_type() -> Result<()> {
round_trip_table_type(TableType::Base)?;
round_trip_table_type(TableType::Temporary)?;
round_trip_table_type(TableType::View)?;

Ok(())
}
}
21 changes: 21 additions & 0 deletions datafusion/ffi/src/volatility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,24 @@ impl From<&FFI_Volatility> for Volatility {
}
}
}

#[cfg(test)]
mod tests {
use datafusion::logical_expr::Volatility;

use super::FFI_Volatility;

fn test_round_trip_volatility(volatility: Volatility) {
let ffi_volatility: FFI_Volatility = volatility.into();
let round_trip: Volatility = (&ffi_volatility).into();

assert_eq!(volatility, round_trip);
}

#[test]
fn test_all_round_trip_volatility() {
test_round_trip_volatility(Volatility::Immutable);
test_round_trip_volatility(Volatility::Stable);
test_round_trip_volatility(Volatility::Volatile);
}
}