Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "RustyIceberg"
uuid = "390bdf5b-b624-43dc-a846-0ef7a3405804"
version = "0.6.1"
version = "0.6.2"

[deps]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
Expand Down
6 changes: 3 additions & 3 deletions iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions iceberg_rust_ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iceberg_rust_ffi"
version = "0.7.4"
version = "0.7.5"
edition = "2021"

[lib]
Expand All @@ -12,8 +12,8 @@ default = ["julia"]
julia = []

[dependencies]
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1", features = ["storage-azdls"] }
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1" }
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "c4598e09cdf754417848b341104fc66ac006f9e4", features = ["storage-azdls"] }
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "c4598e09cdf754417848b341104fc66ac006f9e4" }
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Expand Down
88 changes: 82 additions & 6 deletions iceberg_rust_ffi/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use crate::response::{
use crate::IcebergTable;
use anyhow::Result;
use async_trait::async_trait;
use iceberg::io::{StorageCredential, StorageCredentialsLoader};
use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, NamespaceIdent, TableIdent};
use iceberg_catalog_rest::{CustomAuthenticator, RestCatalog, RestCatalogBuilder};
use std::collections::HashMap;
use std::ffi::{c_char, c_void};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, OnceLock, Weak};

// FFI exports
use object_store_ffi::{
Expand Down Expand Up @@ -130,13 +131,49 @@ impl CustomAuthenticator for FFITokenAuthenticator {
}
}

/// Credential loader that calls the REST catalog's load_table_credentials endpoint
/// to obtain storage credentials for table data access.
///
/// Uses a `Weak` reference to break the circular dependency:
/// `Arc<RestCatalog>` → (owns) `Arc<RestCredentialsLoader>` → (weak) `RestCatalog`
#[derive(Debug)]
struct RestCredentialsLoader {
catalog: OnceLock<Weak<RestCatalog>>,
}

#[async_trait]
impl StorageCredentialsLoader for RestCredentialsLoader {
async fn load_credentials(
&self,
table_ident: &TableIdent,
location: &str,
) -> iceberg::Result<StorageCredential> {
let catalog = self
.catalog
.get()
.and_then(|w| w.upgrade())
.ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "Catalog reference is not available")
})?;
let response = catalog.load_table_credentials(table_ident).await?;
response
.storage_credentials
.into_iter()
.filter(|c| location.starts_with(&c.prefix))
.max_by_key(|c| c.prefix.len())
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "No matching credential for location"))
}
}

/// Opaque catalog handle for FFI
/// Stores a RestCatalog instance wrapped in a Box for safe memory management.
/// Stores a RestCatalog instance wrapped in an Arc for safe memory management.
/// Also stores the authenticator to allow setting it before catalog creation.
pub struct IcebergCatalog {
catalog: Option<Box<RestCatalog>>,
catalog: Option<Arc<RestCatalog>>,
/// Stores a pending authenticator to be applied before first use
authenticator: Option<Arc<FFITokenAuthenticator>>,
/// Whether to attach a storage credentials loader after catalog creation
use_credentials_loader: bool,
}

// SAFETY: Send and Sync are safe because:
Expand All @@ -152,6 +189,7 @@ impl Default for IcebergCatalog {
IcebergCatalog {
catalog: None,
authenticator: None,
use_credentials_loader: false,
}
}
}
Expand All @@ -172,8 +210,28 @@ impl IcebergCatalog {
builder = builder.with_token_authenticator(authenticator.clone());
}

let catalog = builder.load("rest", catalog_props).await?;
self.catalog = Some(Box::new(catalog));
let mut catalog = builder.load("rest", catalog_props).await?;

if self.use_credentials_loader {
// Create loader with empty catalog reference
let loader = Arc::new(RestCredentialsLoader {
catalog: OnceLock::new(),
});
let loader_ref = Arc::clone(&loader);

// Attach loader to catalog while we still have &mut access
catalog.set_storage_credentials_loader(loader);

// Wrap catalog in Arc
let catalog_arc = Arc::new(catalog);

// Fill the loader's weak reference to the catalog
let _ = loader_ref.catalog.set(Arc::downgrade(&catalog_arc));

self.catalog = Some(catalog_arc);
} else {
self.catalog = Some(Arc::new(catalog));
}

Ok(self)
}
Expand Down Expand Up @@ -210,7 +268,7 @@ impl IcebergCatalog {
///
/// Returns Some(&RestCatalog) if initialized, None otherwise.
pub fn get_catalog(&self) -> Option<&RestCatalog> {
self.catalog.as_ref().map(|c| c.as_ref())
self.catalog.as_deref()
}

/// Load a table by namespace and name
Expand Down Expand Up @@ -566,6 +624,24 @@ pub extern "C" fn iceberg_catalog_set_token_authenticator(
}
}

/// Enable the storage credentials loader for the catalog.
/// Must be called before iceberg_rest_catalog_create.
/// When enabled, the catalog will use a loader that calls load_table_credentials
/// to obtain storage credentials for table data access.
#[no_mangle]
pub extern "C" fn iceberg_catalog_set_storage_credentials_loader(
catalog: *mut IcebergCatalog,
) -> CResult {
if catalog.is_null() {
return CResult::Error;
}

let catalog_ref = unsafe { &mut *catalog };
catalog_ref.use_credentials_loader = true;

CResult::Ok
}

// Create a new table in the catalog
export_runtime_op!(
iceberg_catalog_create_table,
Expand Down
51 changes: 46 additions & 5 deletions src/catalog.jl
Original file line number Diff line number Diff line change
Expand Up @@ -129,30 +129,52 @@ mutable struct NestedStringListResponse
end

"""
catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog
catalog_create_rest(
uri::String;
properties::Dict{String,String}=Dict{String,String}(),
use_credentials_loader::Bool=false,
)::Catalog

Create a REST catalog connection.

# Arguments
- `uri::String`: URI of the Iceberg REST catalog server (e.g., "http://localhost:8181")
- `properties::Dict{String,String}`: Optional key-value properties for catalog configuration.
By default (empty dict), no additional properties are passed.
- `use_credentials_loader::Bool=false`: If true, enables a storage credentials loader that
calls the catalog's `load_table_credentials` endpoint to obtain storage credentials.
When a PermissionDenied error occurs during data access, the loader automatically
refreshes credentials.

# Returns
- A `Catalog` handle for use in other catalog operations

# Example
```julia
catalog = catalog_create_rest("http://polaris:8181")
catalog = catalog_create_rest("http://polaris:8181"; use_credentials_loader=true)
```
"""
function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{String,String}())
function catalog_create_rest(
uri::String; properties::Dict{String,String}=Dict{String,String}(),
use_credentials_loader::Bool=false,
)
# Create an empty catalog (no authenticator)
catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid}
if catalog_ptr == C_NULL
throw(IcebergException("Failed to create empty catalog"))
end

# Enable storage credentials loader if requested
if use_credentials_loader
result = @ccall rust_lib.iceberg_catalog_set_storage_credentials_loader(
catalog_ptr::Ptr{Cvoid}
)::Cint
if result != 0
throw(IcebergException("Failed to set storage credentials loader"))
end
end

# Initialize the catalog with REST connection
# Convert properties dict to array of PropertyEntry structs
property_entries = [PropertyEntry(pointer(k), pointer(v)) for (k, v) in properties]
Expand All @@ -177,7 +199,12 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S
end

"""
catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog
catalog_create_rest(
authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}},
uri::String;
properties::Dict{String,String}=Dict{String,String}(),
use_credentials_loader::Bool=false,
)::Catalog

Create a REST catalog connection with custom token authentication and token caching support.

Expand Down Expand Up @@ -211,13 +238,27 @@ end
catalog = catalog_create_rest(FunctionWrapper{Union{String,Nothing},Tuple{}}(get_token), "http://polaris:8181")
```
"""
function catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}())
function catalog_create_rest(
authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}},
uri::String; properties::Dict{String,String}=Dict{String,String}(),
use_credentials_loader::Bool=false,
)
# Step 1: Create an empty catalog
catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid}
if catalog_ptr == C_NULL
throw(IcebergException("Failed to create empty catalog"))
end

# Step 1b: Enable storage credentials loader if requested
if use_credentials_loader
result = @ccall rust_lib.iceberg_catalog_set_storage_credentials_loader(
catalog_ptr::Ptr{Cvoid}
)::Cint
if result != 0
throw(IcebergException("Failed to set storage credentials loader"))
end
end

# Step 2: Wrap the authenticator in a Ref for stable memory address
authenticator_ref = Ref(authenticator)

Expand Down Expand Up @@ -826,4 +867,4 @@ function invalidate_catalog_token!(catalog::Catalog)

@throw_on_error(response, "invalidate_catalog_token", IcebergException)
return nothing
end
end
Loading