Skip to content

Commit

Permalink
chore: add some more documentation to the new QueryBuilder interface
Browse files Browse the repository at this point in the history
Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
  • Loading branch information
rtyler committed Nov 29, 2024
1 parent 3335a03 commit 822606c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 9 deletions.
38 changes: 35 additions & 3 deletions python/deltalake/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
from deltalake.table import DeltaTable
from deltalake.warnings import ExperimentalWarning


class QueryBuilder:
"""
QueryBuilder is an experimental API which exposes Apache DataFusion SQL to Python users of the deltalake library.
This API is subject to change.
>>> qb = QueryBuilder()
"""

def __init__(self) -> None:
warnings.warn(
"QueryBuilder is experimental and subject to change",
Expand All @@ -19,13 +26,38 @@ def __init__(self) -> None:
self._query_builder = PyQueryBuilder()

def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder:
"""Add a table to the query builder."""
"""
Add a table to the query builder instance by name. The `table_name`
will be how the referenced `DeltaTable` can be referenced in SQL
queries.
For example:
>>> tmp = getfixture('tmp_path')
>>> import pyarrow as pa
>>> from deltalake import DeltaTable, QueryBuilder
>>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())]))
>>> qb = QueryBuilder().register('test', dt)
>>> assert qb is not None
"""
self._query_builder.register(
table_name=table_name,
delta_table=delta_table._table,
)
return self

def execute(self, sql: str) -> List[pyarrow.RecordBatch]:
"""Execute the query and return a list of record batches."""
"""
Execute the query and return a list of record batches
For example:
>>> tmp = getfixture('tmp_path')
>>> import pyarrow as pa
>>> from deltalake import DeltaTable, QueryBuilder
>>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())]))
>>> qb = QueryBuilder().register('test', dt)
>>> results = qb.execute('SELECT * FROM test')
>>> assert results is not None
"""
return self._query_builder.execute(sql)
2 changes: 1 addition & 1 deletion python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;

const DEFAULT_MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub(crate) struct FsConfig {
pub(crate) root_url: String,
pub(crate) options: HashMap<String, String>,
Expand Down
25 changes: 20 additions & 5 deletions python/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,31 @@ use pyo3::prelude::*;

use crate::{error::PythonError, utils::rt, RawDeltaTable};

/// PyQueryBuilder supports the _experimental_ `QueryBuilder` Pythoh interface which allows users
/// to take advantage of the [Apache DataFusion](https://datafusion.apache.org) engine already
/// present in the Python package.
#[pyclass(module = "deltalake._internal")]
#[derive(Default)]
pub(crate) struct PyQueryBuilder {
_ctx: SessionContext,
/// DataFusion [SessionContext] to hold mappings of registered tables
ctx: SessionContext,
}

#[pymethods]
impl PyQueryBuilder {
#[new]
pub fn new() -> Self {
let config = DeltaSessionConfig::default().into();
let _ctx = SessionContext::new_with_config(config);
let ctx = SessionContext::new_with_config(config);

PyQueryBuilder { _ctx }
PyQueryBuilder { ctx }
}

/// Register the given [RawDeltaTable] into the [SessionContext] using the provided
/// `table_name`
///
/// Once called, the provided `delta_table` will be referencable in SQL queries so long as
/// another table of the same name is not registered over it.
pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> {
let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?;
let log_store = delta_table._table.log_store();
Expand All @@ -37,17 +47,22 @@ impl PyQueryBuilder {
.map_err(PythonError::from)?,
);

self._ctx
self.ctx
.register_table(table_name, provider)
.map_err(PythonError::from)?;

Ok(())
}

/// Execute the given SQL command within the [SessionContext] of this instance
///
/// **NOTE:** Since this function returns a materialized Python list of `RecordBatch`
/// instances, it may result unexpected memory consumption for queries which return large data
/// sets.
pub fn execute(&self, py: Python, sql: &str) -> PyResult<PyObject> {
let batches = py.allow_threads(|| {
rt().block_on(async {
let df = self._ctx.sql(sql).await?;
let df = self.ctx.sql(sql).await?;
df.collect().await
})
.map_err(PythonError::from)
Expand Down

0 comments on commit 822606c

Please sign in to comment.