diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 29d7a3082f..32c11bf203 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -376,6 +376,15 @@ impl RestCatalog { } } + /// Set a custom storage credentials loader. + /// + /// This is intended to be called after catalog construction, so the loader + /// can hold a reference to the catalog (e.g., `Arc`) and call + /// catalog-specific methods like `load_table_with_credentials`. + pub fn set_storage_credentials_loader(&mut self, loader: Arc) { + self.user_config.storage_credentials_loader = Some(loader); + } + /// Add an extension to the file IO builder. pub fn with_file_io_extension(mut self, ext: T) -> Self { self.file_io_extensions.add(ext); @@ -430,6 +439,7 @@ impl RestCatalog { metadata_location: Option<&str>, extra_config: Option>, storage_credential: Option, + table_ident: Option<&TableIdent>, ) -> Result { let mut props = self.context().await?.config.props.clone(); if let Some(config) = extra_config { @@ -458,6 +468,9 @@ impl RestCatalog { file_io_builder = file_io_builder.with_extension(MetadataLocation(loc.to_string())); } + if let Some(ident) = table_ident { + file_io_builder = file_io_builder.with_extension(ident.clone()); + } file_io_builder = file_io_builder.with_extension(loader.clone()); } @@ -570,7 +583,10 @@ impl RestCatalog { &self.user_config.storage_credentials_loader { let credential = storage_credentials_loader - .load_credentials(response.metadata_location.as_deref().unwrap_or("")) + .load_credentials( + table_ident, + response.metadata_location.as_deref().unwrap_or(""), + ) .await?; config.extend(credential.config.clone()); Some(credential) @@ -583,6 +599,7 @@ impl RestCatalog { response.metadata_location.as_deref(), Some(config), final_credential, + Some(table_ident), ) .await?; @@ -891,7 +908,7 @@ impl Catalog for RestCatalog { // TODO: Support vended credentials here. let file_io = self - .load_file_io(Some(metadata_location), Some(config), None) + .load_file_io(Some(metadata_location), Some(config), None, None) .await?; let table_builder = Table::builder() @@ -1033,7 +1050,7 @@ impl Catalog for RestCatalog { // TODO: Support vended credentials here. let file_io = self - .load_file_io(Some(metadata_location), None, None) + .load_file_io(Some(metadata_location), None, None, None) .await?; Table::builder() @@ -1100,7 +1117,7 @@ impl Catalog for RestCatalog { // TODO: Support vended credentials here. let file_io = self - .load_file_io(Some(&response.metadata_location), None, None) + .load_file_io(Some(&response.metadata_location), None, None, None) .await?; Table::builder() @@ -3034,7 +3051,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for DummyCredentialLoader { - async fn load_credentials(&self, _location: &str) -> Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> Result { self.was_called.store(true, Ordering::SeqCst); let mut config = HashMap::new(); config.insert("custom.key".to_string(), "custom.value".to_string()); diff --git a/crates/iceberg/src/io/opendal/mod.rs b/crates/iceberg/src/io/opendal/mod.rs index 7a7b5d7b42..f1a1dce7d2 100644 --- a/crates/iceberg/src/io/opendal/mod.rs +++ b/crates/iceberg/src/io/opendal/mod.rs @@ -45,6 +45,7 @@ use super::{ FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, MetadataLocation, OutputFile, Storage, StorageConfig, StorageCredential, StorageCredentialsLoader, StorageFactory, }; +use crate::catalog::TableIdent; use crate::{Error, ErrorKind, Result}; #[cfg(feature = "storage-azdls")] @@ -233,12 +234,22 @@ impl OpenDalStorage { .get::() .map(|l| l.0.clone()) .unwrap_or_default(); + let table_ident = extensions + .get::() + .map(|arc| (*arc).clone()) + .unwrap_or_else(|| { + TableIdent::new( + crate::NamespaceIdent::new("unknown".to_string()), + "unknown".to_string(), + ) + }); let backend = RefreshableOpenDalStorageBuilder::new() .scheme(scheme_str) .base_props(props) .credentials_loader(Arc::clone(&loader)) .initial_credentials(initial_creds) .location(location) + .table_ident(table_ident) .extensions(extensions) .build()?; return Ok(Self::Refreshable { @@ -534,7 +545,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for TestCredentialLoader { - async fn load_credentials(&self, _location: &str) -> crate::Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> crate::Result { Ok(StorageCredential { prefix: "s3://test/".to_string(), config: HashMap::new(), diff --git a/crates/iceberg/src/io/refreshable_accessor.rs b/crates/iceberg/src/io/refreshable_accessor.rs index 25372dd7e2..8863ea54b7 100644 --- a/crates/iceberg/src/io/refreshable_accessor.rs +++ b/crates/iceberg/src/io/refreshable_accessor.rs @@ -227,6 +227,8 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; + use crate::NamespaceIdent; + use crate::catalog::TableIdent; use crate::io::refreshable_storage::RefreshableOpenDalStorageBuilder; use crate::io::{StorageCredential, StorageCredentialsLoader}; @@ -259,7 +261,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for SequenceLoader { - async fn load_credentials(&self, _location: &str) -> crate::Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> crate::Result { self.call_count.fetch_add(1, Ordering::SeqCst); let mut responses = self.responses.lock().unwrap(); Ok(responses.pop_front().unwrap_or_else(dummy_credential)) @@ -382,6 +388,10 @@ mod tests { .scheme("memory".to_string()) .base_props(HashMap::new()) .credentials_loader(Arc::clone(&loader)) + .table_ident(TableIdent::new( + NamespaceIdent::new("test_ns".to_string()), + "test_table".to_string(), + )) .build() .expect("Failed to build storage"); @@ -477,6 +487,10 @@ mod tests { .scheme("memory".to_string()) .base_props(HashMap::new()) .credentials_loader(Arc::clone(&loader) as _) + .table_ident(TableIdent::new( + NamespaceIdent::new("test_ns".to_string()), + "test_table".to_string(), + )) .build() .expect("Failed to build storage"); diff --git a/crates/iceberg/src/io/refreshable_storage.rs b/crates/iceberg/src/io/refreshable_storage.rs index 7fe1c0b424..3a79f02b94 100644 --- a/crates/iceberg/src/io/refreshable_storage.rs +++ b/crates/iceberg/src/io/refreshable_storage.rs @@ -25,6 +25,7 @@ use tokio::sync::Mutex as AsyncMutex; use super::opendal::OpenDalStorage; use super::refreshable_accessor::RefreshableAccessor; +use crate::catalog::TableIdent; use crate::io::file_io::Extensions; use crate::io::{StorageCredential, StorageCredentialsLoader}; use crate::{Error, ErrorKind, Result}; @@ -52,6 +53,9 @@ pub struct RefreshableOpenDalStorage { /// Metadata location passed to `load_credentials` location: String, + /// Table identifier passed to `load_credentials` + table_ident: TableIdent, + /// Cached AccessorInfo (created lazily from first operator) cached_info: Mutex>>, @@ -88,6 +92,7 @@ impl RefreshableOpenDalStorage { credentials_loader: Arc, initial_credentials: Option, location: String, + table_ident: TableIdent, extensions: Extensions, ) -> Result { // Build initial inner_storage from base_props + initial_credentials @@ -104,6 +109,7 @@ impl RefreshableOpenDalStorage { credentials_loader, extensions, location, + table_ident, cached_info: Mutex::new(None), credential_version: AtomicU64::new(0), refresh_lock: AsyncMutex::new(()), @@ -197,7 +203,7 @@ impl RefreshableOpenDalStorage { // We are the one who should call the loader let new_creds = self .credentials_loader - .load_credentials(&self.location) + .load_credentials(&self.table_ident, &self.location) .await?; self.do_refresh(new_creds)?; Ok(self.credential_version.load(Ordering::Acquire)) @@ -212,6 +218,7 @@ pub struct RefreshableOpenDalStorageBuilder { credentials_loader: Option>, initial_credentials: Option, location: String, + table_ident: Option, extensions: Extensions, } @@ -251,6 +258,12 @@ impl RefreshableOpenDalStorageBuilder { self } + /// Set the table identifier passed to `load_credentials` + pub fn table_ident(mut self, table_ident: TableIdent) -> Self { + self.table_ident = Some(table_ident); + self + } + /// Set the extensions pub fn extensions(mut self, extensions: Extensions) -> Self { self.extensions = extensions; @@ -268,6 +281,8 @@ impl RefreshableOpenDalStorageBuilder { })?, self.initial_credentials, self.location, + self.table_ident + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "table_ident is required"))?, self.extensions, )?)) } @@ -278,6 +293,7 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; + use crate::NamespaceIdent; use crate::io::StorageCredential; // --- Test helpers --- @@ -288,7 +304,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for SimpleLoader { - async fn load_credentials(&self, _location: &str) -> Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> Result { Ok(StorageCredential { prefix: "memory:/refreshed/".to_string(), config: HashMap::from([("refreshed_key".to_string(), "refreshed_val".to_string())]), @@ -322,7 +342,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for TrackingRefreshLoader { - async fn load_credentials(&self, _location: &str) -> Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> Result { let n = self.call_count.fetch_add(1, Ordering::SeqCst) + 1; Ok(StorageCredential { prefix: format!("memory:/refresh-{n}/"), @@ -331,6 +355,13 @@ mod tests { } } + fn test_table_ident() -> TableIdent { + TableIdent::new( + NamespaceIdent::new("test_ns".to_string()), + "test_table".to_string(), + ) + } + fn build_memory_refreshable( loader: Arc, ) -> Arc { @@ -338,6 +369,7 @@ mod tests { .scheme("memory".to_string()) .base_props(HashMap::new()) .credentials_loader(loader) + .table_ident(test_table_ident()) .build() .expect("Failed to build RefreshableOpenDalStorage for memory") } @@ -346,7 +378,7 @@ mod tests { async fn refresh(storage: &RefreshableOpenDalStorage) -> Result<()> { let new_creds = storage .credentials_loader - .load_credentials(&storage.location) + .load_credentials(&storage.table_ident, &storage.location) .await?; storage.do_refresh(new_creds) } diff --git a/crates/iceberg/src/io/storage_credential.rs b/crates/iceberg/src/io/storage_credential.rs index 3e33c7bbf1..fb5fe0ef24 100644 --- a/crates/iceberg/src/io/storage_credential.rs +++ b/crates/iceberg/src/io/storage_credential.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::fmt::Debug; use crate::Result; +use crate::catalog::TableIdent; /// Storage credentials for accessing cloud storage. /// @@ -54,7 +55,11 @@ pub struct MetadataLocation(pub String); /// /// #[async_trait::async_trait] /// impl StorageCredentialsLoader for MyCredentialLoader { -/// async fn load_credentials(&self, location: &str) -> iceberg::Result { +/// async fn load_credentials( +/// &self, +/// _table_ident: &iceberg::TableIdent, +/// location: &str, +/// ) -> iceberg::Result { /// // Fetch fresh credentials from your credential service /// let mut config = HashMap::new(); /// config.insert("access_key_id".to_string(), "fresh-key".to_string()); @@ -85,6 +90,11 @@ pub trait StorageCredentialsLoader: Send + Sync + Debug { /// Load storage credentials using custom user-defined logic. /// /// # Arguments + /// * `table_ident` - The table identifier for which credentials are being loaded /// * `location` - The full path being accessed (e.g., "s3://bucket/path/file.parquet") - async fn load_credentials(&self, location: &str) -> Result; + async fn load_credentials( + &self, + table_ident: &TableIdent, + location: &str, + ) -> Result; }