Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions crates/catalog/rest/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ impl HttpClient {
// Try authenticator first (highest priority)
if let Some(authenticator) = &self.authenticator {
let token = authenticator.get_token().await?;
// Cache the token so that subsequent requests can use it without calling the authenticator
*self.token.lock().await = Some(token.clone());
Self::set_bearer_token(req, &token, "Invalid custom token")?;
return Ok(());
}
Expand Down Expand Up @@ -284,41 +286,53 @@ impl HttpClient {
Ok(self.client.execute(request).await?)
}

// Queries the Iceberg REST catalog after authentication with the given `Request` and
// returns a `Response`.
// Queries the Iceberg REST catalog with authentication and returns a `Response`.
//
// If a custom authenticator is configured, the first request is sent without authentication.
// If it fails with a 401/403 permission denied error, the custom authenticator is called
// to get a fresh token and the request is retried.
// For custom authenticators:
// - On the first request, fetches a token from the authenticator and caches it.
// - On subsequent requests, reuses the cached token without calling the authenticator.
// - If a request returns 401/403, invalidates the cache and fetches a fresh token.
//
// For other authentication methods (static token, OAuth credentials), authentication
// is applied to all requests as before.
pub async fn query_catalog(&self, mut request: Request) -> Result<Response> {
let has_custom_authenticator = self.authenticator.is_some();
let token_is_set = self.token.lock().await.is_some();
if self.authenticator.is_some() {
// For custom authenticators, use cached token if available
let token_is_set = self.token.lock().await.is_some();

if token_is_set {
// We have a cached token, use it by applying the cached authorization
// without calling the authenticator again
let cached_token = self.token.lock().await.clone();
if let Some(token) = cached_token {
HttpClient::set_bearer_token(&mut request, &token, "Invalid cached token")?;
}
} else {
// No cached token, fetch one from the authenticator
self.authenticate(&mut request).await?;
}

if has_custom_authenticator && token_is_set {
// For custom authenticators with a cached token, try without authentication first
// to avoid unnecessary token fetches
let cloned_request = request
.try_clone()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Unable to clone request"))?;
let response = self.execute(cloned_request).await?;
// Send request with authentication
let response =
self.execute(request.try_clone().ok_or_else(|| {
Error::new(ErrorKind::DataInvalid, "Unable to clone request")
})?)
.await?;

// Check if we got a permission denied error
if matches!(
response.status(),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN
) {
// Retry with authentication from the custom authenticator
// Token was rejected, invalidate and get a fresh one from the authenticator
self.invalidate_token().await?;
self.authenticate(&mut request).await?;
return self.execute(request).await;
}

Ok(response)
} else {
// For custom authenticators without a token, or other auth methods:
// authenticate on every request
// Other auth methods: authenticate on every request
self.authenticate(&mut request).await?;
self.execute(request).await
}
Expand Down
87 changes: 59 additions & 28 deletions crates/catalog/rest/tests/rest_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,33 +530,58 @@ async fn test_authenticator_token_refresh() {
count: token_request_count_clone,
});

let catalog_with_auth = get_catalog(Some(authenticator)).await;
let catalog_with_auth = get_catalog(Some(authenticator.clone())).await;

// Perform multiple operations that should trigger token requests
let ns1 = Namespace::with_properties(
NamespaceIdent::from_strs(["test_refresh_1"]).unwrap(),
HashMap::new(),
);
// Clean up from any previous test runs
let ns1_ident = NamespaceIdent::from_strs(["test_refresh_1"]).unwrap();
let ns2_ident = NamespaceIdent::from_strs(["test_refresh_2"]).unwrap();
cleanup_namespace(&catalog_with_auth, &ns1_ident).await;
cleanup_namespace(&catalog_with_auth, &ns2_ident).await;

// Perform multiple operations that should reuse the cached token
let ns1 = Namespace::with_properties(ns1_ident.clone(), HashMap::new());
catalog_with_auth
.create_namespace(ns1.name(), HashMap::new())
.await
.unwrap();

let ns2 = Namespace::with_properties(
NamespaceIdent::from_strs(["test_refresh_2"]).unwrap(),
HashMap::new(),
);
let ns2 = Namespace::with_properties(ns2_ident.clone(), HashMap::new());
catalog_with_auth
.create_namespace(ns2.name(), HashMap::new())
.await
.unwrap();

// Verify authenticator was called multiple times
// With lazy authentication, the token is fetched once and cached for reuse
// across multiple operations, rather than being called on every request
let count = *token_request_count.lock().unwrap();
assert!(
count >= 2,
"Authenticator should have been called at least twice, but was called {count} times"
assert_eq!(
count, 1,
"Authenticator should have been called once for lazy token caching, but was called {count} times"
);

// Test that token is refreshed when invalidated
catalog_with_auth.invalidate_token().await.unwrap();

let ns3_ident = NamespaceIdent::from_strs(["test_refresh_3"]).unwrap();
cleanup_namespace(&catalog_with_auth, &ns3_ident).await;

let ns3 = Namespace::with_properties(ns3_ident.clone(), HashMap::new());
catalog_with_auth
.create_namespace(ns3.name(), HashMap::new())
.await
.unwrap();

// After invalidating and making another request, authenticator should be called again
let count = *token_request_count.lock().unwrap();
assert_eq!(
count, 2,
"Authenticator should have been called twice (once initial, once after invalidation), but was called {count} times"
);

// Clean up
cleanup_namespace(&catalog_with_auth, &ns1_ident).await;
cleanup_namespace(&catalog_with_auth, &ns2_ident).await;
cleanup_namespace(&catalog_with_auth, &ns3_ident).await;
}

#[tokio::test]
Expand All @@ -570,40 +595,46 @@ async fn test_authenticator_persists_across_operations() {

let catalog_with_auth = get_catalog(Some(authenticator)).await;

// Clean up from any previous test runs
let ns_ident = NamespaceIdent::from_strs(["test_persist", "auth"]).unwrap();
let parent_ident = NamespaceIdent::from_strs(["test_persist"]).unwrap();
cleanup_namespace(&catalog_with_auth, &ns_ident).await;

// Create a namespace
let ns = Namespace::with_properties(
NamespaceIdent::from_strs(["test_persist", "auth"]).unwrap(),
HashMap::new(),
);
let ns = Namespace::with_properties(ns_ident.clone(), HashMap::new());
catalog_with_auth
.create_namespace(ns.name(), HashMap::new())
.await
.unwrap();

let count_after_create = *operation_count.lock().unwrap();

// List the namespace children (should use the same authenticator)
// List the namespace children (should reuse the cached token from the create operation)
// We need to list children of "test_persist" to find "auth"
let list_result = catalog_with_auth
.list_namespaces(Some(&NamespaceIdent::from_strs(["test_persist"]).unwrap()))
.list_namespaces(Some(&parent_ident))
.await
.unwrap();
assert!(
list_result.contains(&NamespaceIdent::from_strs(["test_persist", "auth"]).unwrap()),
list_result.contains(&ns_ident),
"Namespace {:?} not found in list {:?}",
ns.name(),
list_result
);

let count_after_list = *operation_count.lock().unwrap();

// Verify authenticator was used for both operations
assert!(
count_after_create > 0,
"Authenticator should be used for create"
// With lazy authentication, the token is fetched once on the first operation
// and then reused for subsequent operations without calling the authenticator again
assert_eq!(
count_after_create, 1,
"Authenticator should be called once for the create operation"
);
assert!(
count_after_list > count_after_create,
"Authenticator should be used for list operation too"
assert_eq!(
count_after_list, 1,
"Authenticator should still have been called only once (token is cached and reused for list)"
);

// Clean up
cleanup_namespace(&catalog_with_auth, &ns_ident).await;
}
Loading