Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add deltalake-gcp crate #2061

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ datafusion = [
"parquet",
]
datafusion-ext = ["datafusion"]
gcs = ["object_store/gcp"]
json = ["parquet/json"]
python = ["arrow/pyarrow"]
unity-experimental = ["reqwest", "hyper"]
Expand Down
31 changes: 31 additions & 0 deletions crates/deltalake-gcp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "deltalake-gcp"
version = "0.1.0"
edition = "2021"

[dependencies]
deltalake-core = { path = "../deltalake-core" }
lazy_static = "1"

# workspace depenndecies
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
object_store = { workspace = true, features = ["gcp"]}
thiserror = { workspace = true }
tokio = { workspace = true }
regex = { workspace = true }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
serial_test = "3"
deltalake-test = { path = "../deltalake-test" }
pretty_env_logger = "*"
rand = "0.8"
serde_json = { workspace = true }
tempfile = "3"

[features]
integration_test = []
163 changes: 163 additions & 0 deletions crates/deltalake-gcp/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//! Auxiliary module for generating a valig Google cloud configuration.
//!
//! Google offers few ways to authenticate against storage accounts and
//! provide credentials for a service principal. Some of this configutaion may
//! partially be specified in the environment. This module establishes a structured
//! way how we discover valid credentials and some heuristics on how they are prioritized.
use std::collections::{hash_map::Entry, HashMap};
use std::str::FromStr;

use object_store::gcp::GoogleConfigKey;
use object_store::Error as ObjectStoreError;

use crate::error::Result;

lazy_static::lazy_static! {
static ref CREDENTIAL_KEYS: Vec<GoogleConfigKey> =
Vec::from_iter([
GoogleConfigKey::ServiceAccountKey,
GoogleConfigKey::ApplicationCredentials,
]);
}

/// Credential
enum GcpCredential {
/// Using the service account key
ServiceAccountKey,
/// Using application credentials
ApplicationCredentials,
}

impl GcpCredential {
/// required configuration keys for variant
fn keys(&self) -> Vec<GoogleConfigKey> {
match self {
Self::ServiceAccountKey => Vec::from_iter([GoogleConfigKey::ServiceAccountKey]),
Self::ApplicationCredentials => {
Vec::from_iter([GoogleConfigKey::ApplicationCredentials])
}
}
}
}

/// Helper struct to create full configuration from passed options and environment
///
/// Main concern is to pick the desired credential for connecting to starage backend
/// based on a provided configuration and configuration set in the environment.
pub(crate) struct GcpConfigHelper {
config: HashMap<GoogleConfigKey, String>,
env_config: HashMap<GoogleConfigKey, String>,
priority: Vec<GcpCredential>,
}

impl GcpConfigHelper {
/// Create a new [`ConfigHelper`]
pub fn try_new(
config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
) -> Result<Self> {
let mut env_config = HashMap::new();
for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if key.starts_with("GOOGLE_") {
if let Ok(config_key) = GoogleConfigKey::from_str(&key.to_ascii_lowercase()) {
env_config.insert(config_key, value.to_string());
}
}
}
}

Ok(Self {
config: config
.into_iter()
.map(|(key, value)| Ok((GoogleConfigKey::from_str(key.as_ref())?, value.into())))
.collect::<Result<_, ObjectStoreError>>()?,
env_config,
priority: Vec::from_iter([
GcpCredential::ServiceAccountKey,
GcpCredential::ApplicationCredentials,
]),
})
}

/// Check if all credential keys are contained in passed config
fn has_full_config(&self, cred: &GcpCredential) -> bool {
cred.keys().iter().all(|key| self.config.contains_key(key))
}

/// Check if any credential keys are contained in passed config
fn has_any_config(&self, cred: &GcpCredential) -> bool {
cred.keys().iter().any(|key| self.config.contains_key(key))
}

/// Check if all credential keys can be provided using the env
fn has_full_config_with_env(&self, cred: &GcpCredential) -> bool {
cred.keys()
.iter()
.all(|key| self.config.contains_key(key) || self.env_config.contains_key(key))
}

/// Generate a cofiguration augmented with options from the environment
pub fn build(mut self) -> Result<HashMap<GoogleConfigKey, String>> {
let mut has_credential = false;

// try using only passed config options
if !has_credential {
for cred in &self.priority {
if self.has_full_config(cred) {
has_credential = true;
break;
}
}
}

// try partially avaialbe credentials augmented by environment
if !has_credential {
for cred in &self.priority {
if self.has_any_config(cred) && self.has_full_config_with_env(cred) {
for key in cred.keys() {
if let Entry::Vacant(e) = self.config.entry(key) {
e.insert(self.env_config.get(&key).unwrap().to_owned());
}
}
has_credential = true;
break;
}
}
}

// try getting credentials only from the environment
if !has_credential {
for cred in &self.priority {
if self.has_full_config_with_env(cred) {
for key in cred.keys() {
if let Entry::Vacant(e) = self.config.entry(key) {
e.insert(self.env_config.get(&key).unwrap().to_owned());
}
}
has_credential = true;
break;
}
}
}

let omit_keys = if has_credential {
CREDENTIAL_KEYS.clone()
} else {
Vec::new()
};

// Add keys from the environment to the configuration, as e.g. client configuration options.
// NOTE We have to specifically configure omitting keys, since workload identity can
// work purely using defaults, but partial config may be present in the environment.
// Preference of conflicting configs (e.g. msi resource id vs. client id is handled in object store)
for key in self.env_config.keys() {
if !omit_keys.contains(key) {
if let Entry::Vacant(e) = self.config.entry(*key) {
e.insert(self.env_config.get(key).unwrap().to_owned());
}
}
}

Ok(self.config)
}
}
21 changes: 21 additions & 0 deletions crates/deltalake-gcp/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use deltalake_core::errors::DeltaTableError;

pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error("failed to parse config: {0}")]
Parse(String),

#[error(transparent)]
ObjectStore(#[from] object_store::Error),
}

impl From<Error> for DeltaTableError {
fn from(e: Error) -> Self {
match e {
Error::Parse(msg) => DeltaTableError::Generic(msg),
Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
}
}
}
68 changes: 68 additions & 0 deletions crates/deltalake-gcp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use object_store::gcp::GoogleConfigKey;
use object_store::parse_url_opts;
use url::Url;

mod config;
pub mod error;

trait GcpOptions {
fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String>;
}

impl GcpOptions for StorageOptions {
fn as_gcp_options(&self) -> HashMap<GoogleConfigKey, String> {
self.0
.iter()
.filter_map(|(key, value)| {
Some((
GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
value.clone(),
))
})
.collect()
}
}

#[derive(Clone, Default, Debug)]
pub struct GcpFactory {}

impl ObjectStoreFactory for GcpFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?;
let (store, prefix) = parse_url_opts(url, config)?;
Ok((url_prefix_handler(store, prefix.clone())?, prefix))
}
}

impl LogStoreFactory for GcpFactory {
fn with_options(
&self,
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
Ok(default_logstore(store, location, options))
}
}

/// Register an [ObjectStoreFactory] for common Google Cloud [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
let factory = Arc::new(GcpFactory {});
let scheme = &"gs";
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(), factory.clone());
logstores().insert(url.clone(), factory.clone());
}
Loading
Loading