Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"rust/lance-linalg",
"rust/lance-namespace",
"rust/lance-namespace-impls",
"rust/lance-namespace-datafusion",
"rust/lance-table",
"rust/lance-test-macros",
"rust/lance-testing",
Expand Down
30 changes: 30 additions & 0 deletions rust/lance-namespace-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "lance-namespace-datafusion"
description = "Lance namespace integration with Apache DataFusion catalogs and schemas"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
keywords.workspace = true
categories.workspace = true
rust-version.workspace = true

[dependencies]
async-trait.workspace = true
dashmap = "6"
datafusion.workspace = true
lance.workspace = true
lance-namespace.workspace = true
tokio.workspace = true

[dev-dependencies]
arrow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
datafusion-sql.workspace = true
lance-namespace-impls.workspace = true
tempfile.workspace = true

[lints]
workspace = true
46 changes: 46 additions & 0 deletions rust/lance-namespace-datafusion/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Lance Namespace-DataFusion Integration

This crate provides a bridge between Lance Namespaces and Apache DataFusion, allowing Lance tables to be queried as if they were native DataFusion catalogs, schemas, and tables.

It exposes a `SessionBuilder` that constructs a DataFusion `SessionContext` with `CatalogProvider` and `SchemaProvider` implementations backed by a `lance_namespace::LanceNamespace` instance.

## Features

- **Dynamic Catalogs**: Maps top-level Lance namespaces to DataFusion catalogs.
- **Dynamic Schemas**: Maps child namespaces to DataFusion schemas.
- **Lazy Table Loading**: Tables are loaded on-demand from the namespace when queried.
- **Read-Only**: This integration focuses solely on providing read access (SQL `SELECT`) to Lance datasets. DML operations are not included.

## Usage

First, build a `LanceNamespace` (e.g., from a directory), then use the `SessionBuilder` to create a `SessionContext`.

```rust,ignore
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use lance_namespace_datafusion::SessionBuilder;
use lance_namespace::LanceNamespace;
use lance_namespace_impls::DirectoryNamespaceBuilder;

async fn run_query() {
// 1. Create a Lance Namespace
let temp_dir = tempfile::tempdir().unwrap();
let ns: Arc<dyn LanceNamespace> = Arc::new(
DirectoryNamespaceBuilder::new(temp_dir.path().to_string_lossy().to_string())
.build()
.await
.unwrap(),
);

// 2. Build a DataFusion SessionContext
let ctx = SessionBuilder::new()
.with_root(ns.into())
.build()
.await
.unwrap();

// 3. Run a SQL query
let df = ctx.sql("SELECT * FROM my_catalog.my_schema.my_table").await.unwrap();
df.show().await.unwrap();
}
```
145 changes: 145 additions & 0 deletions rust/lance-namespace-datafusion/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

use dashmap::DashMap;
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
use datafusion::error::Result;

use crate::namespace_level::NamespaceLevel;
use crate::schema::LanceSchemaProvider;
#[allow(unused_imports)]
use crate::SessionBuilder;

/// A dynamic [`CatalogProviderList`] that maps Lance namespaces to catalogs.
///
/// The underlying namespace must be a four-level namespace. It is explicitly configured
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

must be a four-level namespace

Why is this necessary? I wonder if we can work around that.

For example, it's a three level namespace, it could be:

DEFAULT > LVL1 > LVL2 > LVL3

And then if it's a two level namespace, it could be

DEFAULT > DEFAULT > LVL1 > LVL2

There might be some other standard name besides default that would make more sense (maybe what other DataFusion plugins do), but you get the idea.

Copy link
Contributor Author

@majin1102 majin1102 Jan 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the docs might be somewhat misleading(have updated it). Let me clarify:

  1. First, DataFusion only has three-level metadata. The CatalogProviderList is optional. As mentioned, we use SessionBuilder::with_root to configure a four-level namespace (let’s call this the root). In this case, all three-level child namespaces under the root are automatically registered as LanceCatalogProviders. This four-level namespace acts like a “catalog of catalogs” and is purely for convenience—instead of calling add_catalog() for each catalog individually.

  2. We can always use SessionBuilder::add_catalog to manually register a catalog provider, regardless of whether the four-level namespace is configured.

  3. I think what you’re really concerned about might be whether we can write queries against two- or three-level tables (e.g., SELECT * FROM db.tb or SELECT * FROM tb) when the four-level namespace is in use. The answer is yes. To do this, we need to configure the default catalog and schema names. If we don’t set them explicitly, they default to "datafusion" (for catalog) and "public" (for schema). That means db.tb would be interpreted as datafusion.db.tb, and tb as datafusion.public.tb.

  4. I agree this is an important point. I’ve added methods to configure the default catalog and schema, and included examples in the tests.

/// via [`SessionBuilder::with_root`], and each child namespace under this root is
/// automatically registered as a [`LanceCatalogProvider`].
///
/// This `CatalogProviderList` is optional when building a DataFusion `SessionContext`.
/// If not provided, you can still configure catalogs using
/// [`SessionBuilder::add_catalog`] or set a default catalog via
/// [`SessionBuilder::with_default_catalog`].
#[derive(Debug, Clone)]
pub struct LanceCatalogProviderList {
/// Root Lance namespace used to resolve catalogs / schemas / tables.
#[allow(dead_code)]
ns_level: NamespaceLevel,
/// Catalogs that have been loaded from the root namespace.
///
/// Note: The values in this map may become stale over time, as there is currently
/// no mechanism to automatically refresh or invalidate cached catalog providers.
catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}

impl LanceCatalogProviderList {
pub async fn try_new(namespace: NamespaceLevel) -> Result<Self> {
let catalogs = DashMap::new();
for child_namespace in namespace.children().await? {
let catalog_name = child_namespace.name().to_string();
let catalog_provider = Arc::new(LanceCatalogProvider::try_new(child_namespace).await?);
catalogs.insert(catalog_name, catalog_provider as Arc<dyn CatalogProvider>);
}

Ok(Self {
ns_level: namespace,
catalogs,
})
}
}

impl CatalogProviderList for LanceCatalogProviderList {
fn as_any(&self) -> &dyn Any {
self
}

/// Adds a new catalog to this catalog list.
/// If a catalog of the same name existed before, it is replaced in the list and returned.
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
self.catalogs.insert(name, catalog)
}

fn catalog_names(&self) -> Vec<String> {
self.catalogs
.iter()
.map(|entry| entry.key().clone())
.collect::<HashSet<_>>()
.into_iter()
.collect()
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
self.catalogs
.get(name)
.map(|entry| Arc::clone(entry.value()))
}
}

/// A dynamic [`CatalogProvider`] that exposes the immediate child namespaces
/// of a Lance namespace as database schemas.
///
/// The underlying namespace must be a three-level namespace. It is either explicitly
/// registered via [`SessionBuilder::add_catalog`], or automatically created as part of
/// the catalog hierarchy when [`SessionBuilder::with_root`] is used.
/// Child namespaces are automatically loaded as [`LanceSchemaProvider`] instances.
#[derive(Debug, Clone)]
pub struct LanceCatalogProvider {
#[allow(dead_code)]
ns_level: NamespaceLevel,
/// Note: The values in this map may become stale over time, as there is currently
/// no mechanism to automatically refresh or invalidate cached schema providers.
schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}

impl LanceCatalogProvider {
pub async fn try_new(namespace: NamespaceLevel) -> Result<Self> {
let schemas = DashMap::new();
for child_namespace in namespace.children().await? {
let schema_name = child_namespace.name().to_string();
let schema_provider = Arc::new(LanceSchemaProvider::try_new(child_namespace).await?);
schemas.insert(schema_name, schema_provider as Arc<dyn SchemaProvider>);
}

Ok(Self {
ns_level: namespace,
schemas,
})
}
}

impl CatalogProvider for LanceCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.schemas
.iter()
.map(|entry| entry.key().clone())
.collect::<HashSet<_>>()
.into_iter()
.collect()
}

fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.schemas
.get(schema_name)
.map(|entry| Arc::clone(entry.value()))
}

fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.schemas.insert(name.to_string(), schema))
}
}
10 changes: 10 additions & 0 deletions rust/lance-namespace-datafusion/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use datafusion::error::DataFusionError;
use lance::Error;

/// Converts a lance error into a datafusion error.
pub fn to_datafusion_error(error: Error) -> DataFusionError {
DataFusionError::External(error.into())
}
13 changes: 13 additions & 0 deletions rust/lance-namespace-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub mod catalog;
pub mod error;
pub mod namespace_level;
pub mod schema;
pub mod session_builder;

pub use catalog::{LanceCatalogProvider, LanceCatalogProviderList};
pub use namespace_level::NamespaceLevel;
pub use schema::LanceSchemaProvider;
pub use session_builder::SessionBuilder;
Loading
Loading