Skip to content

Commit

Permalink
Add pycapsule interface to provide table provider to DataFusion
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Saucer <timsaucer@gmail.com>
  • Loading branch information
timsaucer committed Nov 21, 2024
1 parent bf94295 commit e08a27c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
3 changes: 3 additions & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ delta_kernel.workspace = true
# arrow
arrow-schema = { workspace = true, features = ["serde"] }

# datafusion
datafusion-ffi = { workspace = true }

# serde
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
36 changes: 36 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,42 @@ def repair(
def transaction_versions(self) -> Dict[str, Transaction]:
return self._table.transaction_versions()

def __datafusion_table_provider__(self) -> Any:
"""Return the DataFusion table provider PyCapsule interface.
To support DataFusion features such as push down filtering, this function will return a PyCapsule
interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
you should not need to call this function directly. Instead you can use ``register_table_provider`` in
the DataFusion SessionContext.
Returns:
A PyCapsule DataFusion TableProvider interface.
Example:
```python
from deltalake import DeltaTable, write_deltalake
from datafusion import SessionContext
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
ctx = SessionContext()
ctx.register_table_provider("test", table)
ctx.table("test").show()
```
Results in
```
DataFrame()
+----+----+----+
| c3 | c1 | c2 |
+----+----+----+
| 4 | 6 | a |
| 6 | 5 | b |
| 5 | 4 | c |
+----+----+----+
```
"""
return self._table.__datafusion_table_provider__()

class TableMerger:
"""API for various table `MERGE` commands."""
Expand Down
16 changes: 15 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ mod schema;
mod utils;

use std::collections::{HashMap, HashSet};
use std::ffi::CString;
use std::future::IntoFuture;
use std::str::FromStr;
use std::sync::Arc;
use std::time;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow::pyarrow::PyArrowType;
use chrono::{DateTime, Duration, FixedOffset, Utc};
use datafusion_ffi::table_provider::FFI_TableProvider;
use delta_kernel::expressions::Scalar;
use delta_kernel::schema::StructField;
use deltalake::arrow::compute::concat_batches;
Expand Down Expand Up @@ -58,7 +61,7 @@ use futures::future::join_all;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyFrozenSet};
use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
use serde_json::{Map, Value};

use crate::error::DeltaProtocolError;
Expand Down Expand Up @@ -1240,6 +1243,17 @@ impl RawDeltaTable {
.map(|(app_id, transaction)| (app_id, PyTransaction::from(transaction)))
.collect()
}

fn __datafusion_table_provider__<'py>(
&self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
let name = CString::new("datafusion_table_provider").unwrap();

let provider = FFI_TableProvider::new(Arc::new(self._table.clone()), false);

PyCapsule::new_bound(py, provider, Some(name.clone()))
}
}

fn set_post_commithook_properties(
Expand Down

0 comments on commit e08a27c

Please sign in to comment.