diff --git a/python/deltalake/query.py b/python/deltalake/query.py index 06e5144d24..f724161e3e 100644 --- a/python/deltalake/query.py +++ b/python/deltalake/query.py @@ -9,8 +9,15 @@ from deltalake.table import DeltaTable from deltalake.warnings import ExperimentalWarning - class QueryBuilder: + """ + QueryBuilder is an experimental API which exposes Apache DataFusion SQL to Python users of the deltalake library. + + This API is subject to change. + + >>> qb = QueryBuilder() + """ + def __init__(self) -> None: warnings.warn( "QueryBuilder is experimental and subject to change", @@ -19,7 +26,20 @@ def __init__(self) -> None: self._query_builder = PyQueryBuilder() def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: - """Add a table to the query builder.""" + """ + Add a table to the query builder instance by name. The `table_name` + is the name by which the provided `DeltaTable` can be referenced in SQL + queries. + + For example: + + >>> tmp = getfixture('tmp_path') + >>> import pyarrow as pa + >>> from deltalake import DeltaTable, QueryBuilder + >>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())])) + >>> qb = QueryBuilder().register('test', dt) + >>> assert qb is not None + """ self._query_builder.register( table_name=table_name, delta_table=delta_table._table, @@ -27,5 +47,17 @@ def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: return self def execute(self, sql: str) -> List[pyarrow.RecordBatch]: - """Execute the query and return a list of record batches.""" + """ + Execute the query and return a list of record batches + + For example: + + >>> tmp = getfixture('tmp_path') + >>> import pyarrow as pa + >>> from deltalake import DeltaTable, QueryBuilder + >>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())])) + >>> qb = QueryBuilder().register('test', dt) + >>> results = qb.execute('SELECT * FROM test') + >>> assert results is not None + """ 
return self._query_builder.execute(sql) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 116b1b0cf1..ee5261ab09 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -13,7 +13,7 @@ use std::sync::Arc; const DEFAULT_MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub(crate) struct FsConfig { pub(crate) root_url: String, pub(crate) options: HashMap, diff --git a/python/src/query.rs b/python/src/query.rs index af3da38eee..55889c567f 100644 --- a/python/src/query.rs +++ b/python/src/query.rs @@ -9,9 +9,14 @@ use pyo3::prelude::*; use crate::{error::PythonError, utils::rt, RawDeltaTable}; +/// PyQueryBuilder supports the _experimental_ `QueryBuilder` Python interface which allows users +/// to take advantage of the [Apache DataFusion](https://datafusion.apache.org) engine already +/// present in the Python package. #[pyclass(module = "deltalake._internal")] +#[derive(Default)] pub(crate) struct PyQueryBuilder { - _ctx: SessionContext, + /// DataFusion [SessionContext] to hold mappings of registered tables + ctx: SessionContext, } #[pymethods] @@ -19,11 +24,16 @@ impl PyQueryBuilder { #[new] pub fn new() -> Self { let config = DeltaSessionConfig::default().into(); - let _ctx = SessionContext::new_with_config(config); + let ctx = SessionContext::new_with_config(config); - PyQueryBuilder { _ctx } + PyQueryBuilder { ctx } } + /// Register the given [RawDeltaTable] into the [SessionContext] using the provided + /// `table_name` + /// + /// Once called, the provided `delta_table` will be referenceable in SQL queries so long as + /// another table of the same name is not registered over it. 
pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> { let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?; let log_store = delta_table._table.log_store(); @@ -37,17 +47,22 @@ impl PyQueryBuilder { .map_err(PythonError::from)?, ); - self._ctx + self.ctx .register_table(table_name, provider) .map_err(PythonError::from)?; Ok(()) } + /// Execute the given SQL command within the [SessionContext] of this instance + /// + /// **NOTE:** Since this function returns a materialized Python list of `RecordBatch` + /// instances, it may result in unexpected memory consumption for queries which return large data + /// sets. pub fn execute(&self, py: Python, sql: &str) -> PyResult { let batches = py.allow_threads(|| { rt().block_on(async { - let df = self.ctx.sql(sql).await?; + let df = self.ctx.sql(sql).await?; df.collect().await }) .map_err(PythonError::from)