Skip to content

Commit

Permalink
Add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
timsaucer committed Oct 26, 2024
1 parent e595ba1 commit 486bf06
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 41 deletions.
73 changes: 40 additions & 33 deletions datafusion/ffi/src/plan_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,55 +43,32 @@ use prost::Message;

use crate::arrow_wrappers::WrappedSchema;

/// FFI safe version of [`ExecutionMode`].
///
/// Each variant is the counterpart of the [`ExecutionMode`] variant with the
/// same name; the `From` implementations in this file convert in both
/// directions.
// TODO: should we just make ExecutionMode repr(C)?
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Clone, StableAbi)]
pub enum FFI_ExecutionMode {
/// Counterpart of [`ExecutionMode::Bounded`].
Bounded,
/// Counterpart of [`ExecutionMode::Unbounded`].
Unbounded,
/// Counterpart of [`ExecutionMode::PipelineBreaking`].
PipelineBreaking,
}

/// Converts a native [`ExecutionMode`] into its FFI safe counterpart,
/// mapping each variant to the variant of the same name.
impl From<ExecutionMode> for FFI_ExecutionMode {
    fn from(mode: ExecutionMode) -> Self {
        match mode {
            ExecutionMode::Bounded => Self::Bounded,
            ExecutionMode::Unbounded => Self::Unbounded,
            ExecutionMode::PipelineBreaking => Self::PipelineBreaking,
        }
    }
}

/// Converts an [`FFI_ExecutionMode`] received across the FFI boundary back
/// into the native [`ExecutionMode`], variant for variant.
impl From<FFI_ExecutionMode> for ExecutionMode {
    fn from(mode: FFI_ExecutionMode) -> Self {
        match mode {
            FFI_ExecutionMode::Bounded => Self::Bounded,
            FFI_ExecutionMode::Unbounded => Self::Unbounded,
            FFI_ExecutionMode::PipelineBreaking => Self::PipelineBreaking,
        }
    }
}

/// A stable struct for sharing [`PlanProperties`] across FFI boundaries.
///
/// Every member except `private_data` is an `unsafe extern "C"` function
/// pointer that takes a reference to this struct as its argument.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(missing_docs)]
#[allow(non_camel_case_types)]
pub struct FFI_PlanProperties {
/// The output partitioning is a [`Partitioning`] protobuf message serialized
/// into bytes to pass across the FFI boundary. The error side of the result
/// is a static string.
pub output_partitioning:
unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,

/// Return the execution mode of the plan as an [`FFI_ExecutionMode`].
pub execution_mode: unsafe extern "C" fn(plan: &Self) -> FFI_ExecutionMode,

/// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message
/// serialized into bytes to pass across the FFI boundary. The error side of
/// the result is a static string.
pub output_ordering:
unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,

/// Return the schema of the plan, wrapped as a [`WrappedSchema`].
pub schema: unsafe extern "C" fn(plan: &Self) -> WrappedSchema,

/// Release the memory of the private data when it is no longer being used.
pub release: unsafe extern "C" fn(arg: &mut Self),

/// Internal data. This is only to be accessed by the provider of the plan.
/// The foreign library should never attempt to access this data.
pub private_data: *mut c_void,
}

Expand Down Expand Up @@ -261,6 +238,36 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
}
}

/// FFI safe version of [`ExecutionMode`].
///
/// Each variant is the counterpart of the [`ExecutionMode`] variant with the
/// same name; the `From` implementations in this file convert in both
/// directions.
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Clone, StableAbi)]
pub enum FFI_ExecutionMode {
/// Counterpart of [`ExecutionMode::Bounded`].
Bounded,
/// Counterpart of [`ExecutionMode::Unbounded`].
Unbounded,
/// Counterpart of [`ExecutionMode::PipelineBreaking`].
PipelineBreaking,
}

/// Converts a native [`ExecutionMode`] into its FFI safe counterpart,
/// mapping each variant to the variant of the same name.
impl From<ExecutionMode> for FFI_ExecutionMode {
    fn from(mode: ExecutionMode) -> Self {
        match mode {
            ExecutionMode::Bounded => Self::Bounded,
            ExecutionMode::Unbounded => Self::Unbounded,
            ExecutionMode::PipelineBreaking => Self::PipelineBreaking,
        }
    }
}

/// Converts an [`FFI_ExecutionMode`] received across the FFI boundary back
/// into the native [`ExecutionMode`], variant for variant.
impl From<FFI_ExecutionMode> for ExecutionMode {
    fn from(mode: FFI_ExecutionMode) -> Self {
        match mode {
            FFI_ExecutionMode::Bounded => Self::Bounded,
            FFI_ExecutionMode::Unbounded => Self::Unbounded,
            FFI_ExecutionMode::PipelineBreaking => Self::PipelineBreaking,
        }
    }
}

#[cfg(test)]
mod tests {
use datafusion::physical_plan::Partitioning;
Expand Down
8 changes: 7 additions & 1 deletion datafusion/ffi/src/record_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,25 @@ use futures::{Stream, TryStreamExt};

use crate::arrow_wrappers::{WrappedArray, WrappedSchema};

/// A stable struct for sharing [`RecordBatchStream`] across FFI boundaries.
/// We use the async-ffi crate for handling async calls across libraries.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(missing_docs)]
#[allow(non_camel_case_types)]
pub struct FFI_RecordBatchStream {
/// This mirrors the `poll_next` of [`RecordBatchStream`] but does so
/// in a FFI safe manner. The polled value is an optional result that
/// carries either a [`WrappedArray`] or an error string.
pub poll_next:
unsafe extern "C" fn(
stream: &Self,
cx: &mut FfiContext,
) -> FfiPoll<ROption<RResult<WrappedArray, RString>>>,

/// Return the schema of the record batch, wrapped as a [`WrappedSchema`].
pub schema: unsafe extern "C" fn(stream: &Self) -> WrappedSchema,

/// Internal data. This is only to be accessed by the provider of the stream.
/// The foreign library should never attempt to access this data.
pub private_data: *mut c_void,
}

Expand Down
26 changes: 23 additions & 3 deletions datafusion/ffi/src/session_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,35 @@ use abi_stable::{
use datafusion::{config::ConfigOptions, error::Result};
use datafusion::{error::DataFusionError, prelude::SessionConfig};

/// A stable struct for sharing [`SessionConfig`] across FFI boundaries.
/// Instead of attempting to expose the entire SessionConfig interface, we
/// convert the config options into a map from string to string and pass
/// those values across the FFI boundary. On the receiver side, we
/// reconstruct a SessionConfig from those values.
///
/// It is possible that different versions of DataFusion on either side of
/// the FFI boundary have differing expectations of the config options.
/// This is a limitation of this approach, but exposing the entire
/// SessionConfig via a FFI interface would be extensive and provide limited
/// value over this version.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(missing_docs)]
#[allow(non_camel_case_types)]
pub struct FFI_SessionConfig {
/// Return a hash map from key to value of the config options represented
/// by string values.
pub config_options: unsafe extern "C" fn(config: &Self) -> RHashMap<RString, RString>,

/// Used to create a clone on the provider of the session config. This should
/// only need to be called by the receiver of the config.
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

/// Release the memory of the private data when it is no longer being used.
pub release: unsafe extern "C" fn(arg: &mut Self),

/// Internal data. This is only to be accessed by the provider of the config.
/// A [`ForeignSessionConfig`] should never attempt to access this data.
pub private_data: *mut c_void,
// NOTE(review): the two fields below repeat `clone` and `release`, which are
// already declared above. This looks like the pre-change side of the diff
// showing through; confirm only one set remains in the real file.
pub clone: unsafe extern "C" fn(&Self) -> Self,
pub release: unsafe extern "C" fn(config: &mut Self),
}

unsafe impl Send for FFI_SessionConfig {}
Expand Down Expand Up @@ -125,6 +144,7 @@ impl Drop for FFI_SessionConfig {
}
}

/// A wrapper struct for accessing [`SessionConfig`] across a FFI boundary.
///
/// It is a newtype around a [`SessionConfig`] and is created from a
/// `&FFI_SessionConfig` through its `TryFrom` implementation.
pub struct ForeignSessionConfig(pub SessionConfig);

impl TryFrom<&FFI_SessionConfig> for ForeignSessionConfig {
Expand Down
28 changes: 26 additions & 2 deletions datafusion/ffi/src/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,25 @@ use super::{
};
use datafusion::error::Result;

/// A stable interface for creating a DataFusion TableProvider.
/// A stable struct for sharing [`TableProvider`] across FFI boundaries.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_TableProvider {
/// Return the table schema
pub schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema,

/// Perform a scan on the table. See [`TableProvider`] for detailed usage information.
///
/// # Arguments
///
/// * `provider` - the table provider
/// * `session_config` - session configuration
/// * `projections` - if specified, only a subset of the columns are returned
/// * `filters_serialized` - filters to apply to the scan, which are a
/// [`LogicalExprList`] protobuf message serialized into bytes to pass
/// across the FFI boundary.
/// * `limit` - if specified, limit the number of rows returned
pub scan: unsafe extern "C" fn(
provider: &Self,
session_config: &FFI_SessionConfig,
Expand All @@ -67,8 +80,12 @@ pub struct FFI_TableProvider {
limit: ROption<usize>,
) -> FfiFuture<RResult<FFI_ExecutionPlan, RString>>,

/// Return the type of table. See [`TableType`] for options.
pub table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType,

/// Based upon the input filters, identify which are supported. The filters
/// are a [`LogicalExprList`] protobuf message serialized into bytes to pass
/// across the FFI boundary.
pub supports_filters_pushdown: Option<
unsafe extern "C" fn(
provider: &FFI_TableProvider,
Expand All @@ -77,8 +94,15 @@ pub struct FFI_TableProvider {
-> RResult<RVec<FFI_TableProviderFilterPushDown>, RString>,
>,

pub clone: unsafe extern "C" fn(provider: &Self) -> Self,
/// Used to create a clone on the provider of the table. This should
/// only need to be called by the receiver of the table provider.
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

/// Release the memory of the private data when it is no longer being used.
pub release: unsafe extern "C" fn(arg: &mut Self),

/// Internal data. This is only to be accessed by the provider of the table.
/// The foreign library should never attempt to access this data.
pub private_data: *mut c_void,
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/ffi/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use abi_stable::StableAbi;
use datafusion::{datasource::TableType, logical_expr::TableProviderFilterPushDown};

// TODO Should we just define TableProviderFilterPushDown as repr(C)?
/// FFI safe version of [`TableProviderFilterPushDown`].
#[repr(C)]
#[derive(StableAbi)]
#[allow(non_camel_case_types)]
Expand Down Expand Up @@ -56,7 +56,7 @@ impl From<&TableProviderFilterPushDown> for FFI_TableProviderFilterPushDown {
}
}

// TODO Should we just define FFI_TableType as repr(C)?
/// FFI safe version of [`TableType`].
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, StableAbi)]
Expand Down

0 comments on commit 486bf06

Please sign in to comment.