# feat: improve storage location handling (#1065)
# Description

This PR contains some improvements and refactoring for handling storage
locations.

- Removes the `StorageLocation` struct (a left-over from a previous clean-up)
- Allows creating tables from local file paths, including relative paths (see the sketch after this list)
- Persists storage options during serialization (this will not work for custom storage backends, but still extends what the previous approach could do)
- Adopts `PrefixObjectStore` from the upstream crate in favour of maintaining that logic here
- Runs `cargo clippy --fix` on `/rust`
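
A rough sketch of the new local-path behaviour (the relative path here is hypothetical; `from_uri`, `build`, and `table_uri` are the builder APIs touched in this diff):

```rust
use deltalake::{DeltaTableBuilder, DeltaTableError};

fn main() -> Result<(), DeltaTableError> {
    // Relative paths are now canonicalized (and created when missing) by
    // `ensure_table_uri`, so no explicit `file://` URL is required.
    let table = DeltaTableBuilder::from_uri("./data/my-table").build()?;
    println!("table location: {}", table.table_uri());
    Ok(())
}
```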

# Related Issue(s)

Closes #998

roeap authored Jan 15, 2023
1 parent 3c99d97 commit 8e0b8cb
Show file tree
Hide file tree
Showing 14 changed files with 489 additions and 537 deletions.
rust/src/builder.rs (274 changes: 92 additions & 182 deletions)
@@ -5,26 +5,14 @@ use std::sync::Arc;

use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
use crate::schema::DeltaDataTypeVersion;
-use crate::storage::config::{StorageLocation, StorageOptions};
-use crate::storage::file::FileStorageBackend;
+use crate::storage::config::StorageOptions;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};

use chrono::{DateTime, FixedOffset, Utc};
-use object_store::memory::InMemory;
-use object_store::path::Path;
-use object_store::{DynObjectStore, Error as ObjectStoreError, Result as ObjectStoreResult};
+use object_store::DynObjectStore;
use serde::{Deserialize, Serialize};
use url::Url;

-#[cfg(any(feature = "s3", feature = "s3-rustls"))]
-use crate::storage::s3::{S3StorageBackend, S3StorageOptions};
-#[cfg(any(feature = "s3", feature = "s3-rustls"))]
-use object_store::aws::AmazonS3Builder;
-#[cfg(feature = "azure")]
-use object_store::azure::MicrosoftAzureBuilder;
-#[cfg(feature = "gcs")]
-use object_store::gcp::GoogleCloudStorageBuilder;
-
#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
enum BuilderError {
@@ -38,6 +26,8 @@ enum BuilderError {
Decode(String),
#[error("Delta-rs must be build with feature '{feature}' to support url: {url}.")]
MissingFeature { feature: &'static str, url: String },
#[error("Failed to parse table uri")]
TableUri(#[from] url::ParseError),
}

impl From<BuilderError> for DeltaTableError {
@@ -98,7 +88,7 @@ pub struct DeltaTableLoadOptions {
/// table root uri
pub table_uri: String,
/// backend to access storage system
-pub storage_backend: Option<(Arc<DynObjectStore>, Path)>,
+pub storage_backend: Option<(Arc<DynObjectStore>, Url)>,
/// specify the version we are going to load: a time stamp, a version, or just the newest
/// available version
pub version: DeltaVersion,
@@ -168,7 +158,7 @@ impl DeltaTableBuilder {
}

/// specify the timestamp given as ISO-8601/RFC-3339 timestamp
-pub fn with_datestring(self, date_string: impl AsRef<str>) -> Result<Self, DeltaTableError> {
+pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
date_string.as_ref(),
)?);
@@ -183,13 +173,14 @@ impl DeltaTableBuilder {

/// Set the storage backend.
///
-/// `table_root` denotes the [object_store::path::Path] within the store to the root of the delta.
-/// This is required since we cannot infer the relative location of the table from the `table_uri`
-/// for non-standard object store implementations.
///
/// If a backend is not provided then it is derived from `table_uri`.
-pub fn with_storage_backend(mut self, storage: Arc<DynObjectStore>, table_root: &Path) -> Self {
-self.options.storage_backend = Some((storage, table_root.clone()));
+///
+/// # Arguments
+///
+/// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located).
+/// * `location` - A url corresponding to the storage location of `storage`.
+pub fn with_storage_backend(mut self, storage: Arc<DynObjectStore>, location: Url) -> Self {
+self.options.storage_backend = Some((storage, location));
self
}
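
A minimal sketch of the new signature in use, assuming an in-memory `object_store` backend and the `memory://` scheme exercised by the tests further down this diff:

```rust
use std::sync::Arc;

use deltalake::{DeltaTableBuilder, DeltaTableError};
use object_store::memory::InMemory;
use url::Url;

fn in_memory_table() -> Result<(), DeltaTableError> {
    // "/" in this store is treated as the table root, i.e. where
    // `_delta_log` lives; the URL identifies that location.
    let store = Arc::new(InMemory::new());
    let location = Url::parse("memory://").expect("valid URL");
    let _table = DeltaTableBuilder::from_uri("memory://")
        .with_storage_backend(store, location)
        .build()?;
    Ok(())
}
```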

@@ -214,53 +205,49 @@ impl DeltaTableBuilder {
self
}

-/// Build a delta storage backend for the given config
-pub fn build_storage(self) -> Result<ObjectStoreRef, DeltaTableError> {
-let (storage, storage_url) = match self.options.storage_backend {
-// Some(storage) => storage,
-None => get_storage_backend(
-&self.options.table_uri,
-self.storage_options,
-self.allow_http,
-)?,
-_ => todo!(),
+/// Storage options for configuring backend object store
+pub fn storage_options(&self) -> StorageOptions {
+let mut storage_options = self.storage_options.clone().unwrap_or_default();
+if let Some(allow) = self.allow_http {
+storage_options.insert(
+"allow_http".into(),
+if allow { "true" } else { "false" }.into(),
+);
};
-let object_store = Arc::new(DeltaObjectStore::new(storage_url, storage));
-Ok(object_store)
+storage_options.into()
}

+/// Build a delta storage backend for the given config
+pub fn build_storage(self) -> DeltaResult<ObjectStoreRef> {
+match self.options.storage_backend {
+Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new(
+storage,
+ensure_table_uri(location.as_str())?,
+))),
+None => {
+let location = ensure_table_uri(&self.options.table_uri)?;
+Ok(Arc::new(DeltaObjectStore::try_new(
+location,
+self.storage_options(),
+)?))
+}
+}
+}
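
A sketch of how the merged options surface to callers, assuming the crate is built with an S3 feature; the bucket, table path, and region are hypothetical:

```rust
use std::collections::HashMap;

use deltalake::{DeltaTableBuilder, DeltaTableError};

fn build_s3_store() -> Result<(), DeltaTableError> {
    let mut options = HashMap::new();
    options.insert("AWS_REGION".to_string(), "us-east-1".to_string());
    // `with_allow_http` is folded into the same option map as
    // "allow_http" by `storage_options()` before the store is built.
    let _store = DeltaTableBuilder::from_uri("s3://my-bucket/my-table")
        .with_storage_options(options)
        .with_allow_http(true)
        .build_storage()?;
    Ok(())
}
```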

/// Build the [`DeltaTable`] from specified options.
///
/// This will not load the log, i.e. the table is not initialized. To get an initialized
/// table use the `load` function
-pub fn build(self) -> Result<DeltaTable, DeltaTableError> {
-let (storage, storage_url) = match self.options.storage_backend {
-Some((store, path)) => {
-let mut uri = self.options.table_uri + path.as_ref();
-if !uri.contains(':') {
-uri = format!("file://{}", uri);
-}
-let url = Url::parse(uri.as_str())
-.map_err(|_| DeltaTableError::Generic(format!("Can't parse uri: {}", uri)))?;
-let url = StorageLocation::new(url);
-(store, url)
-}
-None => get_storage_backend(
-&self.options.table_uri,
-self.storage_options,
-self.allow_http,
-)?,
-};
+pub fn build(self) -> DeltaResult<DeltaTable> {
let config = DeltaTableConfig {
require_tombstones: self.options.require_tombstones,
require_files: self.options.require_files,
};
-let object_store = Arc::new(DeltaObjectStore::new(storage_url, storage));
-Ok(DeltaTable::new(object_store, config))
+Ok(DeltaTable::new(self.build_storage()?, config))
}
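
A sketch contrasting the two entry points, assuming a tokio runtime and a table path from this repository's test data:

```rust
use deltalake::{DeltaTableBuilder, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // `build` only wires up storage; the log is not read yet, so the
    // table reports version -1.
    let table = DeltaTableBuilder::from_uri("./tests/data/simple_table").build()?;
    assert_eq!(table.version(), -1);

    // `load` additionally replays `_delta_log` up to the requested version.
    let table = DeltaTableBuilder::from_uri("./tests/data/simple_table")
        .load()
        .await?;
    assert!(table.version() >= 0);
    Ok(())
}
```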

/// Build the [`DeltaTable`] and load its state
-pub async fn load(self) -> Result<DeltaTable, DeltaTableError> {
+pub async fn load(self) -> DeltaResult<DeltaTable> {
let version = self.options.version.clone();
let mut table = self.build()?;
match version {
@@ -272,132 +259,6 @@ impl DeltaTableBuilder {
}
}

-enum ObjectStoreKind {
-Local,
-InMemory,
-S3,
-Google,
-Azure,
-}
-
-impl ObjectStoreKind {
-pub fn parse_url(url: &Url) -> ObjectStoreResult<Self> {
-match url.scheme() {
-"file" => Ok(ObjectStoreKind::Local),
-"memory" => Ok(ObjectStoreKind::InMemory),
-"az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => Ok(ObjectStoreKind::Azure),
-"s3" | "s3a" => Ok(ObjectStoreKind::S3),
-"gs" => Ok(ObjectStoreKind::Google),
-"https" => {
-let host = url.host_str().unwrap_or_default();
-if host.contains("amazonaws.com") {
-Ok(ObjectStoreKind::S3)
-} else if host.contains("dfs.core.windows.net")
-|| host.contains("blob.core.windows.net")
-{
-Ok(ObjectStoreKind::Azure)
-} else {
-Err(ObjectStoreError::NotImplemented)
-}
-}
-_ => Err(ObjectStoreError::NotImplemented),
-}
-}
-}
-
-/// Create a new storage backend used in Delta table
-pub(crate) fn get_storage_backend(
-table_uri: impl AsRef<str>,
-// annotation needed for some feature builds
-#[allow(unused_variables)] options: Option<HashMap<String, String>>,
-#[allow(unused_variables)] allow_http: Option<bool>,
-) -> DeltaResult<(Arc<DynObjectStore>, StorageLocation)> {
-let storage_url = StorageLocation::parse(table_uri)?;
-let mut options = options.unwrap_or_default();
-if let Some(allow) = allow_http {
-options.insert(
-"allow_http".into(),
-if allow { "true" } else { "false" }.into(),
-);
-}
-let _options = StorageOptions::new(options);
-
-match ObjectStoreKind::parse_url(&storage_url.url)? {
-ObjectStoreKind::Local => Ok((Arc::new(FileStorageBackend::new()), storage_url)),
-ObjectStoreKind::InMemory => Ok((Arc::new(InMemory::new()), storage_url)),
-#[cfg(any(feature = "s3", feature = "s3-rustls"))]
-ObjectStoreKind::S3 => {
-let store = AmazonS3Builder::new()
-.with_url(storage_url.as_ref())
-.try_with_options(&_options.as_s3_options())?
-.with_allow_http(_options.allow_http())
-.build()
-.or_else(|_| {
-AmazonS3Builder::from_env()
-.with_url(storage_url.as_ref())
-.try_with_options(&_options.as_s3_options())?
-.with_allow_http(_options.allow_http())
-.build()
-})?;
-Ok((
-Arc::new(S3StorageBackend::try_new(
-Arc::new(store),
-S3StorageOptions::from_map(&_options.0),
-)?),
-storage_url,
-))
-}
-#[cfg(not(any(feature = "s3", feature = "s3-rustls")))]
-ObjectStoreKind::S3 => Err(BuilderError::MissingFeature {
-feature: "s3",
-url: storage_url.as_ref().into(),
-}
-.into()),
-#[cfg(feature = "azure")]
-ObjectStoreKind::Azure => {
-let store = MicrosoftAzureBuilder::new()
-.with_url(storage_url.as_ref())
-.try_with_options(&_options.as_azure_options())?
-.with_allow_http(_options.allow_http())
-.build()
-.or_else(|_| {
-MicrosoftAzureBuilder::from_env()
-.with_url(storage_url.as_ref())
-.try_with_options(&_options.as_azure_options())?
-.with_allow_http(_options.allow_http())
-.build()
-})?;
-Ok((Arc::new(store), storage_url))
-}
-#[cfg(not(feature = "azure"))]
-ObjectStoreKind::Azure => Err(BuilderError::MissingFeature {
-feature: "azure",
-url: storage_url.as_ref().into(),
-}
-.into()),
-#[cfg(feature = "gcs")]
-ObjectStoreKind::Google => {
-let store = GoogleCloudStorageBuilder::new()
-.with_url(storage_url.as_ref())
-.try_with_options(&_options.as_gcs_options())?
-.build()
-.or_else(|_| {
-GoogleCloudStorageBuilder::from_env()
-.with_url(storage_url.as_ref())
-.try_with_options(&_options.as_gcs_options())?
-.build()
-})?;
-Ok((Arc::new(store), storage_url))
-}
-#[cfg(not(feature = "gcs"))]
-ObjectStoreKind::Google => Err(BuilderError::MissingFeature {
-feature: "gcs",
-url: storage_url.as_ref().into(),
-}
-.into()),
-}
-}

/// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions].
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
/// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename.
@@ -432,7 +293,7 @@ pub mod s3_storage_options {
/// Hence, the `connection closed before message completed` could occur.
/// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise.
pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS";
-/// The `pool_idle_timeout` for the as3_storage_optionsws sts client. See
+/// The `pool_idle_timeout` for the as3_storage_options sts client. See
/// the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`.
pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS";
/// The number of retries for S3 GET requests failed with 500 Internal Server Error.
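
These keys can be supplied programmatically as well as via the environment; a small sketch with hypothetical values:

```rust
use std::collections::HashMap;

use deltalake::{DeltaTableBuilder, DeltaTableError};

fn tuned_s3_store() -> Result<(), DeltaTableError> {
    // The same key names work in the option map or as environment
    // variables (see `str_option` below).
    let options = HashMap::from([
        ("AWS_S3_POOL_IDLE_TIMEOUT_SECONDS".to_string(), "15".to_string()),
        ("AWS_STS_POOL_IDLE_TIMEOUT_SECONDS".to_string(), "10".to_string()),
    ]);
    let _store = DeltaTableBuilder::from_uri("s3://my-bucket/my-table")
        .with_storage_options(options)
        .build_storage()?;
    Ok(())
}
```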
@@ -488,3 +349,52 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
map.get(key)
.map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned()))
}

+pub(crate) fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
+let table_uri = table_uri.as_ref();
+if let Ok(path) = std::fs::canonicalize(table_uri) {
+return Url::from_directory_path(path)
+.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()));
+}
+if let Ok(url) = Url::parse(table_uri) {
+return Ok(match url.scheme() {
+"file" => url,
+_ => {
+let mut new_url = url.clone();
+new_url.set_path(url.path().trim_end_matches('/'));
+new_url
+}
+});
+}
+// The table uri might still be a relative path that does not exist yet.
+std::fs::create_dir_all(table_uri)
+.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
+let path = std::fs::canonicalize(table_uri)
+.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
+Url::from_directory_path(path)
+.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))
+}
+
+#[cfg(test)]
+mod tests {
+use super::*;
+
+#[test]
+fn test_ensure_table_uri() {
+// parse an existing relative directory
+let uri = ensure_table_uri(".");
+assert!(uri.is_ok());
+let uri = ensure_table_uri("./nonexistent");
+assert!(uri.is_ok());
+let uri = ensure_table_uri("s3://container/path");
+assert!(uri.is_ok());
+let uri = ensure_table_uri("file:///").unwrap();
+assert_eq!("file:///", uri.as_str());
+let uri = ensure_table_uri("memory://").unwrap();
+assert_eq!("memory://", uri.as_str());
+let uri = ensure_table_uri("s3://tests/data/delta-0.8.0/").unwrap();
+assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str());
+let uri = ensure_table_uri("s3://tests/data/delta-0.8.0//").unwrap();
+assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str());
+}
+}
rust/src/delta.rs (16 changes: 14 additions & 2 deletions)
@@ -199,6 +199,17 @@ pub enum DeltaTableError {
/// Error returned when user attempts to commit actions that don't belong to the next version.
#[error("Delta transaction failed, version {0} does not follow {1}")]
VersionMismatch(DeltaDataTypeVersion, DeltaDataTypeVersion),
+/// A feature is missing to perform the operation
+#[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
+MissingFeature {
+/// Name of the missing feature
+feature: &'static str,
+/// Storage location url
+url: String,
+},
+/// The storage location could not be inferred
+#[error("Cannot infer storage location from: {0}")]
+InvalidTableLocation(String),
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),
@@ -1511,10 +1522,11 @@ mod tests {

#[cfg(any(feature = "s3", feature = "s3-rustls"))]
#[test]
-fn normalize_table_uri() {
+fn normalize_table_uri_s3() {
std::env::set_var("AWS_DEFAULT_REGION", "us-east-1");
for table_uri in [
"s3://tests/data/delta-0.8.0/",
// "s3://tests/data/delta-0.8.0//",
"s3://tests/data/delta-0.8.0//",
"s3://tests/data/delta-0.8.0",
]
.iter()