diff --git a/Project.toml b/Project.toml index 6349166..ae73c97 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "RustyIceberg" uuid = "390bdf5b-b624-43dc-a846-0ef7a3405804" -version = "0.6.1" +version = "0.6.2" [deps] Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index b8f6d5d..1283b64 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.8.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1#fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=c4598e09cdf754417848b341104fc66ac006f9e4#c4598e09cdf754417848b341104fc66ac006f9e4" dependencies = [ "anyhow", "apache-avro", @@ -1587,7 +1587,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.8.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1#fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=c4598e09cdf754417848b341104fc66ac006f9e4#c4598e09cdf754417848b341104fc66ac006f9e4" dependencies = [ "async-trait", "chrono", @@ -1606,7 +1606,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.7.4" +version = "0.7.5" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index a6e5e7a..81d7d23 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.7.4" +version = "0.7.5" edition = "2021" [lib] @@ -12,8 +12,8 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1", features = ["storage-azdls"] } -iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "fd2d63eaf05a1fe3316a54fe5ecfbf17121881e1" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "c4598e09cdf754417848b341104fc66ac006f9e4", features = ["storage-azdls"] } +iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "c4598e09cdf754417848b341104fc66ac006f9e4" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 4e9cd16..2352eae 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -6,11 +6,12 @@ use crate::response::{ use crate::IcebergTable; use anyhow::Result; use async_trait::async_trait; +use iceberg::io::{StorageCredential, StorageCredentialsLoader}; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, NamespaceIdent, TableIdent}; use iceberg_catalog_rest::{CustomAuthenticator, RestCatalog, RestCatalogBuilder}; use std::collections::HashMap; use std::ffi::{c_char, c_void}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, OnceLock, Weak}; // FFI exports use object_store_ffi::{ @@ -130,13 +131,49 @@ impl CustomAuthenticator for FFITokenAuthenticator { } } +/// Credential loader that calls the REST catalog's load_table_credentials endpoint +/// to obtain storage credentials for table data access. +/// +/// Uses a `Weak` reference to break the circular dependency: +/// `Arc` → (owns) `Arc` → (weak) `RestCatalog` +#[derive(Debug)] +struct RestCredentialsLoader { + catalog: OnceLock>, +} + +#[async_trait] +impl StorageCredentialsLoader for RestCredentialsLoader { + async fn load_credentials( + &self, + table_ident: &TableIdent, + location: &str, + ) -> iceberg::Result { + let catalog = self + .catalog + .get() + .and_then(|w| w.upgrade()) + .ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "Catalog reference is not available") + })?; + let response = catalog.load_table_credentials(table_ident).await?; + response + .storage_credentials + .into_iter() + .filter(|c| location.starts_with(&c.prefix)) + .max_by_key(|c| c.prefix.len()) + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "No matching credential for location")) + } +} + /// Opaque catalog handle for FFI -/// Stores a RestCatalog instance wrapped in a Box for safe memory management. +/// Stores a RestCatalog instance wrapped in an Arc for safe memory management. /// Also stores the authenticator to allow setting it before catalog creation. pub struct IcebergCatalog { - catalog: Option>, + catalog: Option>, /// Stores a pending authenticator to be applied before first use authenticator: Option>, + /// Whether to attach a storage credentials loader after catalog creation + use_credentials_loader: bool, } // SAFETY: Send and Sync are safe because: @@ -152,6 +189,7 @@ impl Default for IcebergCatalog { IcebergCatalog { catalog: None, authenticator: None, + use_credentials_loader: false, } } } @@ -172,8 +210,28 @@ impl IcebergCatalog { builder = builder.with_token_authenticator(authenticator.clone()); } - let catalog = builder.load("rest", catalog_props).await?; - self.catalog = Some(Box::new(catalog)); + let mut catalog = builder.load("rest", catalog_props).await?; + + if self.use_credentials_loader { + // Create loader with empty catalog reference + let loader = Arc::new(RestCredentialsLoader { + catalog: OnceLock::new(), + }); + let loader_ref = Arc::clone(&loader); + + // Attach loader to catalog while we still have &mut access + catalog.set_storage_credentials_loader(loader); + + // Wrap catalog in Arc + let catalog_arc = Arc::new(catalog); + + // Fill the loader's weak reference to the catalog + let _ = loader_ref.catalog.set(Arc::downgrade(&catalog_arc)); + + self.catalog = Some(catalog_arc); + } else { + self.catalog = Some(Arc::new(catalog)); + } Ok(self) } @@ -210,7 +268,7 @@ impl IcebergCatalog { /// /// Returns Some(&RestCatalog) if initialized, None otherwise. pub fn get_catalog(&self) -> Option<&RestCatalog> { - self.catalog.as_ref().map(|c| c.as_ref()) + self.catalog.as_deref() } /// Load a table by namespace and name @@ -566,6 +624,24 @@ pub extern "C" fn iceberg_catalog_set_token_authenticator( } } +/// Enable the storage credentials loader for the catalog. +/// Must be called before iceberg_rest_catalog_create. +/// When enabled, the catalog will use a loader that calls load_table_credentials +/// to obtain storage credentials for table data access. +#[no_mangle] +pub extern "C" fn iceberg_catalog_set_storage_credentials_loader( + catalog: *mut IcebergCatalog, +) -> CResult { + if catalog.is_null() { + return CResult::Error; + } + + let catalog_ref = unsafe { &mut *catalog }; + catalog_ref.use_credentials_loader = true; + + CResult::Ok +} + // Create a new table in the catalog export_runtime_op!( iceberg_catalog_create_table, diff --git a/src/catalog.jl b/src/catalog.jl index 42d691a..c372173 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -129,7 +129,11 @@ mutable struct NestedStringListResponse end """ - catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog + catalog_create_rest( + uri::String; + properties::Dict{String,String}=Dict{String,String}(), + use_credentials_loader::Bool=false, + )::Catalog Create a REST catalog connection. @@ -137,6 +141,10 @@ Create a REST catalog connection. - `uri::String`: URI of the Iceberg REST catalog server (e.g., "http://localhost:8181") - `properties::Dict{String,String}`: Optional key-value properties for catalog configuration. By default (empty dict), no additional properties are passed. +- `use_credentials_loader::Bool=false`: If true, enables a storage credentials loader that + calls the catalog's `load_table_credentials` endpoint to obtain storage credentials. + When a PermissionDenied error occurs during data access, the loader automatically + refreshes credentials. # Returns - A `Catalog` handle for use in other catalog operations @@ -144,15 +152,29 @@ Create a REST catalog connection. # Example ```julia catalog = catalog_create_rest("http://polaris:8181") +catalog = catalog_create_rest("http://polaris:8181"; use_credentials_loader=true) ``` """ -function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{String,String}()) +function catalog_create_rest( + uri::String; properties::Dict{String,String}=Dict{String,String}(), + use_credentials_loader::Bool=false, +) # Create an empty catalog (no authenticator) catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} if catalog_ptr == C_NULL throw(IcebergException("Failed to create empty catalog")) end + # Enable storage credentials loader if requested + if use_credentials_loader + result = @ccall rust_lib.iceberg_catalog_set_storage_credentials_loader( + catalog_ptr::Ptr{Cvoid} + )::Cint + if result != 0 + throw(IcebergException("Failed to set storage credentials loader")) + end + end + # Initialize the catalog with REST connection # Convert properties dict to array of PropertyEntry structs property_entries = [PropertyEntry(pointer(k), pointer(v)) for (k, v) in properties] @@ -177,7 +199,12 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S end """ - catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog + catalog_create_rest( + authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, + uri::String; + properties::Dict{String,String}=Dict{String,String}(), + use_credentials_loader::Bool=false, + )::Catalog Create a REST catalog connection with custom token authentication and token caching support. @@ -211,13 +238,27 @@ end catalog = catalog_create_rest(FunctionWrapper{Union{String,Nothing},Tuple{}}(get_token), "http://polaris:8181") ``` """ -function catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}()) +function catalog_create_rest( + authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, + uri::String; properties::Dict{String,String}=Dict{String,String}(), + use_credentials_loader::Bool=false, +) # Step 1: Create an empty catalog catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} if catalog_ptr == C_NULL throw(IcebergException("Failed to create empty catalog")) end + # Step 1b: Enable storage credentials loader if requested + if use_credentials_loader + result = @ccall rust_lib.iceberg_catalog_set_storage_credentials_loader( + catalog_ptr::Ptr{Cvoid} + )::Cint + if result != 0 + throw(IcebergException("Failed to set storage credentials loader")) + end + end + # Step 2: Wrap the authenticator in a Ref for stable memory address authenticator_ref = Ref(authenticator) @@ -826,4 +867,4 @@ function invalidate_catalog_token!(catalog::Catalog) @throw_on_error(response, "invalidate_catalog_token", IcebergException) return nothing -end \ No newline at end of file +end diff --git a/test/catalog_tests.jl b/test/catalog_tests.jl index b9c9d53..6ceb846 100644 --- a/test/catalog_tests.jl +++ b/test/catalog_tests.jl @@ -10,6 +10,7 @@ using RustyIceberg: Field, Schema, PartitionSpec, SortOrder, PartitionField @testset "Catalog API" begin println("Testing catalog API...") + without_aws_env() do # Test connecting to Polaris REST catalog # This requires the catalog to be running via docker-compose catalog_uri = get_catalog_uri() @@ -74,12 +75,15 @@ using RustyIceberg: Field, Schema, PartitionSpec, SortOrder, PartitionField end end + end # without_aws_env + println("✅ Catalog API tests completed!") end @testset "Catalog API with Token Authentication" begin println("Testing catalog API with token-based authentication...") + without_aws_env() do # Token endpoint catalog_uri = get_catalog_uri() token_endpoint = get_token_endpoint() @@ -162,12 +166,15 @@ end end end + end # without_aws_env + println("✅ Catalog API with token authentication tests completed!") end @testset "Catalog API with Custom Authenticator Function" begin println("Testing catalog API with custom authenticator function...") + without_aws_env() do # Token endpoint catalog_uri = get_catalog_uri() token_endpoint = get_token_endpoint() @@ -300,12 +307,15 @@ end end end + end # without_aws_env + println("✅ Catalog API with custom authenticator function tests completed!") end @testset "Catalog Table Loading" begin println("Testing catalog table loading...") + without_aws_env() do catalog_uri = get_catalog_uri() catalog = nothing @@ -379,12 +389,15 @@ end println("✅ All resources cleaned up successfully") end + end # without_aws_env + println("✅ Catalog table loading tests completed!") end @testset "Catalog Table Loading with Credentials" begin println("Testing catalog table loading with vended credentials...") + without_aws_env() do catalog_uri = get_catalog_uri() catalog = nothing @@ -463,12 +476,158 @@ end println("✅ All resources cleaned up successfully") end + end # without_aws_env + println("✅ Catalog table loading with credentials tests completed!") end +@testset "Catalog Table Loading fails without credentials" begin + println("Testing that table loading fails without S3 credentials or credentials loader...") + + catalog_uri = get_catalog_uri() + + without_aws_env() do + catalog = nothing + table = C_NULL + scan = C_NULL + stream = C_NULL + + try + # Create catalog WITHOUT S3 credentials and WITHOUT credentials loader + s3_config = get_s3_config() + props = get_catalog_properties_minimal() + props["s3.endpoint"] = s3_config["endpoint"] + props["s3.region"] = s3_config["region"] + catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props, use_credentials_loader=false) + @test catalog !== nothing + println("✅ Catalog created (no credentials loader)") + + # Without S3 credentials, one of the following steps should fail: + # load_table, new_scan, scan!, or next_batch + credential_error_caught = false + try + table = RustyIceberg.load_table(catalog, ["tpch.sf01"], "customer"; load_credentials=false) + scan = RustyIceberg.new_scan(table) + RustyIceberg.select_columns!(scan, ["c_custkey"]) + stream = RustyIceberg.scan!(scan) + RustyIceberg.next_batch(stream) + error("Expected a credential/access error but none was thrown") + catch e + @test e isa RustyIceberg.IcebergException + msg = lowercase(e.msg) + @test occursin("credential", msg) || occursin("access", msg) || occursin("forbidden", msg) || occursin("permission", msg) + credential_error_caught = true + println("✅ Correctly failed without credentials: $(e.msg)") + end + @test credential_error_caught + finally + if stream != C_NULL + RustyIceberg.free_stream(stream) + end + if scan != C_NULL + RustyIceberg.free_scan!(scan) + end + if table != C_NULL + RustyIceberg.free_table(table) + end + if catalog !== nothing + RustyIceberg.free_catalog!(catalog) + end + println("✅ All resources cleaned up successfully") + end + end + + println("✅ Catalog table loading failure test completed!") +end + +@testset "Catalog Table Loading with Storage Credentials Loader" begin + println("Testing catalog table loading with storage credentials loader...") + + catalog_uri = get_catalog_uri() + + without_aws_env() do + catalog = nothing + table = C_NULL + scan = C_NULL + stream = C_NULL + batch = nothing + + try + # Create catalog WITHOUT S3 credentials but WITH credentials loader + # The loader will call load_table_credentials when a PermissionDenied error occurs + s3_config = get_s3_config() + props = get_catalog_properties_minimal() + props["s3.endpoint"] = s3_config["endpoint"] + props["s3.region"] = s3_config["region"] + catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props, use_credentials_loader=true) + @test catalog !== nothing + println("✅ Catalog created successfully with storage credentials loader") + + # Load the customer table WITHOUT load_credentials=true + # No vended credentials are fetched during load. + # When the scan tries to access S3, it will get PermissionDenied, + # triggering the credentials loader to call load_table_credentials. + println("Attempting to load customer table from tpch.sf01 (no vended credentials)...") + table = RustyIceberg.load_table(catalog, ["tpch.sf01"], "customer"; load_credentials=false) + @test table != C_NULL + println("✅ Customer table loaded successfully") + + # Create a scan on the loaded table + println("Creating scan on loaded customer table...") + scan = RustyIceberg.new_scan(table) + @test scan != C_NULL + println("✅ Scan created successfully on loaded table") + + # Select specific columns + println("Selecting specific columns from customer table...") + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name", "c_nationkey"]) + println("✅ Column selection completed") + + # Execute the scan - this is where the credentials loader should kick in + println("Executing scan (credentials loader should be invoked on PermissionDenied)...") + stream = RustyIceberg.scan!(scan) + @test stream != C_NULL + println("✅ Scan executed successfully") + + # Read the first batch to verify data was accessible via loaded credentials + println("Reading first batch from loaded customer table...") + batch_ptr = RustyIceberg.next_batch(stream) + + if batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + if batch.data != C_NULL && batch.length > 0 + println("✅ Successfully read first batch with $(batch.length) bytes of Arrow IPC data") + + @test batch.length > 0 + println("✅ Batch contains valid Arrow data - storage credentials loader worked!") + + RustyIceberg.free_batch(batch_ptr) + end + end + finally + if stream != C_NULL + RustyIceberg.free_stream(stream) + end + if scan != C_NULL + RustyIceberg.free_scan!(scan) + end + if table != C_NULL + RustyIceberg.free_table(table) + end + if catalog !== nothing + RustyIceberg.free_catalog!(catalog) + end + println("✅ All resources cleaned up successfully") + end + end + + println("✅ Catalog table loading with storage credentials loader tests completed!") +end + @testset "Catalog Incremental Scan" begin println("Testing catalog incremental scan...") + without_aws_env() do catalog_uri = get_catalog_uri() catalog = nothing @@ -586,12 +745,15 @@ end println("✅ All resources cleaned up successfully") end + end # without_aws_env + println("✅ Catalog incremental scan tests completed!") end @testset "Catalog Table Creation" begin println("Testing catalog table creation...") + without_aws_env() do catalog_uri = get_catalog_uri() catalog = nothing @@ -799,5 +961,7 @@ end end end + end # without_aws_env + println("✅ Catalog table creation and drop tests completed!") end diff --git a/test/test_config.jl b/test/test_config.jl index 21c1b17..09a3377 100644 --- a/test/test_config.jl +++ b/test/test_config.jl @@ -105,3 +105,30 @@ function get_catalog_properties_minimal() "warehouse" => get_warehouse_name() ) end + +const AWS_ENV_VARS = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION", + "AWS_REGION", "AWS_ENDPOINT_URL", "AWS_SESSION_TOKEN"] + +""" + without_aws_env(f) + +Run `f()` with all AWS environment variables unset, then restore them. +Useful for tests that need to verify credentials are obtained through +the catalog rather than from ambient environment. +""" +function without_aws_env(f) + saved = Dict{String,String}() + for var in AWS_ENV_VARS + if haskey(ENV, var) + saved[var] = ENV[var] + delete!(ENV, var) + end + end + try + f() + finally + for (var, val) in saved + ENV[var] = val + end + end +end