Skip to content

Commit

Permalink
add tests and generalize to other special characters
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Apr 27, 2023
1 parent 669884c commit 214d9ec
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 38 deletions.
161 changes: 130 additions & 31 deletions rust/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Create or load DeltaTables

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
Expand Down Expand Up @@ -345,38 +346,75 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<Str
.map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned()))
}

/// Attempt to create a Url from given table location.
///
/// The location could be:
/// * A valid URL, which will be parsed and returned
/// * A path to a directory, which will be created and then converted to a URL.
///
/// If it is a local path, it will be created if it doesn't exist.
///
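/// Trailing slashes are also removed from the end of the path.
///
/// Will return an error if the location is not valid. For example, a `file://`
/// URL that cannot be converted to a local path, or a local directory that
/// cannot be created.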
pub(crate) fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
let table_uri = table_uri.as_ref();
if let Ok(path) = std::fs::canonicalize(table_uri) {
return Url::from_directory_path(path)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()));

enum UriType {
LocalPath(PathBuf),
Url(Url),
}
if let Ok(url) = Url::parse(table_uri) {
return match url.scheme() {
"file" => ensure_table_uri(
url.to_file_path()
.and_then(|path| path.to_str().map(String::from).ok_or(()))
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?,
),
_ => {
let mut new_url = url.clone();
new_url.set_path(url.path().trim_end_matches('/'));
Ok(new_url)

let uri_type: UriType = if let Ok(url) = Url::parse(table_uri) {
if url.scheme() == "file" {
UriType::LocalPath(url.to_file_path().map_err(|err| {
let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
DeltaTableError::InvalidTableLocation(msg)
})?)
} else {
UriType::Url(url)
}
} else {
UriType::LocalPath(PathBuf::from(table_uri))
};

// If it is a local path, we need to create it if it does not exist.
let mut url = match uri_type {
UriType::LocalPath(path) => {
if !path.exists() {
std::fs::create_dir_all(&path).map_err(|err| {
let msg = format!(
"Could not create local directory: {}\nError: {:?}",
table_uri, err
);
DeltaTableError::InvalidTableLocation(msg)
})?;
}
};
}
// The table uri still might be a relative path that does not exist.
std::fs::create_dir_all(table_uri)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
let path = std::fs::canonicalize(table_uri)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
Url::from_directory_path(path)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))
let path = std::fs::canonicalize(path).map_err(|err| {
let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
DeltaTableError::InvalidTableLocation(msg)
})?;
Url::from_directory_path(path).map_err(|_| {
let msg = format!(
"Could not construct a URL from canonicalized path: {}.\n\
Something must be very wrong with the table path.",
table_uri
);
DeltaTableError::InvalidTableLocation(msg)
})?
}
UriType::Url(url) => url,
};

let trimmed_path = url.path().trim_end_matches('/').to_owned();
url.set_path(&trimmed_path);
Ok(url)
}

#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;

#[test]
fn test_ensure_table_uri() {
Expand All @@ -387,13 +425,74 @@ mod tests {
assert!(uri.is_ok());
let uri = ensure_table_uri("s3://container/path");
assert!(uri.is_ok());
let uri = ensure_table_uri("file:///").unwrap();
assert_eq!("file:///", uri.as_str());
let uri = ensure_table_uri("memory://").unwrap();
assert_eq!("memory://", uri.as_str());
let uri = ensure_table_uri("s3://tests/data/delta-0.8.0/").unwrap();
assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str());
let _uri = ensure_table_uri("s3://tests/data/delta-0.8.0//").unwrap();
assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str())

// These cases should all roundtrip to themselves
let roundtrip_cases = &[
"s3://tests/data/delta-0.8.0",
"memory://",
"file:///",
"s3://bucket/my%20table", // Doesn't double-encode
];

for case in roundtrip_cases {
let uri = ensure_table_uri(case).unwrap();
assert_eq!(case, &uri.as_str());
}

// Other cases
let map_cases = &[
// extra slashes are removed
(
"s3://tests/data/delta-0.8.0//",
"s3://tests/data/delta-0.8.0",
),
("s3://bucket/my table", "s3://bucket/my%20table"),
];

for (case, expected) in map_cases {
let uri = ensure_table_uri(case).unwrap();
assert_eq!(expected, &uri.as_str());
}
}

/// Local filesystem paths that do not exist yet are created by
/// `ensure_table_uri` and come back as the matching `file://` URL.
#[test]
fn test_ensure_table_uri_path() {
    let scratch = tempdir::TempDir::new("test").unwrap();
    let root = std::fs::canonicalize(scratch.path()).unwrap();

    // A plain name, a name with a space, and nested names with `&` and non-ASCII characters.
    let suffixes = ["data/delta-0.8.0", "space in path", "special&chars/你好/😊"];
    for suffix in &suffixes {
        let dir = root.join(suffix);
        let dir_str = dir.to_str().unwrap();
        assert!(!dir.exists());

        let want = Url::parse(&format!("file://{}", dir_str)).unwrap();
        let got = ensure_table_uri(dir_str).unwrap();
        assert_eq!(want, got);
        assert!(dir.exists());
    }

    // Creates non-existent relative directories. The `%3F` here is a literal
    // part of the directory name on disk.
    // NOTE(review): only the leaf directory is removed afterwards, so the
    // parent `_tmp` directory stays behind in the working directory.
    let relative = Path::new("_tmp/test %3F");
    let relative_str = relative.to_str().unwrap();
    assert!(!relative.exists());
    ensure_table_uri(relative_str).unwrap();
    assert!(relative.exists());
    std::fs::remove_dir_all(relative).unwrap();
}

/// A `Url` can be passed to `ensure_table_uri` directly and is expected to
/// come back equal to the input.
#[test]
fn test_ensure_table_uri_url() {
    // URLs should round-trip as-is.
    let remote = Url::parse("s3://tests/data/delta-0.8.0").unwrap();
    assert_eq!(remote, ensure_table_uri(&remote).unwrap());

    // Same expectation for a `file://` URL built from a canonicalized
    // directory underneath a fresh temporary directory.
    let scratch = tempdir::TempDir::new("test").unwrap();
    let dir = std::fs::canonicalize(scratch.path())
        .unwrap()
        .join("data/delta-0.8.0");
    let local = Url::parse(&format!("file://{}", dir.to_str().unwrap())).unwrap();
    assert_eq!(local, ensure_table_uri(&local).unwrap());
}
}
38 changes: 36 additions & 2 deletions rust/src/storage/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
use std::str::FromStr;

/// Options used for configuring backend storage
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct StorageOptions(pub HashMap<String, String>);

impl StorageOptions {
Expand Down Expand Up @@ -246,10 +246,44 @@ fn url_prefix_handler<T: ObjectStore>(
store: T,
storage_url: &Url,
) -> DeltaResult<Arc<DynObjectStore>> {
let prefix = Path::from(storage_url.path().replace("%20", " ")); // allow spaces
let prefix = Path::parse(storage_url.path())?;
if prefix != Path::from("/") {
Ok(Arc::new(PrefixStore::new(store, prefix)))
} else {
Ok(Arc::new(store))
}
}

#[cfg(test)]
mod test {
    use crate::ensure_table_uri;

    use super::*;

    /// A store configured for a local directory whose name contains a space
    /// and an emoji must be able to read a file that was written straight
    /// into that directory through the filesystem.
    #[tokio::test]
    async fn test_configure_store_local() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let scratch = tempfile::tempdir().unwrap();
        let table_dir = scratch.path().join("test space 😁");

        // Turn the directory into a table URL and build a store on top of it.
        let table_uri = ensure_table_uri(table_dir.to_str().unwrap()).unwrap();
        let store = configure_store(&table_uri, &StorageOptions::default()).unwrap();

        // Write via the filesystem ...
        let key = "test.txt";
        let contents = b"test";
        std::fs::write(table_dir.join(key), contents).unwrap();

        // ... then read the same object back through the store, addressed by
        // its key alone.
        let fetched = store
            .get(&object_store::path::Path::from(key))
            .await
            .unwrap();
        let res = fetched.bytes().await.unwrap();
        assert_eq!(res.as_ref(), contents);

        Ok(())
    }
}
57 changes: 57 additions & 0 deletions rust/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ impl IntegrationContext {
std::fs::create_dir_all(&dest_path)?;
copy(table.as_path(), &dest_path, &options)?;
}
StorageIntegration::Amazon => {
let dest = format!("{}/{}", self.root_uri(), name.as_ref());
s3_cli::copy_directory(table.as_path(), dest)?;
}
StorageIntegration::Microsoft => {
let dest = format!("{}/{}", self.bucket, name.as_ref());
az_cli::copy_directory(table.as_path(), dest)?;
}
_ => {
let from = table.as_path().as_str().to_owned();
let to = format!("{}/{}", self.root_uri(), name.as_ref());
Expand Down Expand Up @@ -203,6 +211,7 @@ pub enum TestTables {
SimpleCommit,
Golden,
Delta0_8_0Partitioned,
Delta0_8_0SpecialPartitioned,
Custom(String),
}

Expand All @@ -226,6 +235,11 @@ impl TestTables {
.to_str()
.unwrap()
.to_owned(),
Self::Delta0_8_0SpecialPartitioned => data_path
.join("delta-0.8.0-special-partition")
.to_str()
.unwrap()
.to_owned(),
// the data path for upload does not apply to custom tables.
Self::Custom(_) => todo!(),
}
Expand All @@ -237,6 +251,7 @@ impl TestTables {
Self::SimpleCommit => "simple_commit".into(),
Self::Golden => "golden".into(),
Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
Self::Delta0_8_0SpecialPartitioned => "delta-0.8.0-special-partition".into(),
Self::Custom(name) => name.to_owned(),
}
}
Expand Down Expand Up @@ -284,6 +299,26 @@ pub mod az_cli {
child.wait()
}

/// Copy a directory with the Azure CLI.
///
/// Runs `az storage blob upload-batch -s <source> -d <destination>` and
/// blocks until the child process exits, returning its exit status.
///
/// # Panics
///
/// Panics if the `az` executable cannot be spawned.
pub fn copy_directory(
    source: impl AsRef<str>,
    destination: impl AsRef<str>,
) -> std::io::Result<ExitStatus> {
    let (src, dest) = (source.as_ref(), destination.as_ref());
    let upload_args = ["storage", "blob", "upload-batch", "-s", src, "-d", dest];
    Command::new("az")
        .args(upload_args)
        .spawn()
        .expect("az command is installed")
        .wait()
}

/// prepare_env
pub fn prepare_env() {
set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "1");
Expand Down Expand Up @@ -341,6 +376,28 @@ pub mod s3_cli {
child.wait()
}

/// Recursively copy a directory with the AWS CLI.
///
/// Runs `aws s3 cp <source> <destination> --endpoint-url <endpoint> --recursive`
/// and blocks until the child process exits, returning its exit status. The
/// endpoint is read from the environment variable named by
/// `s3_storage_options::AWS_ENDPOINT_URL`.
///
/// # Panics
///
/// Panics if that environment variable cannot be read, or if the `aws`
/// executable cannot be spawned.
pub fn copy_directory(
    source: impl AsRef<str>,
    destination: impl AsRef<str>,
) -> std::io::Result<ExitStatus> {
    let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
        .expect("variable ENDPOINT must be set to connect to S3");

    let mut cmd = Command::new("aws");
    cmd.args(["s3", "cp", source.as_ref(), destination.as_ref()])
        .args(["--endpoint-url", endpoint_url.as_str(), "--recursive"]);
    cmd.spawn().expect("aws command is installed").wait()
}

/// prepare_env
pub fn prepare_env() {
set_env_if_not_set(
Expand Down
1 change: 1 addition & 0 deletions rust/tests/integration_concurrent_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async fn test_concurrent_writes_local() -> TestResult {
Ok(())
}

#[cfg(feature = "s3")]
#[tokio::test]
async fn concurrent_writes_s3() -> TestResult {
test_concurrent_writes(StorageIntegration::Amazon).await?;
Expand Down
29 changes: 28 additions & 1 deletion rust/tests/integration_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use bytes::Bytes;
use deltalake::storage::utils::flatten_list_stream;
use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult};
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaTableBuilder, ObjectStore};
use object_store::{path::Path, DynObjectStore, Error as ObjectStoreError};
use serial_test::serial;

Expand All @@ -22,6 +22,7 @@ async fn test_object_store_azure() -> TestResult {
Ok(())
}

#[cfg(feature = "s3")]
#[tokio::test]
#[serial]
async fn test_object_store_aws() -> TestResult {
Expand Down Expand Up @@ -415,3 +416,29 @@ async fn delete_fixtures(storage: &DynObjectStore) -> TestResult {
}
Ok(())
}

/// Run the prefix round-trip checks against the local storage integration.
#[tokio::test]
#[serial]
async fn test_object_store_prefixes_local() -> TestResult {
    test_object_store_prefixes(StorageIntegration::Local).await
}

/// For each of several awkward table prefixes (a space, a literal `%3F`,
/// non-ASCII characters), build a store rooted at that prefix, write an
/// object at `test`, and check that it reads back with the same bytes.
async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResult {
    let context = IntegrationContext::new(integration)?;
    let payload = Bytes::from("cats");
    let location = Path::from("test");

    let prefixes = ["table path", "table path/hello%3F", "你好/😊"];
    for prefix in &prefixes {
        let table_root = format!("{}/{}", context.root_uri(), prefix);
        let store = DeltaTableBuilder::from_uri(&table_root)
            .with_allow_http(true)
            .build_storage()?;

        store.put(&location, payload.clone()).await?;
        let fetched = store.get(&location).await?.bytes().await?;
        assert_eq!(&fetched, &payload);
    }

    Ok(())
}
Loading

0 comments on commit 214d9ec

Please sign in to comment.