Skip to content

Commit 9d1bfc1

Browse files
authored
FFI support for versions and alternate tokio runtimes (#13937)
* Add optional reference to tokio runtime for table providers * Add function to return the library version over FFI * Resolve clippy warnings * Function does not need to be defined as unsafe * Add integration test for FFI table provider * Add version call on FFI integration test * Making use explicit on crate to try to get CI to ensure it builds first * Add license text * Fix unit test to find deps in ci profile * Remove ffitest crate and put test lib behind a feature flag * Add integration-tests feature to ci tests * Add integration-tests feature to CI run * Add clarifying text * Update CI to only run integration tests for certain checks * When the feature integration-tests is enabled, we get conflicting library entries for the example table provider and integration test, so disable the example during CI run * Remove typo * Specify each excluded crate separately * Doc tests did not need the exclusion * Integration tests shouldn't need doc test
1 parent b46e281 commit 9d1bfc1

File tree

12 files changed

+649
-21
lines changed

12 files changed

+649
-21
lines changed

.github/workflows/rust.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ jobs:
6060
with:
6161
rust-version: stable
6262
- name: Prepare cargo build
63-
run: cargo check --profile ci --all-targets
63+
run: cargo check --profile ci --all-targets --features integration-tests
6464

6565
# cargo check common, functions and substrait with no default features
6666
linux-cargo-check-no-default-features:
@@ -92,8 +92,8 @@ jobs:
9292
- name: Check workspace in debug mode
9393
run: cargo check --profile ci --all-targets --workspace
9494

95-
- name: Check workspace with avro,json features
96-
run: cargo check --profile ci --workspace --benches --features avro,json
95+
- name: Check workspace with additional features
96+
run: cargo check --profile ci --workspace --benches --features avro,json,integration-tests
9797

9898
- name: Check Cargo.lock for datafusion-cli
9999
run: |
@@ -185,7 +185,7 @@ jobs:
185185
with:
186186
rust-version: stable
187187
- name: Run tests (excluding doctests)
188-
run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace
188+
run: cargo test --profile ci --exclude datafusion-examples --exclude ffi_example_table_provider --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests
189189
- name: Verify Working Directory Clean
190190
run: git diff --exit-code
191191

@@ -417,7 +417,7 @@ jobs:
417417
- name: Run tests (excluding doctests)
418418
shell: bash
419419
run: |
420-
cargo test --profile ci --lib --tests --bins --features avro,json,backtrace
420+
cargo test --profile ci --lib --tests --bins --features avro,json,backtrace,integration-tests
421421
cd datafusion-cli
422422
cargo test --profile ci --lib --tests --bins --all-features
423423

ci/scripts/rust_clippy.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@
1818
# under the License.
1919

2020
set -ex
21-
cargo clippy --all-targets --workspace --features avro,pyarrow -- -D warnings
21+
cargo clippy --all-targets --workspace --features avro,pyarrow,integration-tests -- -D warnings
2222
cd datafusion-cli
2323
cargo clippy --all-targets --all-features -- -D warnings

datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {
5353

5454
let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();
5555

56-
FFI_TableProvider::new(Arc::new(table_provider), true)
56+
FFI_TableProvider::new(Arc::new(table_provider), true, None)
5757
}
5858

5959
#[export_root_module]

datafusion/ffi/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,25 @@ workspace = true
3333
[lib]
3434
name = "datafusion_ffi"
3535
path = "src/lib.rs"
36+
crate-type = ["cdylib", "rlib"]
3637

3738
[dependencies]
3839
abi_stable = "0.11.3"
3940
arrow = { workspace = true, features = ["ffi"] }
41+
arrow-array = { workspace = true }
42+
arrow-schema = { workspace = true }
4043
async-ffi = { version = "0.5.0", features = ["abi_stable"] }
4144
async-trait = { workspace = true }
4245
datafusion = { workspace = true, default-features = false }
4346
datafusion-proto = { workspace = true }
4447
futures = { workspace = true }
4548
log = { workspace = true }
4649
prost = { workspace = true }
50+
semver = "1.0.24"
51+
tokio = { workspace = true }
4752

4853
[dev-dependencies]
4954
doc-comment = { workspace = true }
50-
tokio = { workspace = true }
55+
56+
[features]
57+
integration-tests = []

datafusion/ffi/src/execution_plan.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion::{
2727
execution::{SendableRecordBatchStream, TaskContext},
2828
physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
2929
};
30+
use tokio::runtime::Runtime;
3031

3132
use crate::{
3233
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
@@ -71,6 +72,7 @@ unsafe impl Sync for FFI_ExecutionPlan {}
7172
pub struct ExecutionPlanPrivateData {
7273
pub plan: Arc<dyn ExecutionPlan>,
7374
pub context: Arc<TaskContext>,
75+
pub runtime: Option<Arc<Runtime>>,
7476
}
7577

7678
unsafe extern "C" fn properties_fn_wrapper(
@@ -88,11 +90,14 @@ unsafe extern "C" fn children_fn_wrapper(
8890
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
8991
let plan = &(*private_data).plan;
9092
let ctx = &(*private_data).context;
93+
let runtime = &(*private_data).runtime;
9194

9295
let children: Vec<_> = plan
9396
.children()
9497
.into_iter()
95-
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx)))
98+
.map(|child| {
99+
FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
100+
})
96101
.collect();
97102

98103
children.into()
@@ -105,9 +110,10 @@ unsafe extern "C" fn execute_fn_wrapper(
105110
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
106111
let plan = &(*private_data).plan;
107112
let ctx = &(*private_data).context;
113+
let runtime = (*private_data).runtime.as_ref().map(Arc::clone);
108114

109115
match plan.execute(partition, Arc::clone(ctx)) {
110-
Ok(rbs) => RResult::ROk(rbs.into()),
116+
Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
111117
Err(e) => RResult::RErr(
112118
format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
113119
),
@@ -129,7 +135,11 @@ unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_Execution
129135
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
130136
let plan_data = &(*private_data);
131137

132-
FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), Arc::clone(&plan_data.context))
138+
FFI_ExecutionPlan::new(
139+
Arc::clone(&plan_data.plan),
140+
Arc::clone(&plan_data.context),
141+
plan_data.runtime.clone(),
142+
)
133143
}
134144

135145
impl Clone for FFI_ExecutionPlan {
@@ -140,8 +150,16 @@ impl Clone for FFI_ExecutionPlan {
140150

141151
impl FFI_ExecutionPlan {
142152
/// This function is called on the provider's side.
143-
pub fn new(plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Self {
144-
let private_data = Box::new(ExecutionPlanPrivateData { plan, context });
153+
pub fn new(
154+
plan: Arc<dyn ExecutionPlan>,
155+
context: Arc<TaskContext>,
156+
runtime: Option<Arc<Runtime>>,
157+
) -> Self {
158+
let private_data = Box::new(ExecutionPlanPrivateData {
159+
plan,
160+
context,
161+
runtime,
162+
});
145163

146164
Self {
147165
properties: properties_fn_wrapper,
@@ -357,7 +375,7 @@ mod tests {
357375
let original_plan = Arc::new(EmptyExec::new(schema));
358376
let original_name = original_plan.name().to_string();
359377

360-
let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx());
378+
let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);
361379

362380
let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;
363381

datafusion/ffi/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,18 @@ pub mod session_config;
2626
pub mod table_provider;
2727
pub mod table_source;
2828

29+
#[cfg(feature = "integration-tests")]
30+
pub mod tests;
31+
32+
/// Returns the major version of the FFI implementation. If the API evolves,
33+
/// we use the major version to identify compatibility over the unsafe
34+
/// boundary. This call is intended to be used by implementers to validate
35+
/// they have compatible libraries.
36+
pub extern "C" fn version() -> u64 {
37+
let version_str = env!("CARGO_PKG_VERSION");
38+
let version = semver::Version::parse(version_str).expect("Invalid version string");
39+
version.major
40+
}
41+
2942
#[cfg(doctest)]
3043
doc_comment::doctest!("../README.md", readme_example_test);

datafusion/ffi/src/record_batch_stream.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{ffi::c_void, task::Poll};
18+
use std::{ffi::c_void, sync::Arc, task::Poll};
1919

2020
use abi_stable::{
2121
std_types::{ROption, RResult, RString},
@@ -33,6 +33,7 @@ use datafusion::{
3333
execution::{RecordBatchStream, SendableRecordBatchStream},
3434
};
3535
use futures::{Stream, TryStreamExt};
36+
use tokio::runtime::Runtime;
3637

3738
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
3839

@@ -58,20 +59,36 @@ pub struct FFI_RecordBatchStream {
5859
pub private_data: *mut c_void,
5960
}
6061

62+
pub struct RecordBatchStreamPrivateData {
63+
pub rbs: SendableRecordBatchStream,
64+
pub runtime: Option<Arc<Runtime>>,
65+
}
66+
6167
impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
6268
fn from(stream: SendableRecordBatchStream) -> Self {
69+
Self::new(stream, None)
70+
}
71+
}
72+
73+
impl FFI_RecordBatchStream {
74+
pub fn new(stream: SendableRecordBatchStream, runtime: Option<Arc<Runtime>>) -> Self {
75+
let private_data = Box::into_raw(Box::new(RecordBatchStreamPrivateData {
76+
rbs: stream,
77+
runtime,
78+
})) as *mut c_void;
6379
FFI_RecordBatchStream {
6480
poll_next: poll_next_fn_wrapper,
6581
schema: schema_fn_wrapper,
66-
private_data: Box::into_raw(Box::new(stream)) as *mut c_void,
82+
private_data,
6783
}
6884
}
6985
}
7086

7187
unsafe impl Send for FFI_RecordBatchStream {}
7288

7389
unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> WrappedSchema {
74-
let stream = stream.private_data as *const SendableRecordBatchStream;
90+
let private_data = stream.private_data as *const RecordBatchStreamPrivateData;
91+
let stream = &(*private_data).rbs;
7592

7693
(*stream).schema().into()
7794
}
@@ -106,7 +123,10 @@ unsafe extern "C" fn poll_next_fn_wrapper(
106123
stream: &FFI_RecordBatchStream,
107124
cx: &mut FfiContext,
108125
) -> FfiPoll<ROption<RResult<WrappedArray, RString>>> {
109-
let stream = stream.private_data as *mut SendableRecordBatchStream;
126+
let private_data = stream.private_data as *mut RecordBatchStreamPrivateData;
127+
let stream = &mut (*private_data).rbs;
128+
129+
let _guard = (*private_data).runtime.as_ref().map(|rt| rt.enter());
110130

111131
let poll_result = cx.with_context(|std_cx| {
112132
(*stream)

datafusion/ffi/src/table_provider.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use datafusion_proto::{
4040
protobuf::LogicalExprList,
4141
};
4242
use prost::Message;
43+
use tokio::runtime::Runtime;
4344

4445
use crate::{
4546
arrow_wrappers::WrappedSchema,
@@ -139,6 +140,9 @@ pub struct FFI_TableProvider {
139140
/// Release the memory of the private data when it is no longer being used.
140141
pub release: unsafe extern "C" fn(arg: &mut Self),
141142

143+
/// Return the major DataFusion version number of this provider.
144+
pub version: unsafe extern "C" fn() -> u64,
145+
142146
/// Internal data. This is only to be accessed by the provider of the table.
143147
/// A [`ForeignTableProvider`] should never attempt to access this data.
144148
pub private_data: *mut c_void,
@@ -149,6 +153,7 @@ unsafe impl Sync for FFI_TableProvider {}
149153

150154
struct ProviderPrivateData {
151155
provider: Arc<dyn TableProvider + Send>,
156+
runtime: Option<Arc<Runtime>>,
152157
}
153158

154159
unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
@@ -216,6 +221,7 @@ unsafe extern "C" fn scan_fn_wrapper(
216221
let private_data = provider.private_data as *mut ProviderPrivateData;
217222
let internal_provider = &(*private_data).provider;
218223
let session_config = session_config.clone();
224+
let runtime = &(*private_data).runtime;
219225

220226
async move {
221227
let config = match ForeignSessionConfig::try_from(&session_config) {
@@ -261,7 +267,11 @@ unsafe extern "C" fn scan_fn_wrapper(
261267
Err(e) => return RResult::RErr(e.to_string().into()),
262268
};
263269

264-
RResult::ROk(FFI_ExecutionPlan::new(plan, ctx.task_ctx()))
270+
RResult::ROk(FFI_ExecutionPlan::new(
271+
plan,
272+
ctx.task_ctx(),
273+
runtime.clone(),
274+
))
265275
}
266276
.into_ffi()
267277
}
@@ -273,9 +283,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
273283

274284
unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
275285
let old_private_data = provider.private_data as *const ProviderPrivateData;
286+
let runtime = (*old_private_data).runtime.clone();
276287

277288
let private_data = Box::into_raw(Box::new(ProviderPrivateData {
278289
provider: Arc::clone(&(*old_private_data).provider),
290+
runtime,
279291
})) as *mut c_void;
280292

281293
FFI_TableProvider {
@@ -285,6 +297,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table
285297
supports_filters_pushdown: provider.supports_filters_pushdown,
286298
clone: clone_fn_wrapper,
287299
release: release_fn_wrapper,
300+
version: super::version,
288301
private_data,
289302
}
290303
}
@@ -300,8 +313,9 @@ impl FFI_TableProvider {
300313
pub fn new(
301314
provider: Arc<dyn TableProvider + Send>,
302315
can_support_pushdown_filters: bool,
316+
runtime: Option<Arc<Runtime>>,
303317
) -> Self {
304-
let private_data = Box::new(ProviderPrivateData { provider });
318+
let private_data = Box::new(ProviderPrivateData { provider, runtime });
305319

306320
Self {
307321
schema: schema_fn_wrapper,
@@ -313,6 +327,7 @@ impl FFI_TableProvider {
313327
},
314328
clone: clone_fn_wrapper,
315329
release: release_fn_wrapper,
330+
version: super::version,
316331
private_data: Box::into_raw(private_data) as *mut c_void,
317332
}
318333
}
@@ -463,7 +478,7 @@ mod tests {
463478
let provider =
464479
Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?);
465480

466-
let ffi_provider = FFI_TableProvider::new(provider, true);
481+
let ffi_provider = FFI_TableProvider::new(provider, true, None);
467482

468483
let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into();
469484

0 commit comments

Comments
 (0)