diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 20f73a7ae4..37a7996f80 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -23,9 +23,8 @@ use async_trait::async_trait; use aws_sdk_glue::operation::create_table::CreateTableError; use aws_sdk_glue::operation::update_table::UpdateTableError; use aws_sdk_glue::types::TableInput; -use iceberg::io::FileIO; -use iceberg::io::storage::config::{ - S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, +use iceberg::io::{ + FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, }; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index a8d2139dfd..f6e2060c0f 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -22,9 +22,7 @@ use std::collections::HashMap; -use iceberg::io::storage::config::{ - S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::{ diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 9d0788b63c..bc036d0c6b 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -22,9 +22,7 @@ use std::collections::HashMap; -use iceberg::io::storage::config::{ - S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use iceberg_catalog_hms::{ diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index c2f4654fd0..32c11bf203 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -54,8 +54,6 @@ use crate::types::{ pub const REST_CATALOG_PROP_URI: &str = "uri"; /// REST catalog warehouse location pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; -/// Disable header redaction in error logs (defaults to false for security) -pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction"; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -332,17 +330,6 @@ impl RestCatalogConfig { params } - /// Check if header redaction is disabled in error logs. - /// - /// Returns true if the `disable-header-redaction` property is set to "true". - /// Defaults to false for security (headers are redacted by default). - pub(crate) fn disable_header_redaction(&self) -> bool { - self.props - .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION) - .map(|v| v.eq_ignore_ascii_case("true")) - .unwrap_or(false) - } - /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server). pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self { if let Some(uri) = config.overrides.remove("uri") { @@ -443,11 +430,7 @@ impl RestCatalog { match http_response.status() { StatusCode::OK => deserialize_catalog_response(http_response).await, - _ => Err(deserialize_unexpected_catalog_error( - http_response, - client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -551,13 +534,7 @@ impl RestCatalog { "Tried to load a table that does not exist", )); } - _ => { - return Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await); - } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), }; // Build config with proper precedence, with each next config overriding previous one: @@ -657,11 +634,7 @@ impl RestCatalog { ErrorKind::Unexpected, "Tried to load credentials for a table that does not exist", )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -721,13 +694,7 @@ impl Catalog for RestCatalog { "The parent parameter of the namespace provided does not exist", )); } - _ => { - return Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await); - } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -762,11 +729,7 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to create a namespace that already exists", )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -790,11 +753,7 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to get a namespace that does not exist", )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -811,11 +770,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(true), StatusCode::NOT_FOUND => Ok(false), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -846,11 +801,7 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to drop a namespace that does not exist", )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -887,13 +838,7 @@ impl Catalog for RestCatalog { "Tried to list tables of a namespace that does not exist", )); } - _ => { - return Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await); - } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -947,13 +892,7 @@ impl Catalog for RestCatalog { "The table already exists", )); } - _ => { - return Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await); - } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), }; let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( @@ -1010,11 +949,7 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to drop a table that does not exist", )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -1032,11 +967,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(true), StatusCode::NOT_FOUND => Ok(false), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -1065,11 +996,7 @@ impl Catalog for RestCatalog { ErrorKind::Unexpected, "Tried to rename a table to a name that already exists", )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } @@ -1113,13 +1040,7 @@ impl Catalog for RestCatalog { "The given table already exists.", )); } - _ => { - return Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await); - } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), }; let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( @@ -1191,13 +1112,7 @@ impl Catalog for RestCatalog { "A server-side gateway timeout occurred; the commit state is unknown.", )); } - _ => { - return Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await); - } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), }; // TODO: Support vended credentials here. diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index e08be19f0a..a7a9852750 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -57,8 +57,6 @@ pub(crate) struct HttpClient { extra_headers: HeaderMap, /// Extra oauth parameters to be added to each authentication request. extra_oauth_params: HashMap, - /// Whether to disable header redaction in error logs (defaults to false for security). - disable_header_redaction: bool, } impl Debug for HttpClient { @@ -82,7 +80,6 @@ impl HttpClient { authenticator: None, extra_headers, extra_oauth_params: cfg.extra_oauth_params(), - disable_header_redaction: cfg.disable_header_redaction(), }) } @@ -111,7 +108,6 @@ impl HttpClient { } else { self.extra_oauth_params }, - disable_header_redaction: cfg.disable_header_redaction(), }) } @@ -341,11 +337,6 @@ impl HttpClient { self.execute(request).await } } - - /// Returns whether header redaction is disabled for this client. - pub(crate) fn disable_header_redaction(&self) -> bool { - self.disable_header_redaction - } } /// Deserializes a catalog response into the given [`DeserializedOwned`] type. @@ -366,64 +357,14 @@ pub(crate) async fn deserialize_catalog_response( }) } -/// Headers that contain sensitive information and should be excluded from logs. -const SENSITIVE_HEADERS: &[&str] = &[ - "authorization", - "proxy-authorization", - "set-cookie", - "cookie", - "x-api-key", - "x-auth-token", -]; - -/// Returns true if the header name is considered sensitive. -fn is_sensitive_header(name: &str) -> bool { - let name_lower = name.to_lowercase(); - SENSITIVE_HEADERS.iter().any(|h| name_lower == *h) -} - -/// Redacts sensitive headers and returns a debug-formatted string. -/// -/// If `disable_redaction` is true, returns all headers without redaction. -/// Otherwise, replaces sensitive header values with "[REDACTED]". -fn format_headers_redacted(headers: &HeaderMap, disable_redaction: bool) -> String { - if disable_redaction { - // Return all headers as-is without redaction - let all: HashMap<&str, &str> = headers - .iter() - .filter_map(|(name, value)| value.to_str().ok().map(|v| (name.as_str(), v))) - .collect(); - return format!("{all:?}"); - } - - // Redact sensitive headers by replacing their values with "[REDACTED]" - let redacted: HashMap<&str, &str> = headers - .iter() - .filter_map(|(name, value)| { - if is_sensitive_header(name.as_str()) { - Some((name.as_str(), "[REDACTED]")) - } else { - value.to_str().ok().map(|v| (name.as_str(), v)) - } - }) - .collect(); - format!("{redacted:?}") -} - /// Deserializes a unexpected catalog response into an error. -pub(crate) async fn deserialize_unexpected_catalog_error( - response: Response, - disable_header_redaction: bool, -) -> Error { +pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> Error { let err = Error::new( ErrorKind::Unexpected, "Received response with unexpected status code", ) .with_context("status", response.status().to_string()) - .with_context( - "headers", - format_headers_redacted(response.headers(), disable_header_redaction), - ); + .with_context("headers", format!("{:?}", response.headers())); let bytes = match response.bytes().await { Ok(bytes) => bytes, @@ -435,124 +376,3 @@ pub(crate) async fn deserialize_unexpected_catalog_error( } err.with_context("json", String::from_utf8_lossy(&bytes)) } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_format_headers_redacted_empty() { - let headers = HeaderMap::new(); - let result = format_headers_redacted(&headers, false); - assert_eq!(result, "{}"); - } - - #[test] - fn test_format_headers_redacted_non_sensitive() { - let mut headers = HeaderMap::new(); - headers.insert("content-type", "application/json".parse().unwrap()); - headers.insert("x-request-id", "abc123".parse().unwrap()); - - let result = format_headers_redacted(&headers, false); - - assert!(result.contains("content-type")); - assert!(result.contains("application/json")); - assert!(result.contains("x-request-id")); - assert!(result.contains("abc123")); - } - - #[test] - fn test_format_headers_redacted_filters_sensitive() { - let mut headers = HeaderMap::new(); - headers.insert("authorization", "Bearer secret-token".parse().unwrap()); - headers.insert("content-type", "application/json".parse().unwrap()); - - let result = format_headers_redacted(&headers, false); - - // Sensitive header should be present but with redacted value - assert!(result.contains("authorization")); - assert!(result.contains("[REDACTED]")); - // Sensitive value should NOT be present - assert!(!result.contains("secret-token")); - // Non-sensitive header should be present with actual value - assert!(result.contains("content-type")); - assert!(result.contains("application/json")); - } - - #[test] - fn test_format_headers_redacted_filters_set_cookie() { - let mut headers = HeaderMap::new(); - headers.insert( - "set-cookie", - "CF_Authorization=sensitive-session-token; Path=/; Secure;" - .parse() - .unwrap(), - ); - headers.insert("server", "cloudflare".parse().unwrap()); - - let result = format_headers_redacted(&headers, false); - - // Sensitive header should be present but with redacted value - assert!(result.contains("set-cookie")); - assert!(result.contains("[REDACTED]")); - // Sensitive value should NOT be present - assert!(!result.contains("sensitive-session-token")); - // Non-sensitive header should be present with actual value - assert!(result.contains("server")); - assert!(result.contains("cloudflare")); - } - - #[test] - fn test_format_headers_redacted_filters_all_sensitive() { - let mut headers = HeaderMap::new(); - headers.insert("authorization", "Bearer token".parse().unwrap()); - headers.insert("proxy-authorization", "Basic creds".parse().unwrap()); - headers.insert("set-cookie", "session=abc".parse().unwrap()); - headers.insert("cookie", "session=abc".parse().unwrap()); - headers.insert("x-api-key", "api-key-123".parse().unwrap()); - headers.insert("x-auth-token", "auth-token-456".parse().unwrap()); - headers.insert("x-request-id", "req-123".parse().unwrap()); - - let result = format_headers_redacted(&headers, false); - - // All sensitive headers should be present but with redacted values - assert!(result.contains("authorization")); - assert!(result.contains("proxy-authorization")); - assert!(result.contains("set-cookie")); - assert!(result.contains("cookie")); - assert!(result.contains("x-api-key")); - assert!(result.contains("x-auth-token")); - assert!(result.contains("[REDACTED]")); - - // Ensure no sensitive values leaked - assert!(!result.contains("Bearer token")); - assert!(!result.contains("Basic creds")); - assert!(!result.contains("session=abc")); - assert!(!result.contains("api-key-123")); - assert!(!result.contains("auth-token-456")); - - // Non-sensitive header should be present with actual value - assert!(result.contains("x-request-id")); - assert!(result.contains("req-123")); - } - - #[test] - fn test_format_headers_with_redaction_disabled() { - let mut headers = HeaderMap::new(); - headers.insert("authorization", "Bearer secret-token".parse().unwrap()); - headers.insert("x-api-key", "api-key-123".parse().unwrap()); - headers.insert("content-type", "application/json".parse().unwrap()); - - let result = format_headers_redacted(&headers, true); - - // When redaction is disabled, all headers and values should be present - assert!(result.contains("authorization")); - assert!(result.contains("Bearer secret-token")); - assert!(result.contains("x-api-key")); - assert!(result.contains("api-key-123")); - assert!(result.contains("content-type")); - assert!(result.contains("application/json")); - // [REDACTED] should NOT be present when redaction is disabled - assert!(!result.contains("[REDACTED]")); - } -} diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index a6ac60bc79..3606fac99a 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -448,9 +448,9 @@ impl Catalog for S3TablesCatalog { .await .map_err(from_aws_sdk_error)?; - // prepare table location. the warehouse location is generated by s3tables catalog, + // prepare metadata location. the warehouse location is generated by s3tables catalog, // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3 - let table_location = match &creation.location { + let metadata_location = match &creation.location { Some(_) => { return Err(Error::new( ErrorKind::DataInvalid, @@ -467,17 +467,16 @@ impl Catalog for S3TablesCatalog { .send() .await .map_err(from_aws_sdk_error)?; - get_resp.warehouse_location().to_string() + let warehouse_location = get_resp.warehouse_location().to_string(); + MetadataLocation::new_with_table_location(warehouse_location).to_string() } }; // write metadata to file - creation.location = Some(table_location.clone()); + creation.location = Some(metadata_location.clone()); let metadata = TableMetadataBuilder::from_table_creation(creation)? .build()? .metadata; - let metadata_location = - MetadataLocation::new_with_table_location(table_location).to_string(); metadata.write_to(&self.file_io, &metadata_location).await?; // update metadata location diff --git a/crates/examples/src/oss_backend.rs b/crates/examples/src/oss_backend.rs index a5e2776cb2..8f0b866d23 100644 --- a/crates/examples/src/oss_backend.rs +++ b/crates/examples/src/oss_backend.rs @@ -18,10 +18,6 @@ use std::collections::HashMap; use futures::stream::StreamExt; -use iceberg::io::storage::config::{ - OSS_ACCESS_KEY_ID as OSS_ACCESS_KEY_ID_PROP, - OSS_ACCESS_KEY_SECRET as OSS_ACCESS_KEY_SECRET_PROP, OSS_ENDPOINT as OSS_ENDPOINT_PROP, -}; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent}; use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder}; @@ -51,13 +47,16 @@ async fn main() { "rest", HashMap::from([ (REST_CATALOG_PROP_URI.to_string(), REST_URI.to_string()), - (OSS_ENDPOINT_PROP.to_string(), OSS_ENDPOINT.to_string()), ( - OSS_ACCESS_KEY_ID_PROP.to_string(), + iceberg::io::OSS_ENDPOINT.to_string(), + OSS_ENDPOINT.to_string(), + ), + ( + iceberg::io::OSS_ACCESS_KEY_ID.to_string(), OSS_ACCESS_KEY_ID.to_string(), ), ( - OSS_ACCESS_KEY_SECRET_PROP.to_string(), + iceberg::io::OSS_ACCESS_KEY_SECRET.to_string(), OSS_ACCESS_KEY_SECRET.to_string(), ), ]), diff --git a/crates/iceberg/src/io/storage/config/azdls.rs b/crates/iceberg/src/io/config/azdls.rs similarity index 100% rename from crates/iceberg/src/io/storage/config/azdls.rs rename to crates/iceberg/src/io/config/azdls.rs diff --git a/crates/iceberg/src/io/storage/config/gcs.rs b/crates/iceberg/src/io/config/gcs.rs similarity index 100% rename from crates/iceberg/src/io/storage/config/gcs.rs rename to crates/iceberg/src/io/config/gcs.rs diff --git a/crates/iceberg/src/io/storage/config/mod.rs b/crates/iceberg/src/io/config/mod.rs similarity index 100% rename from crates/iceberg/src/io/storage/config/mod.rs rename to crates/iceberg/src/io/config/mod.rs diff --git a/crates/iceberg/src/io/storage/config/oss.rs b/crates/iceberg/src/io/config/oss.rs similarity index 100% rename from crates/iceberg/src/io/storage/config/oss.rs rename to crates/iceberg/src/io/config/oss.rs diff --git a/crates/iceberg/src/io/storage/config/s3.rs b/crates/iceberg/src/io/config/s3.rs similarity index 100% rename from crates/iceberg/src/io/storage/config/s3.rs rename to crates/iceberg/src/io/config/s3.rs diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 267f295369..1ad71d8531 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -23,7 +23,8 @@ use std::sync::Arc; use bytes::Bytes; use url::Url; -use super::storage::{OpenDalStorage, Storage}; +use super::opendal::OpenDalStorage; +use super::storage::Storage; use crate::{Error, ErrorKind, Result}; /// FileIO implementation, used to manipulate files in underlying storage. diff --git a/crates/iceberg/src/io/storage/local_fs.rs b/crates/iceberg/src/io/local_fs.rs similarity index 99% rename from crates/iceberg/src/io/storage/local_fs.rs rename to crates/iceberg/src/io/local_fs.rs index d6dd5b433b..0a55199f70 100644 --- a/crates/iceberg/src/io/storage/local_fs.rs +++ b/crates/iceberg/src/io/local_fs.rs @@ -31,7 +31,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use crate::io::{ +use super::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, }; diff --git a/crates/iceberg/src/io/storage/memory.rs b/crates/iceberg/src/io/memory.rs similarity index 99% rename from crates/iceberg/src/io/storage/memory.rs rename to crates/iceberg/src/io/memory.rs index cb01ee4709..39f1f5db9d 100644 --- a/crates/iceberg/src/io/storage/memory.rs +++ b/crates/iceberg/src/io/memory.rs @@ -30,7 +30,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use crate::io::{ +use super::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, }; diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 3da6d407ca..c6af7db1e6 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -23,8 +23,7 @@ //! //! ```rust //! use iceberg::Result; -//! use iceberg::io::FileIOBuilder; -//! use iceberg::io::storage::config::S3_REGION; +//! use iceberg::io::{FileIOBuilder, S3_REGION}; //! //! # fn test() -> Result<()> { //! // Build a memory file io. @@ -43,8 +42,7 @@ //! //! ```rust //! use iceberg::Result; -//! use iceberg::io::FileIO; -//! use iceberg::io::storage::config::S3_REGION; +//! use iceberg::io::{FileIO, S3_REGION}; //! //! # fn test() -> Result<()> { //! // Build a memory file io. @@ -68,19 +66,23 @@ //! - `new_input`: Create input file for reading. //! - `new_output`: Create output file for writing. +mod config; mod file_io; +mod local_fs; +mod memory; +mod opendal; mod refreshable_accessor; mod refreshable_storage; -pub mod storage; +mod storage; mod storage_credential; +pub use config::*; pub use file_io::*; #[cfg(feature = "storage-s3")] -pub use storage::opendal::CustomAwsCredentialLoader; -pub use storage::opendal::{OpenDalStorage, OpenDalStorageFactory}; +pub use opendal::CustomAwsCredentialLoader; +pub use opendal::{OpenDalStorage, OpenDalStorageFactory}; pub use storage::{Storage, StorageConfig, StorageFactory}; pub use storage_credential::*; - pub(crate) mod object_cache; pub(crate) fn is_truthy(value: &str) -> bool { diff --git a/crates/iceberg/src/io/storage/opendal/azdls.rs b/crates/iceberg/src/io/opendal/azdls.rs similarity index 99% rename from crates/iceberg/src/io/storage/opendal/azdls.rs rename to crates/iceberg/src/io/opendal/azdls.rs index ea77c8a667..c957fd62a3 100644 --- a/crates/iceberg/src/io/storage/opendal/azdls.rs +++ b/crates/iceberg/src/io/opendal/azdls.rs @@ -24,7 +24,7 @@ use opendal::services::AzdlsConfig; use serde::{Deserialize, Serialize}; use url::Url; -use super::super::config::{ +use crate::io::config::{ ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET, ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID, }; diff --git a/crates/iceberg/src/io/storage/opendal/fs.rs b/crates/iceberg/src/io/opendal/fs.rs similarity index 100% rename from crates/iceberg/src/io/storage/opendal/fs.rs rename to crates/iceberg/src/io/opendal/fs.rs diff --git a/crates/iceberg/src/io/storage/opendal/gcs.rs b/crates/iceberg/src/io/opendal/gcs.rs similarity index 99% rename from crates/iceberg/src/io/storage/opendal/gcs.rs rename to crates/iceberg/src/io/opendal/gcs.rs index d30051859b..5c6145d32b 100644 --- a/crates/iceberg/src/io/storage/opendal/gcs.rs +++ b/crates/iceberg/src/io/opendal/gcs.rs @@ -22,7 +22,7 @@ use opendal::Operator; use opendal::services::GcsConfig; use url::Url; -use super::super::config::{ +use crate::io::config::{ GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, }; diff --git a/crates/iceberg/src/io/storage/opendal/memory.rs b/crates/iceberg/src/io/opendal/memory.rs similarity index 100% rename from crates/iceberg/src/io/storage/opendal/memory.rs rename to crates/iceberg/src/io/opendal/memory.rs diff --git a/crates/iceberg/src/io/storage/opendal/mod.rs b/crates/iceberg/src/io/opendal/mod.rs similarity index 99% rename from crates/iceberg/src/io/storage/opendal/mod.rs rename to crates/iceberg/src/io/opendal/mod.rs index 3913de5a8c..f1a1dce7d2 100644 --- a/crates/iceberg/src/io/storage/opendal/mod.rs +++ b/crates/iceberg/src/io/opendal/mod.rs @@ -39,13 +39,13 @@ use opendal::{Operator, Scheme}; pub use s3::CustomAwsCredentialLoader; use serde::{Deserialize, Serialize}; -use crate::catalog::TableIdent; -use crate::io::file_io::Extensions; -use crate::io::refreshable_storage::RefreshableOpenDalStorageBuilder; -use crate::io::{ +use super::file_io::Extensions; +use super::refreshable_storage::RefreshableOpenDalStorageBuilder; +use super::{ FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, MetadataLocation, OutputFile, Storage, StorageConfig, StorageCredential, StorageCredentialsLoader, StorageFactory, }; +use crate::catalog::TableIdent; use crate::{Error, ErrorKind, Result}; #[cfg(feature = "storage-azdls")] @@ -216,7 +216,7 @@ pub enum OpenDalStorage { /// The refreshable storage backend. /// `None` only after deserialization (cannot be reconstructed from serialized form). #[serde(skip)] - backend: Option>, + backend: Option>, }, } diff --git a/crates/iceberg/src/io/storage/opendal/oss.rs b/crates/iceberg/src/io/opendal/oss.rs similarity index 95% rename from crates/iceberg/src/io/storage/opendal/oss.rs rename to crates/iceberg/src/io/opendal/oss.rs index 64210b0f9a..83fc1424aa 100644 --- a/crates/iceberg/src/io/storage/opendal/oss.rs +++ b/crates/iceberg/src/io/opendal/oss.rs @@ -21,7 +21,7 @@ use opendal::services::OssConfig; use opendal::{Configurator, Operator}; use url::Url; -use super::super::config::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; +use crate::io::config::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; use crate::{Error, ErrorKind, Result}; /// Parse iceberg props to oss config. diff --git a/crates/iceberg/src/io/storage/opendal/s3.rs b/crates/iceberg/src/io/opendal/s3.rs similarity index 99% rename from crates/iceberg/src/io/storage/opendal/s3.rs rename to crates/iceberg/src/io/opendal/s3.rs index 8d46c2a698..bf7399e01b 100644 --- a/crates/iceberg/src/io/storage/opendal/s3.rs +++ b/crates/iceberg/src/io/opendal/s3.rs @@ -25,7 +25,7 @@ pub use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client; use url::Url; -use super::super::config::{ +use crate::io::config::{ CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN, S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD, S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY, diff --git a/crates/iceberg/src/io/refreshable_storage.rs b/crates/iceberg/src/io/refreshable_storage.rs index b4cbd1069f..3a79f02b94 100644 --- a/crates/iceberg/src/io/refreshable_storage.rs +++ b/crates/iceberg/src/io/refreshable_storage.rs @@ -23,8 +23,8 @@ use opendal::Operator; use opendal::raw::*; use tokio::sync::Mutex as AsyncMutex; +use super::opendal::OpenDalStorage; use super::refreshable_accessor::RefreshableAccessor; -use super::storage::opendal::OpenDalStorage; use crate::catalog::TableIdent; use crate::io::file_io::Extensions; use crate::io::{StorageCredential, StorageCredentialsLoader}; diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage.rs similarity index 93% rename from crates/iceberg/src/io/storage/mod.rs rename to crates/iceberg/src/io/storage.rs index f3786667e3..15cc85ab10 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage.rs @@ -17,25 +17,15 @@ //! Storage interfaces for Iceberg. -pub mod config; -mod local_fs; -mod memory; -pub mod opendal; - use std::fmt::Debug; use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; -pub use config::*; -pub use local_fs::{LocalFsStorage, LocalFsStorageFactory}; -pub use memory::{MemoryStorage, MemoryStorageFactory}; -#[cfg(feature = "storage-s3")] -pub use opendal::CustomAwsCredentialLoader; -pub use opendal::{OpenDalStorage, OpenDalStorageFactory}; use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; use crate::Result; +pub use crate::io::config::StorageConfig; /// Trait for storage operations in Iceberg. /// diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs index 42f44ef822..005b0f3979 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -24,8 +24,7 @@ mod tests { use std::collections::HashMap; use bytes::Bytes; - use iceberg::io::storage::config::{GCS_NO_AUTH, GCS_SERVICE_PATH}; - use iceberg::io::{FileIO, FileIOBuilder}; + use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH}; use iceberg_test_utils::{get_gcs_endpoint, set_up}; static FAKE_GCS_BUCKET: &str = "test-bucket"; diff --git a/crates/iceberg/tests/file_io_s3_test.rs b/crates/iceberg/tests/file_io_s3_test.rs index c00c27955a..f28538e73e 100644 --- a/crates/iceberg/tests/file_io_s3_test.rs +++ b/crates/iceberg/tests/file_io_s3_test.rs @@ -24,10 +24,10 @@ mod tests { use std::sync::Arc; use async_trait::async_trait; - use iceberg::io::storage::config::{ - S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, + use iceberg::io::{ + CustomAwsCredentialLoader, FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, + S3_SECRET_ACCESS_KEY, }; - use iceberg::io::{CustomAwsCredentialLoader, FileIO, FileIOBuilder}; use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client; diff --git a/crates/integration_tests/src/lib.rs b/crates/integration_tests/src/lib.rs index 8d18545ddb..4bf8f4d19c 100644 --- a/crates/integration_tests/src/lib.rs +++ b/crates/integration_tests/src/lib.rs @@ -18,9 +18,7 @@ use std::collections::HashMap; use std::sync::OnceLock; -use iceberg::io::storage::config::{ - S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg_catalog_rest::REST_CATALOG_PROP_URI; use iceberg_test_utils::{get_minio_endpoint, get_rest_catalog_endpoint, set_up}; diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 21920c9ce6..2f0d08fbd9 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -188,22 +188,18 @@ services: # Provision service - creates test data via Spark provision: - image: ghcr.io/astral-sh/uv:python3.12-bookworm-slim + image: python:3.12-slim networks: iceberg_test: depends_on: spark-iceberg: condition: service_healthy - entrypoint: ["/bin/sh", "-c", "uv run /opt/spark/provision.py && touch /tmp/provision_complete && tail -f /dev/null"] + entrypoint: ["/bin/sh", "-c", "pip install -q 'pyspark[connect]==4.0.1' && python3 /opt/spark/provision.py && touch /tmp/provision_complete && tail -f /dev/null"] volumes: - ./spark/provision.py:/opt/spark/provision.py:ro - - uv-cache:/root/.cache/uv healthcheck: test: ["CMD-SHELL", "[ -f /tmp/provision_complete ]"] interval: 2s timeout: 2s retries: 90 start_period: 20s - -volumes: - uv-cache: diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 40f9ba0f38..c53a1dd842 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -15,13 +15,6 @@ # specific language governing permissions and limitations # under the License. -# /// script -# requires-python = ">=3.12" -# dependencies = [ -# "pyspark[connect]==4.0.1", -# ] -# /// - from pyspark.sql import SparkSession from pyspark.sql.functions import current_date, date_add, expr