Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
self.inner.register_store(url, store)
}

fn deregister_store(
&self,
url: &Url,
) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
self.inner.deregister_store(url)
}

fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
self.inner.get_store(url)
}
Expand Down
7 changes: 7 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,13 @@ impl SessionContext {
self.runtime_env().register_object_store(url, object_store)
}

/// Deregisters an [`ObjectStore`] associated with the specific URL prefix.
///
/// See [`RuntimeEnv::deregister_object_store`] for more details.
pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
self.runtime_env().deregister_object_store(url)
}

/// Registers the [`RecordBatch`] as the specified table name
pub fn register_batch(
&self,
Expand Down
22 changes: 21 additions & 1 deletion datafusion/execution/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
//! and query data inside these systems.

use dashmap::DashMap;
use datafusion_common::{exec_err, internal_datafusion_err, DataFusionError, Result};
use datafusion_common::{
exec_err, internal_datafusion_err, not_impl_err, DataFusionError, Result,
};
#[cfg(not(target_arch = "wasm32"))]
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
Expand Down Expand Up @@ -154,6 +156,13 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>>;

/// Deregister the store previously registered with the same key. Returns the
/// deregistered store if it existed.
#[allow(unused_variables)]
fn deregister_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
not_impl_err!("ObjectStoreRegistry::deregister_store is not implemented for this ObjectStoreRegistry")
}

/// Get a suitable store for the provided URL. For example:
///
/// - URL with scheme `file:///` or no scheme will return the default LocalFS store
Expand Down Expand Up @@ -230,6 +239,17 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
self.object_stores.insert(s, store)
}

fn deregister_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
let s = get_url_key(url);
let (_, object_store) = self.object_stores
.remove(&s)
.ok_or_else(|| {
internal_datafusion_err!("Failed to deregister object store. No suitable object store found for {url}. See `RuntimeEnv::register_object_store`")
})?;

Ok(object_store)
}

fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
let s = get_url_key(url);
self.object_stores
Expand Down
8 changes: 6 additions & 2 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ impl RuntimeEnv {
/// ```
///
/// # Example: Register remote URL object store like [Github](https://github.com)
///
///
/// ```
/// # use std::sync::Arc;
/// # use url::Url;
Expand All @@ -141,6 +139,12 @@ impl RuntimeEnv {
self.object_store_registry.register_store(url, object_store)
}

/// Deregisters a custom `ObjectStore` previously registered for a specific url.
/// See [`ObjectStoreRegistry::deregister_store`] for more details.
pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
self.object_store_registry.deregister_store(url)
}

/// Retrieves a `ObjectStore` instance for a url by consulting the
/// registry. See [`ObjectStoreRegistry::get_store`] for more
/// details.
Expand Down