Skip to content

Commit

Permalink
Re-introduce the GCP suppor code in deltalake-gcp
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Jan 19, 2024
1 parent 5eda27c commit ef8db1a
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 57 deletions.
1 change: 0 additions & 1 deletion crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ datafusion = [
"parquet",
]
datafusion-ext = ["datafusion"]
gcs = ["object_store/gcp"]
json = ["parquet/json"]
python = ["arrow/pyarrow"]
unity-experimental = ["reqwest", "hyper"]
Expand Down
31 changes: 31 additions & 0 deletions crates/deltalake-gcp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "deltalake-gcp"
version = "0.1.0"
edition = "2021"

[dependencies]
deltalake-core = { path = "../deltalake-core" }
lazy_static = "1"

# workspace depenndecies
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
object_store = { workspace = true, features = ["gcp"]}
thiserror = { workspace = true }
tokio = { workspace = true }
regex = { workspace = true }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
serial_test = "3"
deltalake-test = { path = "../deltalake-test" }
pretty_env_logger = "*"
rand = "0.8"
serde_json = { workspace = true }
tempfile = "3"

[features]
integration_test = []
163 changes: 163 additions & 0 deletions crates/deltalake-gcp/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//! Auxiliary module for generating a valig Google cloud configuration.
//!
//! Google offers few ways to authenticate against storage accounts and
//! provide credentials for a service principal. Some of this configutaion may
//! partially be specified in the environment. This module establishes a structured
//! way how we discover valid credentials and some heuristics on how they are prioritized.
use std::collections::{hash_map::Entry, HashMap};
use std::str::FromStr;

use object_store::gcp::GoogleConfigKey;
use object_store::Error as ObjectStoreError;

use crate::error::Result;

lazy_static::lazy_static! {
static ref CREDENTIAL_KEYS: Vec<GoogleConfigKey> =
Vec::from_iter([
GoogleConfigKey::ServiceAccountKey,
GoogleConfigKey::ApplicationCredentials,
]);
}

/// Credential
enum GcpCredential {
/// Using the service account key
ServiceAccountKey,
/// Using application credentials
ApplicationCredentials,
}

impl GcpCredential {
/// required configuration keys for variant
fn keys(&self) -> Vec<GoogleConfigKey> {
match self {
Self::ServiceAccountKey => Vec::from_iter([GoogleConfigKey::ServiceAccountKey]),
Self::ApplicationCredentials => {
Vec::from_iter([GoogleConfigKey::ApplicationCredentials])
}
}
}
}

/// Helper struct to create full configuration from passed options and environment
///
/// Main concern is to pick the desired credential for connecting to starage backend
/// based on a provided configuration and configuration set in the environment.
pub(crate) struct GcpConfigHelper {
config: HashMap<GoogleConfigKey, String>,
env_config: HashMap<GoogleConfigKey, String>,
priority: Vec<GcpCredential>,
}

impl GcpConfigHelper {
/// Create a new [`ConfigHelper`]
pub fn try_new(
config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
) -> Result<Self> {
let mut env_config = HashMap::new();
for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if key.starts_with("GOOGLE_") {
if let Ok(config_key) = GoogleConfigKey::from_str(&key.to_ascii_lowercase()) {
env_config.insert(config_key, value.to_string());
}
}
}
}

Ok(Self {
config: config
.into_iter()
.map(|(key, value)| Ok((GoogleConfigKey::from_str(key.as_ref())?, value.into())))
.collect::<Result<_, ObjectStoreError>>()?,
env_config,
priority: Vec::from_iter([
GcpCredential::ServiceAccountKey,
GcpCredential::ApplicationCredentials,
]),
})
}

/// Check if all credential keys are contained in passed config
fn has_full_config(&self, cred: &GcpCredential) -> bool {
cred.keys().iter().all(|key| self.config.contains_key(key))
}

/// Check if any credential keys are contained in passed config
fn has_any_config(&self, cred: &GcpCredential) -> bool {
cred.keys().iter().any(|key| self.config.contains_key(key))
}

/// Check if all credential keys can be provided using the env
fn has_full_config_with_env(&self, cred: &GcpCredential) -> bool {
cred.keys()
.iter()
.all(|key| self.config.contains_key(key) || self.env_config.contains_key(key))
}

/// Generate a cofiguration augmented with options from the environment
pub fn build(mut self) -> Result<HashMap<GoogleConfigKey, String>> {
let mut has_credential = false;

// try using only passed config options
if !has_credential {
for cred in &self.priority {
if self.has_full_config(cred) {
has_credential = true;
break;
}
}
}

// try partially avaialbe credentials augmented by environment
if !has_credential {
for cred in &self.priority {
if self.has_any_config(cred) && self.has_full_config_with_env(cred) {
for key in cred.keys() {
if let Entry::Vacant(e) = self.config.entry(key) {
e.insert(self.env_config.get(&key).unwrap().to_owned());
}
}
has_credential = true;
break;
}
}
}

// try getting credentials only from the environment
if !has_credential {
for cred in &self.priority {
if self.has_full_config_with_env(cred) {
for key in cred.keys() {
if let Entry::Vacant(e) = self.config.entry(key) {
e.insert(self.env_config.get(&key).unwrap().to_owned());
}
}
has_credential = true;
break;
}
}
}

let omit_keys = if has_credential {
CREDENTIAL_KEYS.clone()
} else {
Vec::new()
};

// Add keys from the environment to the configuration, as e.g. client configuration options.
// NOTE We have to specifically configure omitting keys, since workload identity can
// work purely using defaults, but partial config may be present in the environment.
// Preference of conflicting configs (e.g. msi resource id vs. client id is handled in object store)
for key in self.env_config.keys() {
if !omit_keys.contains(key) {
if let Entry::Vacant(e) = self.config.entry(*key) {
e.insert(self.env_config.get(key).unwrap().to_owned());
}
}
}

Ok(self.config)
}
}
21 changes: 21 additions & 0 deletions crates/deltalake-gcp/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use deltalake_core::errors::DeltaTableError;

pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error("failed to parse config: {0}")]
Parse(String),

#[error(transparent)]
ObjectStore(#[from] object_store::Error),
}

impl From<Error> for DeltaTableError {
fn from(e: Error) -> Self {
match e {
Error::Parse(msg) => DeltaTableError::Generic(msg),
Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
}
}
}
68 changes: 68 additions & 0 deletions crates/deltalake-gcp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use object_store::gcp::GoogleConfigKey;
use object_store::parse_url_opts;
use url::Url;

mod config;
pub mod error;

trait GcpOptions {
fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String>;
}

impl GcpOptions for StorageOptions {
fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String> {
self.0
.iter()
.filter_map(|(key, value)| {
Some((
GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
value.clone(),
))
})
.collect()
}
}

#[derive(Clone, Default, Debug)]
pub struct GcpFactory {}

impl ObjectStoreFactory for GcpFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?;
let (store, prefix) = parse_url_opts(url, config)?;
Ok((url_prefix_handler(store, prefix.clone())?, prefix))
}
}

impl LogStoreFactory for GcpFactory {
fn with_options(
&self,
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
Ok(default_logstore(store, location, options))
}
}

/// Register an [ObjectStoreFactory] for common Google Cloud [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
let factory = Arc::new(GcpFactory {});
let scheme = &"gs";
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(), factory.clone());
logstores().insert(url.clone(), factory.clone());
}
Loading

0 comments on commit ef8db1a

Please sign in to comment.