From 0a4ba318d2d8d157d4322f168d00a535398674f1 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Fri, 13 Feb 2026 11:17:26 +0100 Subject: [PATCH 1/3] builder method for storage credentials loader --- crates/catalog/rest/src/catalog.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 29d7a3082f..4c35e722ec 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -148,6 +148,18 @@ impl RestCatalogBuilder { self.0.authenticator = Some(authenticator); self } + + /// Set a custom storage credentials loader. + /// + /// The loader will be used to obtain storage credentials instead of expecting + /// them to be vended from the catalog. + pub fn with_storage_credentials_loader( + mut self, + loader: Arc, + ) -> Self { + self.0.storage_credentials_loader = Some(loader); + self + } } /// Trait for custom storage credential loader. From e468b2ee9035a27871bf9cfde1b2c9e427839f77 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Fri, 13 Feb 2026 12:04:57 +0100 Subject: [PATCH 2/3] pass tableident to loader, and catalog setter for credentials loader --- crates/catalog/rest/src/catalog.rs | 42 ++++++++++++------- crates/iceberg/src/io/opendal/mod.rs | 17 +++++++- crates/iceberg/src/io/refreshable_accessor.rs | 16 ++++++- crates/iceberg/src/io/refreshable_storage.rs | 38 +++++++++++++++-- crates/iceberg/src/io/storage_credential.rs | 10 ++++- 5 files changed, 99 insertions(+), 24 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 4c35e722ec..50efc88d3e 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -149,17 +149,6 @@ impl RestCatalogBuilder { self } - /// Set a custom storage credentials loader. - /// - /// The loader will be used to obtain storage credentials instead of expecting - /// them to be vended from the catalog. - pub fn with_storage_credentials_loader( - mut self, - loader: Arc, - ) -> Self { - self.0.storage_credentials_loader = Some(loader); - self - } } /// Trait for custom storage credential loader. @@ -388,6 +377,18 @@ impl RestCatalog { } } + /// Set a custom storage credentials loader. + /// + /// This is intended to be called after catalog construction, so the loader + /// can hold a reference to the catalog (e.g., `Arc`) and call + /// catalog-specific methods like `load_table_with_credentials`. + pub fn set_storage_credentials_loader( + &mut self, + loader: Arc, + ) { + self.user_config.storage_credentials_loader = Some(loader); + } + /// Add an extension to the file IO builder. pub fn with_file_io_extension(mut self, ext: T) -> Self { self.file_io_extensions.add(ext); @@ -442,6 +443,7 @@ impl RestCatalog { metadata_location: Option<&str>, extra_config: Option>, storage_credential: Option, + table_ident: Option<&TableIdent>, ) -> Result { let mut props = self.context().await?.config.props.clone(); if let Some(config) = extra_config { @@ -470,6 +472,9 @@ impl RestCatalog { file_io_builder = file_io_builder.with_extension(MetadataLocation(loc.to_string())); } + if let Some(ident) = table_ident { + file_io_builder = file_io_builder.with_extension(ident.clone()); + } file_io_builder = file_io_builder.with_extension(loader.clone()); } @@ -582,7 +587,7 @@ impl RestCatalog { &self.user_config.storage_credentials_loader { let credential = storage_credentials_loader - .load_credentials(response.metadata_location.as_deref().unwrap_or("")) + .load_credentials(table_ident, response.metadata_location.as_deref().unwrap_or("")) .await?; config.extend(credential.config.clone()); Some(credential) @@ -595,6 +600,7 @@ impl RestCatalog { response.metadata_location.as_deref(), Some(config), final_credential, + Some(table_ident), ) .await?; @@ -903,7 +909,7 @@ impl Catalog for RestCatalog { // TODO: Support vended credentials here. let file_io = self - .load_file_io(Some(metadata_location), Some(config), None) + .load_file_io(Some(metadata_location), Some(config), None, None) .await?; let table_builder = Table::builder() @@ -1045,7 +1051,7 @@ impl Catalog for RestCatalog { // TODO: Support vended credentials here. let file_io = self - .load_file_io(Some(metadata_location), None, None) + .load_file_io(Some(metadata_location), None, None, None) .await?; Table::builder() @@ -1112,7 +1118,7 @@ impl Catalog for RestCatalog { // TODO: Support vended credentials here. let file_io = self - .load_file_io(Some(&response.metadata_location), None, None) + .load_file_io(Some(&response.metadata_location), None, None, None) .await?; Table::builder() @@ -3046,7 +3052,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for DummyCredentialLoader { - async fn load_credentials(&self, _location: &str) -> Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> Result { self.was_called.store(true, Ordering::SeqCst); let mut config = HashMap::new(); config.insert("custom.key".to_string(), "custom.value".to_string()); diff --git a/crates/iceberg/src/io/opendal/mod.rs b/crates/iceberg/src/io/opendal/mod.rs index 7a7b5d7b42..f1a1dce7d2 100644 --- a/crates/iceberg/src/io/opendal/mod.rs +++ b/crates/iceberg/src/io/opendal/mod.rs @@ -45,6 +45,7 @@ use super::{ FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, MetadataLocation, OutputFile, Storage, StorageConfig, StorageCredential, StorageCredentialsLoader, StorageFactory, }; +use crate::catalog::TableIdent; use crate::{Error, ErrorKind, Result}; #[cfg(feature = "storage-azdls")] @@ -233,12 +234,22 @@ impl OpenDalStorage { .get::() .map(|l| l.0.clone()) .unwrap_or_default(); + let table_ident = extensions + .get::() + .map(|arc| (*arc).clone()) + .unwrap_or_else(|| { + TableIdent::new( + crate::NamespaceIdent::new("unknown".to_string()), + "unknown".to_string(), + ) + }); let backend = RefreshableOpenDalStorageBuilder::new() .scheme(scheme_str) .base_props(props) .credentials_loader(Arc::clone(&loader)) .initial_credentials(initial_creds) .location(location) + .table_ident(table_ident) .extensions(extensions) .build()?; return Ok(Self::Refreshable { @@ -534,7 +545,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for TestCredentialLoader { - async fn load_credentials(&self, _location: &str) -> crate::Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> crate::Result { Ok(StorageCredential { prefix: "s3://test/".to_string(), config: HashMap::new(), diff --git a/crates/iceberg/src/io/refreshable_accessor.rs b/crates/iceberg/src/io/refreshable_accessor.rs index 25372dd7e2..7b1b96d06f 100644 --- a/crates/iceberg/src/io/refreshable_accessor.rs +++ b/crates/iceberg/src/io/refreshable_accessor.rs @@ -227,8 +227,10 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; + use crate::catalog::TableIdent; use crate::io::refreshable_storage::RefreshableOpenDalStorageBuilder; use crate::io::{StorageCredential, StorageCredentialsLoader}; + use crate::NamespaceIdent; // --- Test helpers --- @@ -259,7 +261,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for SequenceLoader { - async fn load_credentials(&self, _location: &str) -> crate::Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> crate::Result { self.call_count.fetch_add(1, Ordering::SeqCst); let mut responses = self.responses.lock().unwrap(); Ok(responses.pop_front().unwrap_or_else(dummy_credential)) @@ -382,6 +388,10 @@ mod tests { .scheme("memory".to_string()) .base_props(HashMap::new()) .credentials_loader(Arc::clone(&loader)) + .table_ident(TableIdent::new( + NamespaceIdent::new("test_ns".to_string()), + "test_table".to_string(), + )) .build() .expect("Failed to build storage"); @@ -477,6 +487,10 @@ mod tests { .scheme("memory".to_string()) .base_props(HashMap::new()) .credentials_loader(Arc::clone(&loader) as _) + .table_ident(TableIdent::new( + NamespaceIdent::new("test_ns".to_string()), + "test_table".to_string(), + )) .build() .expect("Failed to build storage"); diff --git a/crates/iceberg/src/io/refreshable_storage.rs b/crates/iceberg/src/io/refreshable_storage.rs index 7fe1c0b424..abd4b8480d 100644 --- a/crates/iceberg/src/io/refreshable_storage.rs +++ b/crates/iceberg/src/io/refreshable_storage.rs @@ -25,6 +25,7 @@ use tokio::sync::Mutex as AsyncMutex; use super::opendal::OpenDalStorage; use super::refreshable_accessor::RefreshableAccessor; +use crate::catalog::TableIdent; use crate::io::file_io::Extensions; use crate::io::{StorageCredential, StorageCredentialsLoader}; use crate::{Error, ErrorKind, Result}; @@ -52,6 +53,9 @@ pub struct RefreshableOpenDalStorage { /// Metadata location passed to `load_credentials` location: String, + /// Table identifier passed to `load_credentials` + table_ident: TableIdent, + /// Cached AccessorInfo (created lazily from first operator) cached_info: Mutex>>, @@ -88,6 +92,7 @@ impl RefreshableOpenDalStorage { credentials_loader: Arc, initial_credentials: Option, location: String, + table_ident: TableIdent, extensions: Extensions, ) -> Result { // Build initial inner_storage from base_props + initial_credentials @@ -104,6 +109,7 @@ impl RefreshableOpenDalStorage { credentials_loader, extensions, location, + table_ident, cached_info: Mutex::new(None), credential_version: AtomicU64::new(0), refresh_lock: AsyncMutex::new(()), @@ -197,7 +203,7 @@ impl RefreshableOpenDalStorage { // We are the one who should call the loader let new_creds = self .credentials_loader - .load_credentials(&self.location) + .load_credentials(&self.table_ident, &self.location) .await?; self.do_refresh(new_creds)?; Ok(self.credential_version.load(Ordering::Acquire)) @@ -212,6 +218,7 @@ pub struct RefreshableOpenDalStorageBuilder { credentials_loader: Option>, initial_credentials: Option, location: String, + table_ident: Option, extensions: Extensions, } @@ -251,6 +258,12 @@ impl RefreshableOpenDalStorageBuilder { self } + /// Set the table identifier passed to `load_credentials` + pub fn table_ident(mut self, table_ident: TableIdent) -> Self { + self.table_ident = Some(table_ident); + self + } + /// Set the extensions pub fn extensions(mut self, extensions: Extensions) -> Self { self.extensions = extensions; @@ -268,6 +281,9 @@ impl RefreshableOpenDalStorageBuilder { })?, self.initial_credentials, self.location, + self.table_ident.ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "table_ident is required") + })?, self.extensions, )?)) } @@ -279,6 +295,7 @@ mod tests { use super::*; use crate::io::StorageCredential; + use crate::NamespaceIdent; // --- Test helpers --- @@ -288,7 +305,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for SimpleLoader { - async fn load_credentials(&self, _location: &str) -> Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> Result { Ok(StorageCredential { prefix: "memory:/refreshed/".to_string(), config: HashMap::from([("refreshed_key".to_string(), "refreshed_val".to_string())]), @@ -322,7 +343,11 @@ mod tests { #[async_trait::async_trait] impl StorageCredentialsLoader for TrackingRefreshLoader { - async fn load_credentials(&self, _location: &str) -> Result { + async fn load_credentials( + &self, + _table_ident: &TableIdent, + _location: &str, + ) -> Result { let n = self.call_count.fetch_add(1, Ordering::SeqCst) + 1; Ok(StorageCredential { prefix: format!("memory:/refresh-{n}/"), @@ -331,6 +356,10 @@ mod tests { } } + fn test_table_ident() -> TableIdent { + TableIdent::new(NamespaceIdent::new("test_ns".to_string()), "test_table".to_string()) + } + fn build_memory_refreshable( loader: Arc, ) -> Arc { @@ -338,6 +367,7 @@ mod tests { .scheme("memory".to_string()) .base_props(HashMap::new()) .credentials_loader(loader) + .table_ident(test_table_ident()) .build() .expect("Failed to build RefreshableOpenDalStorage for memory") } @@ -346,7 +376,7 @@ mod tests { async fn refresh(storage: &RefreshableOpenDalStorage) -> Result<()> { let new_creds = storage .credentials_loader - .load_credentials(&storage.location) + .load_credentials(&storage.table_ident, &storage.location) .await?; storage.do_refresh(new_creds) } diff --git a/crates/iceberg/src/io/storage_credential.rs b/crates/iceberg/src/io/storage_credential.rs index 3e33c7bbf1..85f83a7cbe 100644 --- a/crates/iceberg/src/io/storage_credential.rs +++ b/crates/iceberg/src/io/storage_credential.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::fmt::Debug; +use crate::catalog::TableIdent; use crate::Result; /// Storage credentials for accessing cloud storage. @@ -54,7 +55,7 @@ pub struct MetadataLocation(pub String); /// /// #[async_trait::async_trait] /// impl StorageCredentialsLoader for MyCredentialLoader { -/// async fn load_credentials(&self, location: &str) -> iceberg::Result { +/// async fn load_credentials(&self, _table_ident: &iceberg::TableIdent, location: &str) -> iceberg::Result { /// // Fetch fresh credentials from your credential service /// let mut config = HashMap::new(); /// config.insert("access_key_id".to_string(), "fresh-key".to_string()); @@ -85,6 +86,11 @@ pub trait StorageCredentialsLoader: Send + Sync + Debug { /// Load storage credentials using custom user-defined logic. /// /// # Arguments + /// * `table_ident` - The table identifier for which credentials are being loaded /// * `location` - The full path being accessed (e.g., "s3://bucket/path/file.parquet") - async fn load_credentials(&self, location: &str) -> Result; + async fn load_credentials( + &self, + table_ident: &TableIdent, + location: &str, + ) -> Result; } From c64aa3aaa103298a160c19b5cdfd97d818f1d03c Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Fri, 13 Feb 2026 12:53:06 +0100 Subject: [PATCH 3/3] cargo fmt --- crates/catalog/rest/src/catalog.rs | 11 +++++------ crates/iceberg/src/io/refreshable_accessor.rs | 2 +- crates/iceberg/src/io/refreshable_storage.rs | 12 +++++++----- crates/iceberg/src/io/storage_credential.rs | 8 ++++++-- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 50efc88d3e..32c11bf203 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -148,7 +148,6 @@ impl RestCatalogBuilder { self.0.authenticator = Some(authenticator); self } - } /// Trait for custom storage credential loader. @@ -382,10 +381,7 @@ impl RestCatalog { /// This is intended to be called after catalog construction, so the loader /// can hold a reference to the catalog (e.g., `Arc`) and call /// catalog-specific methods like `load_table_with_credentials`. - pub fn set_storage_credentials_loader( - &mut self, - loader: Arc, - ) { + pub fn set_storage_credentials_loader(&mut self, loader: Arc) { self.user_config.storage_credentials_loader = Some(loader); } @@ -587,7 +583,10 @@ impl RestCatalog { &self.user_config.storage_credentials_loader { let credential = storage_credentials_loader - .load_credentials(table_ident, response.metadata_location.as_deref().unwrap_or("")) + .load_credentials( + table_ident, + response.metadata_location.as_deref().unwrap_or(""), + ) .await?; config.extend(credential.config.clone()); Some(credential) diff --git a/crates/iceberg/src/io/refreshable_accessor.rs b/crates/iceberg/src/io/refreshable_accessor.rs index 7b1b96d06f..8863ea54b7 100644 --- a/crates/iceberg/src/io/refreshable_accessor.rs +++ b/crates/iceberg/src/io/refreshable_accessor.rs @@ -227,10 +227,10 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; + use crate::NamespaceIdent; use crate::catalog::TableIdent; use crate::io::refreshable_storage::RefreshableOpenDalStorageBuilder; use crate::io::{StorageCredential, StorageCredentialsLoader}; - use crate::NamespaceIdent; // --- Test helpers --- diff --git a/crates/iceberg/src/io/refreshable_storage.rs b/crates/iceberg/src/io/refreshable_storage.rs index abd4b8480d..3a79f02b94 100644 --- a/crates/iceberg/src/io/refreshable_storage.rs +++ b/crates/iceberg/src/io/refreshable_storage.rs @@ -281,9 +281,8 @@ impl RefreshableOpenDalStorageBuilder { })?, self.initial_credentials, self.location, - self.table_ident.ok_or_else(|| { - Error::new(ErrorKind::DataInvalid, "table_ident is required") - })?, + self.table_ident + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "table_ident is required"))?, self.extensions, )?)) } @@ -294,8 +293,8 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; - use crate::io::StorageCredential; use crate::NamespaceIdent; + use crate::io::StorageCredential; // --- Test helpers --- @@ -357,7 +356,10 @@ mod tests { } fn test_table_ident() -> TableIdent { - TableIdent::new(NamespaceIdent::new("test_ns".to_string()), "test_table".to_string()) + TableIdent::new( + NamespaceIdent::new("test_ns".to_string()), + "test_table".to_string(), + ) } fn build_memory_refreshable( diff --git a/crates/iceberg/src/io/storage_credential.rs b/crates/iceberg/src/io/storage_credential.rs index 85f83a7cbe..fb5fe0ef24 100644 --- a/crates/iceberg/src/io/storage_credential.rs +++ b/crates/iceberg/src/io/storage_credential.rs @@ -18,8 +18,8 @@ use std::collections::HashMap; use std::fmt::Debug; -use crate::catalog::TableIdent; use crate::Result; +use crate::catalog::TableIdent; /// Storage credentials for accessing cloud storage. /// @@ -55,7 +55,11 @@ pub struct MetadataLocation(pub String); /// /// #[async_trait::async_trait] /// impl StorageCredentialsLoader for MyCredentialLoader { -/// async fn load_credentials(&self, _table_ident: &iceberg::TableIdent, location: &str) -> iceberg::Result { +/// async fn load_credentials( +/// &self, +/// _table_ident: &iceberg::TableIdent, +/// location: &str, +/// ) -> iceberg::Result { /// // Fetch fresh credentials from your credential service /// let mut config = HashMap::new(); /// config.insert("access_key_id".to_string(), "fresh-key".to_string());