diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml
index d7f69bb8f6..4ba2bedb40 100644
--- a/crates/deltalake-core/Cargo.toml
+++ b/crates/deltalake-core/Cargo.toml
@@ -135,7 +135,6 @@ datafusion = [
   "parquet",
 ]
 datafusion-ext = ["datafusion"]
-gcs = ["object_store/gcp"]
 json = ["parquet/json"]
 python = ["arrow/pyarrow"]
 unity-experimental = ["reqwest", "hyper"]
diff --git a/crates/deltalake-gcp/Cargo.toml b/crates/deltalake-gcp/Cargo.toml
new file mode 100644
index 0000000000..7a49aa0f19
--- /dev/null
+++ b/crates/deltalake-gcp/Cargo.toml
@@ -0,0 +1,31 @@
+[package]
+name = "deltalake-gcp"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+deltalake-core = { path = "../deltalake-core" }
+lazy_static = "1"
+
+# workspace dependencies
+async-trait = { workspace = true }
+bytes = { workspace = true }
+futures = { workspace = true }
+tracing = { workspace = true }
+object_store = { workspace = true, features = ["gcp"]}
+thiserror = { workspace = true }
+tokio = { workspace = true }
+regex = { workspace = true }
+url = { workspace = true }
+
+[dev-dependencies]
+chrono = { workspace = true }
+serial_test = "3"
+deltalake-test = { path = "../deltalake-test" }
+pretty_env_logger = "*"
+rand = "0.8"
+serde_json = { workspace = true }
+tempfile = "3"
+
+[features]
+integration_test = []
diff --git a/crates/deltalake-gcp/src/config.rs b/crates/deltalake-gcp/src/config.rs
new file mode 100644
index 0000000000..fbc99c7edd
--- /dev/null
+++ b/crates/deltalake-gcp/src/config.rs
@@ -0,0 +1,163 @@
+//! Auxiliary module for generating a valid Google cloud configuration.
+//!
+//! Google offers a few ways to authenticate against storage accounts and
+//! provide credentials for a service principal. Some of this configuration may
+//! partially be specified in the environment. This module establishes a structured
+//! way how we discover valid credentials and some heuristics on how they are prioritized.
+use std::collections::{hash_map::Entry, HashMap};
+use std::str::FromStr;
+
+use object_store::gcp::GoogleConfigKey;
+use object_store::Error as ObjectStoreError;
+
+use crate::error::Result;
+
+lazy_static::lazy_static! {
+    static ref CREDENTIAL_KEYS: Vec<GoogleConfigKey> =
+        Vec::from_iter([
+            GoogleConfigKey::ServiceAccountKey,
+            GoogleConfigKey::ApplicationCredentials,
+        ]);
+}
+
+/// Credential
+enum GcpCredential {
+    /// Using the service account key
+    ServiceAccountKey,
+    /// Using application credentials
+    ApplicationCredentials,
+}
+
+impl GcpCredential {
+    /// required configuration keys for variant
+    fn keys(&self) -> Vec<GoogleConfigKey> {
+        match self {
+            Self::ServiceAccountKey => Vec::from_iter([GoogleConfigKey::ServiceAccountKey]),
+            Self::ApplicationCredentials => {
+                Vec::from_iter([GoogleConfigKey::ApplicationCredentials])
+            }
+        }
+    }
+}
+
+/// Helper struct to create full configuration from passed options and environment
+///
+/// Main concern is to pick the desired credential for connecting to storage backend
+/// based on a provided configuration and configuration set in the environment.
+pub(crate) struct GcpConfigHelper {
+    config: HashMap<GoogleConfigKey, String>,
+    env_config: HashMap<GoogleConfigKey, String>,
+    priority: Vec<GcpCredential>,
+}
+
+impl GcpConfigHelper {
+    /// Create a new [`ConfigHelper`]
+    pub fn try_new(
+        config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
+    ) -> Result<Self> {
+        let mut env_config = HashMap::new();
+        for (os_key, os_value) in std::env::vars_os() {
+            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
+                if key.starts_with("GOOGLE_") {
+                    if let Ok(config_key) = GoogleConfigKey::from_str(&key.to_ascii_lowercase()) {
+                        env_config.insert(config_key, value.to_string());
+                    }
+                }
+            }
+        }
+
+        Ok(Self {
+            config: config
+                .into_iter()
+                .map(|(key, value)| Ok((GoogleConfigKey::from_str(key.as_ref())?, value.into())))
+                .collect::<Result<_, ObjectStoreError>>()?,
+            env_config,
+            priority: Vec::from_iter([
+                GcpCredential::ServiceAccountKey,
+                GcpCredential::ApplicationCredentials,
+            ]),
+        })
+    }
+
+    /// Check if all credential keys are contained in passed config
+    fn has_full_config(&self, cred: &GcpCredential) -> bool {
+        cred.keys().iter().all(|key| self.config.contains_key(key))
+    }
+
+    /// Check if any credential keys are contained in passed config
+    fn has_any_config(&self, cred: &GcpCredential) -> bool {
+        cred.keys().iter().any(|key| self.config.contains_key(key))
+    }
+
+    /// Check if all credential keys can be provided using the env
+    fn has_full_config_with_env(&self, cred: &GcpCredential) -> bool {
+        cred.keys()
+            .iter()
+            .all(|key| self.config.contains_key(key) || self.env_config.contains_key(key))
+    }
+
+    /// Generate a configuration augmented with options from the environment
+    pub fn build(mut self) -> Result<HashMap<GoogleConfigKey, String>> {
+        let mut has_credential = false;
+
+        // try using only passed config options
+        if !has_credential {
+            for cred in &self.priority {
+                if self.has_full_config(cred) {
+                    has_credential = true;
+                    break;
+                }
+            }
+        }
+
+        // try partially available credentials augmented by environment
+        if !has_credential {
+            for cred in &self.priority {
+                if self.has_any_config(cred) &&
self.has_full_config_with_env(cred) {
+                    for key in cred.keys() {
+                        if let Entry::Vacant(e) = self.config.entry(key) {
+                            e.insert(self.env_config.get(&key).unwrap().to_owned());
+                        }
+                    }
+                    has_credential = true;
+                    break;
+                }
+            }
+        }
+
+        // try getting credentials only from the environment
+        if !has_credential {
+            for cred in &self.priority {
+                if self.has_full_config_with_env(cred) {
+                    for key in cred.keys() {
+                        if let Entry::Vacant(e) = self.config.entry(key) {
+                            e.insert(self.env_config.get(&key).unwrap().to_owned());
+                        }
+                    }
+                    has_credential = true;
+                    break;
+                }
+            }
+        }
+
+        let omit_keys = if has_credential {
+            CREDENTIAL_KEYS.clone()
+        } else {
+            Vec::new()
+        };
+
+        // Add keys from the environment to the configuration, as e.g. client configuration options.
+        // NOTE We have to specifically configure omitting keys, since workload identity can
+        // work purely using defaults, but partial config may be present in the environment.
+        // Preference of conflicting configs (e.g. msi resource id vs.
client id is handled in object store)
+        for key in self.env_config.keys() {
+            if !omit_keys.contains(key) {
+                if let Entry::Vacant(e) = self.config.entry(*key) {
+                    e.insert(self.env_config.get(key).unwrap().to_owned());
+                }
+            }
+        }
+
+        Ok(self.config)
+    }
+}
diff --git a/crates/deltalake-gcp/src/error.rs b/crates/deltalake-gcp/src/error.rs
new file mode 100644
index 0000000000..aca1321c3d
--- /dev/null
+++ b/crates/deltalake-gcp/src/error.rs
@@ -0,0 +1,21 @@
+use deltalake_core::errors::DeltaTableError;
+
+pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum Error {
+    #[error("failed to parse config: {0}")]
+    Parse(String),
+
+    #[error(transparent)]
+    ObjectStore(#[from] object_store::Error),
+}
+
+impl From<Error> for DeltaTableError {
+    fn from(e: Error) -> Self {
+        match e {
+            Error::Parse(msg) => DeltaTableError::Generic(msg),
+            Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
+        }
+    }
+}
diff --git a/crates/deltalake-gcp/src/lib.rs b/crates/deltalake-gcp/src/lib.rs
new file mode 100644
index 0000000000..6fe040d398
--- /dev/null
+++ b/crates/deltalake-gcp/src/lib.rs
@@ -0,0 +1,68 @@
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
+use deltalake_core::storage::{
+    factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
+};
+use deltalake_core::{DeltaResult, Path};
+use object_store::gcp::GoogleConfigKey;
+use object_store::parse_url_opts;
+use url::Url;
+
+mod config;
+pub mod error;
+
+trait GcpOptions {
+    fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String>;
+}
+
+impl GcpOptions for StorageOptions {
+    fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String> {
+        self.0
+            .iter()
+            .filter_map(|(key, value)| {
+                Some((
+                    GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
+                    value.clone(),
+                ))
+            })
+            .collect()
+    }
+}
+
+#[derive(Clone, Default, Debug)]
+pub struct GcpFactory {}
+
+impl
ObjectStoreFactory for GcpFactory {
+    fn parse_url_opts(
+        &self,
+        url: &Url,
+        options: &StorageOptions,
+    ) -> DeltaResult<(ObjectStoreRef, Path)> {
+        let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?;
+        let (store, prefix) = parse_url_opts(url, config)?;
+        Ok((url_prefix_handler(store, prefix.clone())?, prefix))
+    }
+}
+
+impl LogStoreFactory for GcpFactory {
+    fn with_options(
+        &self,
+        store: ObjectStoreRef,
+        location: &Url,
+        options: &StorageOptions,
+    ) -> DeltaResult<Arc<dyn LogStore>> {
+        Ok(default_logstore(store, location, options))
+    }
+}
+
+/// Register an [ObjectStoreFactory] for common Google Cloud [Url] schemes
+pub fn register_handlers(_additional_prefixes: Option<Url>) {
+    let factory = Arc::new(GcpFactory {});
+    let scheme = &"gs";
+    let url = Url::parse(&format!("{}://", scheme)).unwrap();
+    factories().insert(url.clone(), factory.clone());
+    logstores().insert(url.clone(), factory.clone());
+}
diff --git a/crates/deltalake-gcp/tests/context.rs b/crates/deltalake-gcp/tests/context.rs
new file mode 100644
index 0000000000..b96bd1f41b
--- /dev/null
+++ b/crates/deltalake-gcp/tests/context.rs
@@ -0,0 +1,159 @@
+use chrono::Utc;
+use deltalake_core::errors::DeltaTableError;
+use deltalake_core::logstore::LogStore;
+use deltalake_core::table::builder::DeltaTableBuilder;
+use deltalake_gcp::register_handlers;
+use deltalake_test::utils::*;
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::process::ExitStatus;
+use std::sync::Arc;
+use tempfile::TempDir;
+
+/// Kinds of storage integration
+#[derive(Debug)]
+pub struct GcpIntegration {
+    bucket_name: String,
+    temp_dir: TempDir,
+}
+
+impl Default for GcpIntegration {
+    fn default() -> Self {
+        register_handlers(None);
+        Self {
+            bucket_name: format!("test-delta-table-{}", Utc::now().timestamp()),
+            temp_dir: TempDir::new().unwrap(),
+        }
+    }
+}
+
+/// Synchronize the contents of two object stores
+pub async fn sync_stores(
+    from_store: Arc<dyn LogStore>,
+    to_store: Arc<dyn LogStore>,
+) ->
Result<(), DeltaTableError> {
+    let from_store = from_store.object_store().clone();
+    let to_store = to_store.object_store().clone();
+    // TODO if a table is copied within the same root store (i.e bucket), using copy would be MUCH more efficient
+    let mut meta_stream = from_store.list(None);
+    while let Some(file) = meta_stream.next().await {
+        if let Ok(meta) = file {
+            let bytes = from_store.get(&meta.location).await?.bytes().await?;
+            to_store.put(&meta.location, bytes).await?;
+        }
+    }
+    Ok(())
+}
+
+pub async fn copy_table(
+    from: impl AsRef<str>,
+    from_options: Option<HashMap<String, String>>,
+    to: impl AsRef<str>,
+    to_options: Option<HashMap<String, String>>,
+    allow_http: bool,
+) -> Result<(), DeltaTableError> {
+    let from_store = DeltaTableBuilder::from_uri(from)
+        .with_storage_options(from_options.unwrap_or_default())
+        .with_allow_http(allow_http)
+        .build_storage()?;
+    let to_store = DeltaTableBuilder::from_uri(to)
+        .with_storage_options(to_options.unwrap_or_default())
+        .with_allow_http(allow_http)
+        .build_storage()?;
+    sync_stores(from_store, to_store).await
+}
+
+impl StorageIntegration for GcpIntegration {
+    fn prepare_env(&self) {
+        gs_cli::prepare_env();
+        let base_url = std::env::var("GOOGLE_BASE_URL").unwrap();
+        let token = serde_json::json!({"gcs_base_url": base_url, "disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""});
+        let account_path = self.temp_dir.path().join("gcs.json");
+        println!("accoutn_path: {account_path:?}");
+        std::fs::write(&account_path, serde_json::to_vec(&token).unwrap()).unwrap();
+        std::env::set_var(
+            "GOOGLE_SERVICE_ACCOUNT",
+            account_path.as_path().to_str().unwrap(),
+        );
+    }
+
+    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
+        gs_cli::create_bucket(self.bucket_name())
+    }
+
+    fn bucket_name(&self) -> String {
+        self.bucket_name.clone()
+    }
+
+    fn root_uri(&self) -> String {
+        format!("gs://{}", self.bucket_name())
+    }
+
+    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
+        use futures::executor::block_on;
+
+
let to = format!("{}/{}", self.root_uri(), destination);
+        let _ = block_on(copy_table(source.to_owned(), None, to, None, true));
+        Ok(ExitStatus::default())
+    }
+}
+
+impl GcpIntegration {
+    fn delete_bucket(&self) -> std::io::Result<ExitStatus> {
+        gs_cli::delete_bucket(self.bucket_name.clone())
+    }
+}
+
+/// small wrapper around google api
+pub mod gs_cli {
+    use super::set_env_if_not_set;
+    use std::process::{Command, ExitStatus};
+
+    pub fn create_bucket(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
+        let endpoint = std::env::var("GOOGLE_ENDPOINT_URL")
+            .expect("variable GOOGLE_ENDPOINT_URL must be set to connect to GCS Emulator");
+        let payload = serde_json::json!({ "name": container_name.as_ref() });
+        let mut child = Command::new("curl")
+            .args([
+                "--insecure",
+                "-v",
+                "-X",
+                "POST",
+                "--data-binary",
+                &serde_json::to_string(&payload)?,
+                "-H",
+                "Content-Type: application/json",
+                &endpoint,
+            ])
+            .spawn()
+            .expect("curl command is installed");
+        child.wait()
+    }
+
+    pub fn delete_bucket(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
+        let endpoint = std::env::var("GOOGLE_ENDPOINT_URL")
+            .expect("variable GOOGLE_ENDPOINT_URL must be set to connect to GCS Emulator");
+        let payload = serde_json::json!({ "name": container_name.as_ref() });
+        let mut child = Command::new("curl")
+            .args([
+                "--insecure",
+                "-v",
+                "-X",
+                "DELETE",
+                "--data-binary",
+                &serde_json::to_string(&payload)?,
+                "-H",
+                "Content-Type: application/json",
+                &endpoint,
+            ])
+            .spawn()
+            .expect("curl command is installed");
+        child.wait()
+    }
+
+    /// prepare_env
+    pub fn prepare_env() {
+        set_env_if_not_set("GOOGLE_BASE_URL", "http://localhost:4443");
+        set_env_if_not_set("GOOGLE_ENDPOINT_URL", "http://localhost:4443/storage/v1/b");
+    }
+}
diff --git a/crates/deltalake-gcp/tests/integration.rs b/crates/deltalake-gcp/tests/integration.rs
new file mode 100644
index 0000000000..d52e2bddc9
--- /dev/null
+++ b/crates/deltalake-gcp/tests/integration.rs
@@ -0,0 +1,38 @@
+#![cfg(feature =
"integration_test")]
+
+use deltalake_test::read::read_table_paths;
+use deltalake_test::{test_concurrent_writes, test_read_tables, IntegrationContext, TestResult};
+use serial_test::serial;
+
+mod context;
+use context::*;
+
+static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];
+/// TEST_PREFIXES as they should appear in object stores.
+static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];
+
+#[tokio::test]
+#[serial]
+#[ignore = "The GCP tests currently hang"]
+async fn test_read_tables_gcp() -> TestResult {
+    let context = IntegrationContext::new(Box::new(GcpIntegration::default()))?;
+
+    test_read_tables(&context).await?;
+
+    for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) {
+        read_table_paths(&context, prefix, prefix_encoded).await?;
+    }
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[serial]
+#[ignore = "The GCP tests currently hang"]
+async fn test_concurrency_gcp() -> TestResult {
+    let context = IntegrationContext::new(Box::new(GcpIntegration::default()))?;
+
+    test_concurrent_writes(&context).await?;
+
+    Ok(())
+}
diff --git a/crates/deltalake-test/src/utils.rs b/crates/deltalake-test/src/utils.rs
index 53adc52713..431d7afaca 100644
--- a/crates/deltalake-test/src/utils.rs
+++ b/crates/deltalake-test/src/utils.rs
@@ -204,61 +204,6 @@ pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
     };
 }
 
-/// small wrapper around google api
-pub mod gs_cli {
-    use super::set_env_if_not_set;
-    use serde_json::json;
-    use std::process::{Command, ExitStatus};
-
-    pub fn create_bucket(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
-        let endpoint = std::env::var("GOOGLE_ENDPOINT_URL")
-            .expect("variable GOOGLE_ENDPOINT_URL must be set to connect to GCS Emulator");
-        let payload = json!({ "name": container_name.as_ref() });
-        let mut child = Command::new("curl")
-            .args([
-                "--insecure",
-                "-v",
-                "-X",
-                "POST",
-                "--data-binary",
-
&serde_json::to_string(&payload)?,
-                "-H",
-                "Content-Type: application/json",
-                &endpoint,
-            ])
-            .spawn()
-            .expect("curl command is installed");
-        child.wait()
-    }
-
-    pub fn delete_bucket(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
-        let endpoint = std::env::var("GOOGLE_ENDPOINT_URL")
-            .expect("variable GOOGLE_ENDPOINT_URL must be set to connect to GCS Emulator");
-        let payload = json!({ "name": container_name.as_ref() });
-        let mut child = Command::new("curl")
-            .args([
-                "--insecure",
-                "-v",
-                "-X",
-                "DELETE",
-                "--data-binary",
-                &serde_json::to_string(&payload)?,
-                "-H",
-                "Content-Type: application/json",
-                &endpoint,
-            ])
-            .spawn()
-            .expect("curl command is installed");
-        child.wait()
-    }
-
-    /// prepare_env
-    pub fn prepare_env() {
-        set_env_if_not_set("GOOGLE_BASE_URL", "http://localhost:4443");
-        set_env_if_not_set("GOOGLE_ENDPOINT_URL", "http://localhost:4443/storage/v1/b");
-    }
-}
-
 /// small wrapper around hdfs cli
 pub mod hdfs_cli {
     use std::env;
diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml
index 97bddf2c58..b0bcac777d 100644
--- a/crates/deltalake/Cargo.toml
+++ b/crates/deltalake/Cargo.toml
@@ -16,6 +16,7 @@ edition = "2021"
 deltalake-core = { path = "../deltalake-core" }
 deltalake-aws = { path = "../deltalake-aws", default-features = false, optional = true }
 deltalake-azure = { path = "../deltalake-azure", optional = true }
+deltalake-gcp = { path = "../deltalake-gcp", optional = true }
 deltalake-catalog-glue = { path = "../deltalake-catalog-glue", optional = true }
 
 [features]
@@ -26,7 +27,7 @@ arrow = ["deltalake-core/arrow"]
 default = ["arrow"]
 datafusion = ["deltalake-core/datafusion"]
 datafusion-ext = ["datafusion"]
-gcs = ["deltalake-core/gcs"]
+gcs = ["deltalake-gcp"]
 glue = ["deltalake-catalog-glue"]
 hdfs = []
 json = ["deltalake-core/json"]
diff --git a/crates/deltalake/src/lib.rs b/crates/deltalake/src/lib.rs
index 98825f2c22..38dc5d52dc 100644
--- a/crates/deltalake/src/lib.rs
+++
b/crates/deltalake/src/lib.rs
@@ -7,3 +7,5 @@ pub use deltalake_core::*;
 pub use deltalake_aws as aws;
 #[cfg(feature = "azure")]
 pub use deltalake_azure as azure;
+#[cfg(feature = "gcs")]
+pub use deltalake_gcp as gcp;
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8318d0bab9..52621492b9 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1504,6 +1504,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
     deltalake::aws::register_handlers(None);
     deltalake::azure::register_handlers(None);
+    deltalake::gcp::register_handlers(None);
 
     m.add("DeltaError", py.get_type::<DeltaError>())?;
     m.add("CommitFailedError", py.get_type::<CommitFailedError>())?;