diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 37a7996f80..20f73a7ae4 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -23,8 +23,9 @@ use async_trait::async_trait; use aws_sdk_glue::operation::create_table::CreateTableError; use aws_sdk_glue::operation::update_table::UpdateTableError; use aws_sdk_glue::types::TableInput; -use iceberg::io::{ - FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, +use iceberg::io::FileIO; +use iceberg::io::storage::config::{ + S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, }; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index f6e2060c0f..a8d2139dfd 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -22,7 +22,9 @@ use std::collections::HashMap; -use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::io::storage::config::{ + S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, +}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::{ diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index bc036d0c6b..9d0788b63c 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -22,7 +22,9 @@ use std::collections::HashMap; -use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::io::storage::config::{ + S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, +}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, 
NamespaceIdent, TableCreation, TableIdent}; use iceberg_catalog_hms::{ diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 32c11bf203..c2f4654fd0 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -54,6 +54,8 @@ use crate::types::{ pub const REST_CATALOG_PROP_URI: &str = "uri"; /// REST catalog warehouse location pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; +/// Disable header redaction in error logs (defaults to false for security) +pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction"; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -330,6 +332,17 @@ impl RestCatalogConfig { params } + /// Check if header redaction is disabled in error logs. + /// + /// Returns true if the `disable-header-redaction` property is set to "true". + /// Defaults to false for security (headers are redacted by default). + pub(crate) fn disable_header_redaction(&self) -> bool { + self.props + .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION) + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false) + } + /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server). 
pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self { if let Some(uri) = config.overrides.remove("uri") { @@ -430,7 +443,11 @@ impl RestCatalog { match http_response.status() { StatusCode::OK => deserialize_catalog_response(http_response).await, - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + client.disable_header_redaction(), + ) + .await), } } @@ -534,7 +551,13 @@ impl RestCatalog { "Tried to load a table that does not exist", )); } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => { + return Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await); + } }; // Build config with proper precedence, with each next config overriding previous one: @@ -634,7 +657,11 @@ impl RestCatalog { ErrorKind::Unexpected, "Tried to load credentials for a table that does not exist", )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -694,7 +721,13 @@ impl Catalog for RestCatalog { "The parent parameter of the namespace provided does not exist", )); } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => { + return Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await); + } } } @@ -729,7 +762,11 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to create a namespace that already exists", )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -753,7 +790,11 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to get a namespace that does not exist", )), - 
_ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -770,7 +811,11 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(true), StatusCode::NOT_FOUND => Ok(false), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -801,7 +846,11 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to drop a namespace that does not exist", )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -838,7 +887,13 @@ impl Catalog for RestCatalog { "Tried to list tables of a namespace that does not exist", )); } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => { + return Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await); + } } } @@ -892,7 +947,13 @@ impl Catalog for RestCatalog { "The table already exists", )); } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => { + return Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await); + } }; let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( @@ -949,7 +1010,11 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to drop a table that does not exist", )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -967,7 +1032,11 @@ impl Catalog for 
RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(true), StatusCode::NOT_FOUND => Ok(false), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -996,7 +1065,11 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to rename a table to a name that already exists", )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), } } @@ -1040,7 +1113,13 @@ impl Catalog for RestCatalog { "The given table already exists.", )); } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => { + return Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await); + } }; let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( @@ -1112,7 +1191,13 @@ impl Catalog for RestCatalog { "A server-side gateway timeout occurred; the commit state is unknown.", )); } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => { + return Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await); + } }; // TODO: Support vended credentials here. diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index a7a9852750..e08be19f0a 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -57,6 +57,8 @@ pub(crate) struct HttpClient { extra_headers: HeaderMap, /// Extra oauth parameters to be added to each authentication request. extra_oauth_params: HashMap, + /// Whether to disable header redaction in error logs (defaults to false for security). 
/// Headers that contain sensitive information and should be excluded from logs.
///
/// Names are listed in lowercase; lookups compare ASCII case-insensitively, so the
/// casing of the entries here does not affect matching.
const SENSITIVE_HEADERS: &[&str] = &[
    "authorization",
    "proxy-authorization",
    "set-cookie",
    "cookie",
    "x-api-key",
    "x-auth-token",
];

/// Returns true if the header name is considered sensitive.
///
/// The comparison is ASCII case-insensitive and allocation-free: HTTP header names
/// are ASCII tokens, so there is no need to build a lowercased copy of `name` (and
/// no reason to apply Unicode case folding to it).
fn is_sensitive_header(name: &str) -> bool {
    SENSITIVE_HEADERS
        .iter()
        .any(|sensitive| name.eq_ignore_ascii_case(sensitive))
}
+fn format_headers_redacted(headers: &HeaderMap, disable_redaction: bool) -> String { + if disable_redaction { + // Return all headers as-is without redaction + let all: HashMap<&str, &str> = headers + .iter() + .filter_map(|(name, value)| value.to_str().ok().map(|v| (name.as_str(), v))) + .collect(); + return format!("{all:?}"); + } + + // Redact sensitive headers by replacing their values with "[REDACTED]" + let redacted: HashMap<&str, &str> = headers + .iter() + .filter_map(|(name, value)| { + if is_sensitive_header(name.as_str()) { + Some((name.as_str(), "[REDACTED]")) + } else { + value.to_str().ok().map(|v| (name.as_str(), v)) + } + }) + .collect(); + format!("{redacted:?}") +} + /// Deserializes a unexpected catalog response into an error. -pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> Error { +pub(crate) async fn deserialize_unexpected_catalog_error( + response: Response, + disable_header_redaction: bool, +) -> Error { let err = Error::new( ErrorKind::Unexpected, "Received response with unexpected status code", ) .with_context("status", response.status().to_string()) - .with_context("headers", format!("{:?}", response.headers())); + .with_context( + "headers", + format_headers_redacted(response.headers(), disable_header_redaction), + ); let bytes = match response.bytes().await { Ok(bytes) => bytes, @@ -376,3 +435,124 @@ pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> } err.with_context("json", String::from_utf8_lossy(&bytes)) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_headers_redacted_empty() { + let headers = HeaderMap::new(); + let result = format_headers_redacted(&headers, false); + assert_eq!(result, "{}"); + } + + #[test] + fn test_format_headers_redacted_non_sensitive() { + let mut headers = HeaderMap::new(); + headers.insert("content-type", "application/json".parse().unwrap()); + headers.insert("x-request-id", "abc123".parse().unwrap()); + + let result 
= format_headers_redacted(&headers, false); + + assert!(result.contains("content-type")); + assert!(result.contains("application/json")); + assert!(result.contains("x-request-id")); + assert!(result.contains("abc123")); + } + + #[test] + fn test_format_headers_redacted_filters_sensitive() { + let mut headers = HeaderMap::new(); + headers.insert("authorization", "Bearer secret-token".parse().unwrap()); + headers.insert("content-type", "application/json".parse().unwrap()); + + let result = format_headers_redacted(&headers, false); + + // Sensitive header should be present but with redacted value + assert!(result.contains("authorization")); + assert!(result.contains("[REDACTED]")); + // Sensitive value should NOT be present + assert!(!result.contains("secret-token")); + // Non-sensitive header should be present with actual value + assert!(result.contains("content-type")); + assert!(result.contains("application/json")); + } + + #[test] + fn test_format_headers_redacted_filters_set_cookie() { + let mut headers = HeaderMap::new(); + headers.insert( + "set-cookie", + "CF_Authorization=sensitive-session-token; Path=/; Secure;" + .parse() + .unwrap(), + ); + headers.insert("server", "cloudflare".parse().unwrap()); + + let result = format_headers_redacted(&headers, false); + + // Sensitive header should be present but with redacted value + assert!(result.contains("set-cookie")); + assert!(result.contains("[REDACTED]")); + // Sensitive value should NOT be present + assert!(!result.contains("sensitive-session-token")); + // Non-sensitive header should be present with actual value + assert!(result.contains("server")); + assert!(result.contains("cloudflare")); + } + + #[test] + fn test_format_headers_redacted_filters_all_sensitive() { + let mut headers = HeaderMap::new(); + headers.insert("authorization", "Bearer token".parse().unwrap()); + headers.insert("proxy-authorization", "Basic creds".parse().unwrap()); + headers.insert("set-cookie", "session=abc".parse().unwrap()); + 
headers.insert("cookie", "session=abc".parse().unwrap()); + headers.insert("x-api-key", "api-key-123".parse().unwrap()); + headers.insert("x-auth-token", "auth-token-456".parse().unwrap()); + headers.insert("x-request-id", "req-123".parse().unwrap()); + + let result = format_headers_redacted(&headers, false); + + // All sensitive headers should be present but with redacted values + assert!(result.contains("authorization")); + assert!(result.contains("proxy-authorization")); + assert!(result.contains("set-cookie")); + assert!(result.contains("cookie")); + assert!(result.contains("x-api-key")); + assert!(result.contains("x-auth-token")); + assert!(result.contains("[REDACTED]")); + + // Ensure no sensitive values leaked + assert!(!result.contains("Bearer token")); + assert!(!result.contains("Basic creds")); + assert!(!result.contains("session=abc")); + assert!(!result.contains("api-key-123")); + assert!(!result.contains("auth-token-456")); + + // Non-sensitive header should be present with actual value + assert!(result.contains("x-request-id")); + assert!(result.contains("req-123")); + } + + #[test] + fn test_format_headers_with_redaction_disabled() { + let mut headers = HeaderMap::new(); + headers.insert("authorization", "Bearer secret-token".parse().unwrap()); + headers.insert("x-api-key", "api-key-123".parse().unwrap()); + headers.insert("content-type", "application/json".parse().unwrap()); + + let result = format_headers_redacted(&headers, true); + + // When redaction is disabled, all headers and values should be present + assert!(result.contains("authorization")); + assert!(result.contains("Bearer secret-token")); + assert!(result.contains("x-api-key")); + assert!(result.contains("api-key-123")); + assert!(result.contains("content-type")); + assert!(result.contains("application/json")); + // [REDACTED] should NOT be present when redaction is disabled + assert!(!result.contains("[REDACTED]")); + } +} diff --git a/crates/catalog/s3tables/src/catalog.rs 
b/crates/catalog/s3tables/src/catalog.rs index 3606fac99a..a6ac60bc79 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -448,9 +448,9 @@ impl Catalog for S3TablesCatalog { .await .map_err(from_aws_sdk_error)?; - // prepare metadata location. the warehouse location is generated by s3tables catalog, + // prepare table location. the warehouse location is generated by s3tables catalog, // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3 - let metadata_location = match &creation.location { + let table_location = match &creation.location { Some(_) => { return Err(Error::new( ErrorKind::DataInvalid, @@ -467,16 +467,17 @@ impl Catalog for S3TablesCatalog { .send() .await .map_err(from_aws_sdk_error)?; - let warehouse_location = get_resp.warehouse_location().to_string(); - MetadataLocation::new_with_table_location(warehouse_location).to_string() + get_resp.warehouse_location().to_string() } }; // write metadata to file - creation.location = Some(metadata_location.clone()); + creation.location = Some(table_location.clone()); let metadata = TableMetadataBuilder::from_table_creation(creation)? .build()? 
.metadata; + let metadata_location = + MetadataLocation::new_with_table_location(table_location).to_string(); metadata.write_to(&self.file_io, &metadata_location).await?; // update metadata location diff --git a/crates/examples/src/oss_backend.rs b/crates/examples/src/oss_backend.rs index 8f0b866d23..a5e2776cb2 100644 --- a/crates/examples/src/oss_backend.rs +++ b/crates/examples/src/oss_backend.rs @@ -18,6 +18,10 @@ use std::collections::HashMap; use futures::stream::StreamExt; +use iceberg::io::storage::config::{ + OSS_ACCESS_KEY_ID as OSS_ACCESS_KEY_ID_PROP, + OSS_ACCESS_KEY_SECRET as OSS_ACCESS_KEY_SECRET_PROP, OSS_ENDPOINT as OSS_ENDPOINT_PROP, +}; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent}; use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder}; @@ -47,16 +51,13 @@ async fn main() { "rest", HashMap::from([ (REST_CATALOG_PROP_URI.to_string(), REST_URI.to_string()), + (OSS_ENDPOINT_PROP.to_string(), OSS_ENDPOINT.to_string()), ( - iceberg::io::OSS_ENDPOINT.to_string(), - OSS_ENDPOINT.to_string(), - ), - ( - iceberg::io::OSS_ACCESS_KEY_ID.to_string(), + OSS_ACCESS_KEY_ID_PROP.to_string(), OSS_ACCESS_KEY_ID.to_string(), ), ( - iceberg::io::OSS_ACCESS_KEY_SECRET.to_string(), + OSS_ACCESS_KEY_SECRET_PROP.to_string(), OSS_ACCESS_KEY_SECRET.to_string(), ), ]), diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 1ad71d8531..267f295369 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -23,8 +23,7 @@ use std::sync::Arc; use bytes::Bytes; use url::Url; -use super::opendal::OpenDalStorage; -use super::storage::Storage; +use super::storage::{OpenDalStorage, Storage}; use crate::{Error, ErrorKind, Result}; /// FileIO implementation, used to manipulate files in underlying storage. 
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index c6af7db1e6..3da6d407ca 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -23,7 +23,8 @@ //! //! ```rust //! use iceberg::Result; -//! use iceberg::io::{FileIOBuilder, S3_REGION}; +//! use iceberg::io::FileIOBuilder; +//! use iceberg::io::storage::config::S3_REGION; //! //! # fn test() -> Result<()> { //! // Build a memory file io. @@ -42,7 +43,8 @@ //! //! ```rust //! use iceberg::Result; -//! use iceberg::io::{FileIO, S3_REGION}; +//! use iceberg::io::FileIO; +//! use iceberg::io::storage::config::S3_REGION; //! //! # fn test() -> Result<()> { //! // Build a memory file io. @@ -66,23 +68,19 @@ //! - `new_input`: Create input file for reading. //! - `new_output`: Create output file for writing. -mod config; mod file_io; -mod local_fs; -mod memory; -mod opendal; mod refreshable_accessor; mod refreshable_storage; -mod storage; +pub mod storage; mod storage_credential; -pub use config::*; pub use file_io::*; #[cfg(feature = "storage-s3")] -pub use opendal::CustomAwsCredentialLoader; -pub use opendal::{OpenDalStorage, OpenDalStorageFactory}; +pub use storage::opendal::CustomAwsCredentialLoader; +pub use storage::opendal::{OpenDalStorage, OpenDalStorageFactory}; pub use storage::{Storage, StorageConfig, StorageFactory}; pub use storage_credential::*; + pub(crate) mod object_cache; pub(crate) fn is_truthy(value: &str) -> bool { diff --git a/crates/iceberg/src/io/refreshable_storage.rs b/crates/iceberg/src/io/refreshable_storage.rs index 3a79f02b94..b4cbd1069f 100644 --- a/crates/iceberg/src/io/refreshable_storage.rs +++ b/crates/iceberg/src/io/refreshable_storage.rs @@ -23,8 +23,8 @@ use opendal::Operator; use opendal::raw::*; use tokio::sync::Mutex as AsyncMutex; -use super::opendal::OpenDalStorage; use super::refreshable_accessor::RefreshableAccessor; +use super::storage::opendal::OpenDalStorage; use crate::catalog::TableIdent; use 
crate::io::file_io::Extensions; use crate::io::{StorageCredential, StorageCredentialsLoader}; diff --git a/crates/iceberg/src/io/config/azdls.rs b/crates/iceberg/src/io/storage/config/azdls.rs similarity index 100% rename from crates/iceberg/src/io/config/azdls.rs rename to crates/iceberg/src/io/storage/config/azdls.rs diff --git a/crates/iceberg/src/io/config/gcs.rs b/crates/iceberg/src/io/storage/config/gcs.rs similarity index 100% rename from crates/iceberg/src/io/config/gcs.rs rename to crates/iceberg/src/io/storage/config/gcs.rs diff --git a/crates/iceberg/src/io/config/mod.rs b/crates/iceberg/src/io/storage/config/mod.rs similarity index 100% rename from crates/iceberg/src/io/config/mod.rs rename to crates/iceberg/src/io/storage/config/mod.rs diff --git a/crates/iceberg/src/io/config/oss.rs b/crates/iceberg/src/io/storage/config/oss.rs similarity index 100% rename from crates/iceberg/src/io/config/oss.rs rename to crates/iceberg/src/io/storage/config/oss.rs diff --git a/crates/iceberg/src/io/config/s3.rs b/crates/iceberg/src/io/storage/config/s3.rs similarity index 100% rename from crates/iceberg/src/io/config/s3.rs rename to crates/iceberg/src/io/storage/config/s3.rs diff --git a/crates/iceberg/src/io/local_fs.rs b/crates/iceberg/src/io/storage/local_fs.rs similarity index 99% rename from crates/iceberg/src/io/local_fs.rs rename to crates/iceberg/src/io/storage/local_fs.rs index 0a55199f70..d6dd5b433b 100644 --- a/crates/iceberg/src/io/local_fs.rs +++ b/crates/iceberg/src/io/storage/local_fs.rs @@ -31,7 +31,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use super::{ +use crate::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, }; diff --git a/crates/iceberg/src/io/memory.rs b/crates/iceberg/src/io/storage/memory.rs similarity index 99% rename from crates/iceberg/src/io/memory.rs rename to crates/iceberg/src/io/storage/memory.rs index 39f1f5db9d..cb01ee4709 
100644 --- a/crates/iceberg/src/io/memory.rs +++ b/crates/iceberg/src/io/storage/memory.rs @@ -30,7 +30,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use super::{ +use crate::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, }; diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage/mod.rs similarity index 93% rename from crates/iceberg/src/io/storage.rs rename to crates/iceberg/src/io/storage/mod.rs index 15cc85ab10..f3786667e3 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -17,15 +17,25 @@ //! Storage interfaces for Iceberg. +pub mod config; +mod local_fs; +mod memory; +pub mod opendal; + use std::fmt::Debug; use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; +pub use config::*; +pub use local_fs::{LocalFsStorage, LocalFsStorageFactory}; +pub use memory::{MemoryStorage, MemoryStorageFactory}; +#[cfg(feature = "storage-s3")] +pub use opendal::CustomAwsCredentialLoader; +pub use opendal::{OpenDalStorage, OpenDalStorageFactory}; use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; use crate::Result; -pub use crate::io::config::StorageConfig; /// Trait for storage operations in Iceberg. 
/// diff --git a/crates/iceberg/src/io/opendal/azdls.rs b/crates/iceberg/src/io/storage/opendal/azdls.rs similarity index 99% rename from crates/iceberg/src/io/opendal/azdls.rs rename to crates/iceberg/src/io/storage/opendal/azdls.rs index c957fd62a3..ea77c8a667 100644 --- a/crates/iceberg/src/io/opendal/azdls.rs +++ b/crates/iceberg/src/io/storage/opendal/azdls.rs @@ -24,7 +24,7 @@ use opendal::services::AzdlsConfig; use serde::{Deserialize, Serialize}; use url::Url; -use crate::io::config::{ +use super::super::config::{ ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET, ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID, }; diff --git a/crates/iceberg/src/io/opendal/fs.rs b/crates/iceberg/src/io/storage/opendal/fs.rs similarity index 100% rename from crates/iceberg/src/io/opendal/fs.rs rename to crates/iceberg/src/io/storage/opendal/fs.rs diff --git a/crates/iceberg/src/io/opendal/gcs.rs b/crates/iceberg/src/io/storage/opendal/gcs.rs similarity index 99% rename from crates/iceberg/src/io/opendal/gcs.rs rename to crates/iceberg/src/io/storage/opendal/gcs.rs index 5c6145d32b..d30051859b 100644 --- a/crates/iceberg/src/io/opendal/gcs.rs +++ b/crates/iceberg/src/io/storage/opendal/gcs.rs @@ -22,7 +22,7 @@ use opendal::Operator; use opendal::services::GcsConfig; use url::Url; -use crate::io::config::{ +use super::super::config::{ GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, }; diff --git a/crates/iceberg/src/io/opendal/memory.rs b/crates/iceberg/src/io/storage/opendal/memory.rs similarity index 100% rename from crates/iceberg/src/io/opendal/memory.rs rename to crates/iceberg/src/io/storage/opendal/memory.rs diff --git a/crates/iceberg/src/io/opendal/mod.rs b/crates/iceberg/src/io/storage/opendal/mod.rs similarity index 99% rename from crates/iceberg/src/io/opendal/mod.rs rename to crates/iceberg/src/io/storage/opendal/mod.rs index 
f1a1dce7d2..3913de5a8c 100644 --- a/crates/iceberg/src/io/opendal/mod.rs +++ b/crates/iceberg/src/io/storage/opendal/mod.rs @@ -39,13 +39,13 @@ use opendal::{Operator, Scheme}; pub use s3::CustomAwsCredentialLoader; use serde::{Deserialize, Serialize}; -use super::file_io::Extensions; -use super::refreshable_storage::RefreshableOpenDalStorageBuilder; -use super::{ +use crate::catalog::TableIdent; +use crate::io::file_io::Extensions; +use crate::io::refreshable_storage::RefreshableOpenDalStorageBuilder; +use crate::io::{ FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, MetadataLocation, OutputFile, Storage, StorageConfig, StorageCredential, StorageCredentialsLoader, StorageFactory, }; -use crate::catalog::TableIdent; use crate::{Error, ErrorKind, Result}; #[cfg(feature = "storage-azdls")] @@ -216,7 +216,7 @@ pub enum OpenDalStorage { /// The refreshable storage backend. /// `None` only after deserialization (cannot be reconstructed from serialized form). #[serde(skip)] - backend: Option>, + backend: Option>, }, } diff --git a/crates/iceberg/src/io/opendal/oss.rs b/crates/iceberg/src/io/storage/opendal/oss.rs similarity index 95% rename from crates/iceberg/src/io/opendal/oss.rs rename to crates/iceberg/src/io/storage/opendal/oss.rs index 83fc1424aa..64210b0f9a 100644 --- a/crates/iceberg/src/io/opendal/oss.rs +++ b/crates/iceberg/src/io/storage/opendal/oss.rs @@ -21,7 +21,7 @@ use opendal::services::OssConfig; use opendal::{Configurator, Operator}; use url::Url; -use crate::io::config::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; +use super::super::config::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; use crate::{Error, ErrorKind, Result}; /// Parse iceberg props to oss config. 
diff --git a/crates/iceberg/src/io/opendal/s3.rs b/crates/iceberg/src/io/storage/opendal/s3.rs similarity index 99% rename from crates/iceberg/src/io/opendal/s3.rs rename to crates/iceberg/src/io/storage/opendal/s3.rs index bf7399e01b..8d46c2a698 100644 --- a/crates/iceberg/src/io/opendal/s3.rs +++ b/crates/iceberg/src/io/storage/opendal/s3.rs @@ -25,7 +25,7 @@ pub use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client; use url::Url; -use crate::io::config::{ +use super::super::config::{ CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN, S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD, S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY, diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs index 005b0f3979..42f44ef822 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -24,7 +24,8 @@ mod tests { use std::collections::HashMap; use bytes::Bytes; - use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH}; + use iceberg::io::storage::config::{GCS_NO_AUTH, GCS_SERVICE_PATH}; + use iceberg::io::{FileIO, FileIOBuilder}; use iceberg_test_utils::{get_gcs_endpoint, set_up}; static FAKE_GCS_BUCKET: &str = "test-bucket"; diff --git a/crates/iceberg/tests/file_io_s3_test.rs b/crates/iceberg/tests/file_io_s3_test.rs index f28538e73e..c00c27955a 100644 --- a/crates/iceberg/tests/file_io_s3_test.rs +++ b/crates/iceberg/tests/file_io_s3_test.rs @@ -24,10 +24,10 @@ mod tests { use std::sync::Arc; use async_trait::async_trait; - use iceberg::io::{ - CustomAwsCredentialLoader, FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, + use iceberg::io::storage::config::{ + S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, }; + use iceberg::io::{CustomAwsCredentialLoader, FileIO, FileIOBuilder}; use 
iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client; diff --git a/crates/integration_tests/src/lib.rs b/crates/integration_tests/src/lib.rs index 4bf8f4d19c..8d18545ddb 100644 --- a/crates/integration_tests/src/lib.rs +++ b/crates/integration_tests/src/lib.rs @@ -18,7 +18,9 @@ use std::collections::HashMap; use std::sync::OnceLock; -use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::io::storage::config::{ + S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, +}; use iceberg_catalog_rest::REST_CATALOG_PROP_URI; use iceberg_test_utils::{get_minio_endpoint, get_rest_catalog_endpoint, set_up}; diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 2f0d08fbd9..21920c9ce6 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -188,18 +188,22 @@ services: # Provision service - creates test data via Spark provision: - image: python:3.12-slim + image: ghcr.io/astral-sh/uv:python3.12-bookworm-slim networks: iceberg_test: depends_on: spark-iceberg: condition: service_healthy - entrypoint: ["/bin/sh", "-c", "pip install -q 'pyspark[connect]==4.0.1' && python3 /opt/spark/provision.py && touch /tmp/provision_complete && tail -f /dev/null"] + entrypoint: ["/bin/sh", "-c", "uv run /opt/spark/provision.py && touch /tmp/provision_complete && tail -f /dev/null"] volumes: - ./spark/provision.py:/opt/spark/provision.py:ro + - uv-cache:/root/.cache/uv healthcheck: test: ["CMD-SHELL", "[ -f /tmp/provision_complete ]"] interval: 2s timeout: 2s retries: 90 start_period: 20s + +volumes: + uv-cache: diff --git a/dev/spark/provision.py b/dev/spark/provision.py index c53a1dd842..40f9ba0f38 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -15,6 +15,13 @@ # specific language governing permissions and limitations # under the License. 
+# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "pyspark[connect]==4.0.1", +# ] +# /// + from pyspark.sql import SparkSession from pyspark.sql.functions import current_date, date_add, expr